Repo-Init
 
Loading...
Searching...
No Matches
ZeroMQServer.cpp
Go to the documentation of this file.
2
#include <cstring>
#include <format>

#include "Version.h"
#include "utils/Hasher.hpp"

#include <spdlog/spdlog.h>
10
namespace
{
/**
 * @brief Packs four characters into a 32-bit command identifier.
 *
 * The first character occupies the least significant byte and the fourth
 * character the most significant byte.
 *
 * @param first Character stored in bits 0-7
 * @param second Character stored in bits 8-15
 * @param third Character stored in bits 16-23
 * @param fourth Character stored in bits 24-31
 * @return Packed identifier
 */
constexpr uint32_t makeCommandId(char first, char second, char third, char fourth)
{
    return static_cast<uint32_t>(first) | (static_cast<uint32_t>(second) << 8) |
           (static_cast<uint32_t>(third) << 16) | (static_cast<uint32_t>(fourth) << 24);
}
} // namespace

/// Identifier of the log level change command ('L', 'O', 'G', 'L').
constexpr uint32_t LOG_LEVEL_ID = makeCommandId('L', 'O', 'G', 'L');
/// Identifier of the version information command ('V', 'E', 'R', 'I').
constexpr uint32_t VERSION_INFO_ID = makeCommandId('V', 'E', 'R', 'I');
/// Identifier of the ping command ('P', 'I', 'N', 'G').
constexpr uint32_t PING_PONG_ID = makeCommandId('P', 'I', 'N', 'G');
/// Identifier of the status check command ('S', 'C', 'H', 'K').
constexpr uint32_t STATUS_CHECK_ID = makeCommandId('S', 'C', 'H', 'K');
19/* ################################################################################### */
20/* ############################# MAKE MODIFICATIONS HERE ############################# */
21/* ################################################################################### */
22
23/* ################################################################################### */
24/* ################################ END MODIFICATIONS ################################ */
25/* ################################################################################### */
26
28{
29 auto recvMsgs = recvMessages();
30
31 if (!recvMsgs.empty())
32 {
33 std::vector<zmq::message_t> replyMsgs;
34
35 ZeroMQServerStats serverStats;
36 serverStats.processingTimeStart = std::chrono::high_resolution_clock::now();
37 serverStats.isSuccessful = messageCallback() && messageCallback()(recvMsgs, replyMsgs);
38
39 if (size_t nSentMsg = sendMessages(replyMsgs); nSentMsg != replyMsgs.size())
40 {
41 spdlog::warn("Can't send whole reply: Sent messages {} / {}", nSentMsg, replyMsgs.size());
42 }
43 serverStats.processingTimeEnd = std::chrono::high_resolution_clock::now();
44
45 if (_stats)
46 {
47 _stats->consumeStats(recvMsgs, replyMsgs, serverStats);
48 }
49 }
50}
51
52void ZeroMQServer::threadFunc(const std::stop_token &stopToken) noexcept
53{
54 spdlog::info("ZeroMQ server started");
55 while (!stopToken.stop_requested())
56 {
57 try
58 {
59 update();
60 if (_checkFlag)
61 {
62 _checkFlag->test_and_set();
63 }
64 }
65 catch (const std::exception &e)
66 {
67 spdlog::error("ZeroMQ server failed: {}", e.what());
68 }
69 }
70 spdlog::info("ZeroMQ server stopped");
71}
72
/**
 * @brief Constructs a reply-type ZeroMQ server.
 *
 * Initialises the ZeroMQ base with a REP socket for the given address (with the
 * base constructor's bind argument set to true), optionally creates the
 * statistics collector, and starts socket monitoring on an inproc address
 * derived from a hash of the host address.
 *
 * @param hostAddr Address handed to the ZeroMQ base class
 * @param checkFlag Flag set by the server thread after each successful update; may be null
 * @param reg Prometheus registry; statistics are collected only when it is non-null
 * @param prependName Name prefix forwarded to the statistics collector
 */
ZeroMQServer::ZeroMQServer(const std::string &hostAddr, std::shared_ptr<std::atomic_flag> checkFlag,
                           const std::shared_ptr<prometheus::Registry> &reg, const std::string &prependName)
    : ZeroMQ(zmq::socket_type::rep, hostAddr, true), _checkFlag(std::move(checkFlag))
{
    if (reg)
    {
        _stats = std::make_unique<ZeroMQStats>(reg, prependName);
    }

    // The monitor endpoint name is derived from the host address hash and carries a ".rep" suffix
    const auto monitorAddress = std::format("{}{}{}", "inproc://", constHasher(hostAddr.c_str()), ".rep");
    startMonitoring(getSocket().get(), monitorAddress);
}
84
86{
87 if (start())
88 {
89 _serverThread = std::make_unique<std::jthread>([this](const std::stop_token &sToken) { threadFunc(sToken); });
90 return true;
91 }
92 return false;
93}
94
96{
97 if (_serverThread)
98 {
99 _serverThread.reset();
100 }
101
102 stop();
103}
104
105bool ZeroMQServerMessageCallback(const std::vector<zmq::message_t> &recvMsgs, std::vector<zmq::message_t> &replyMsgs)
106{
107 spdlog::trace("Received {} messages", recvMsgs.size());
108 replyMsgs.clear();
109
110 std::string replyBody;
111 int reply = ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL;
112 switch (*(static_cast<const uint64_t *>(recvMsgs[0].data())))
113 {
114 case LOG_LEVEL_ID: {
115 if (recvMsgs.size() != 2)
116 {
117 spdlog::error("Received unknown number of messages for log level change");
118 break;
119 }
120
121 spdlog::warn("Log level change request received");
122 const auto receivedMsg = std::string(static_cast<const char *>(recvMsgs[1].data()), recvMsgs[1].size());
123
124 if (receivedMsg == "v")
125 {
126 spdlog::set_level(spdlog::level::info);
127 }
128 if (receivedMsg == "vv")
129 {
130 spdlog::set_level(spdlog::level::debug);
131 }
132 if (receivedMsg == "vvv")
133 {
134 spdlog::set_level(spdlog::level::trace);
135 }
136 if (receivedMsg == "r")
137 {
138#ifdef NDEBUG
139 spdlog::set_level(spdlog::level::warn);
140#else
141 spdlog::set_level(spdlog::level::info);
142#endif
143 }
144 reply = ZMQ_EVENT_HANDSHAKE_SUCCEEDED;
145 break;
146 }
147 case VERSION_INFO_ID: {
148 if (recvMsgs.size() != 1)
149 {
150 spdlog::error("Received unknown number of messages for version information");
151 break;
152 }
153
154 reply = ZMQ_EVENT_HANDSHAKE_SUCCEEDED;
155 replyBody = PROJECT_FULL_VERSION_STRING;
156 break;
157 }
158 case PING_PONG_ID: {
159 if (recvMsgs.size() != 1)
160 {
161 spdlog::error("Received unknown number of messages for ping");
162 break;
163 }
164
165 reply = ZMQ_EVENT_HANDSHAKE_SUCCEEDED;
166 replyBody = "PONG";
167 break;
168 }
169 case STATUS_CHECK_ID: {
170 if (recvMsgs.size() != 1)
171 {
172 spdlog::error("Received unknown number of messages for status check");
173 break;
174 }
175
176 reply = ZMQ_EVENT_HANDSHAKE_SUCCEEDED;
177
178 std::ostringstream oss;
179 oss << "{";
180 for (const auto &[process, statusFlag] : vCheckFlag)
181 {
182 oss << "\"" << process << "\":" << (statusFlag->_M_i ? "1," : "0,");
183 }
184 replyBody = oss.str();
185 replyBody.replace(replyBody.size() - 1, 1, "}");
186 break;
187 }
188 /* ################################################################################### */
189 /* ############################# MAKE MODIFICATIONS HERE ############################# */
190 /* ################################################################################### */
191
192 /* ################################################################################### */
193 /* ################################ END MODIFICATIONS ################################ */
194 /* ################################################################################### */
195 default:
196 spdlog::error("Unknown command received from control");
197 break;
198 }
199
200 // Prepare reply
201 replyMsgs.emplace_back(&reply, sizeof(reply));
202 replyMsgs.emplace_back(replyBody.c_str(), replyBody.size());
203
204 return reply == ZMQ_EVENT_HANDSHAKE_SUCCEEDED;
205}
std::vector< std::pair< std::string, std::shared_ptr< std::atomic_flag > > > vCheckFlag
Global variable to check if the servers are running.
constexpr size_t constHasher(const char *s)
Definition Hasher.hpp:13
constexpr uint32_t PING_PONG_ID
constexpr uint32_t VERSION_INFO_ID
constexpr uint32_t LOG_LEVEL_ID
constexpr uint32_t STATUS_CHECK_ID
bool ZeroMQServerMessageCallback(const std::vector< zmq::message_t > &recvMsgs, std::vector< zmq::message_t > &replyMsgs)
void startMonitoring(zmq::socket_t *socket, const std::string &monitorAddress)
void update()
Processes new messages.
std::unique_ptr< ZeroMQStats > _stats
std::shared_ptr< std::atomic_flag > _checkFlag
void threadFunc(const std::stop_token &stopToken) noexcept
Main thread function.
FPTR_MessageCallback messageCallback() const
std::unique_ptr< std::jthread > _serverThread
ZeroMQServer(const std::string &hostAddr, std::shared_ptr< std::atomic_flag > checkFlag, const std::shared_ptr< prometheus::Registry > &reg=nullptr, const std::string &prependName="")
void shutdown()
Closes the ZeroMQ Server.
void stop()
Definition ZeroMQ.cpp:63
std::vector< zmq::message_t > recvMessages()
Definition ZeroMQ.cpp:81
const std::unique_ptr< zmq::socket_t > & getSocket() const
Definition ZeroMQ.hpp:90
size_t sendMessages(std::vector< zmq::message_t > &msg)
Definition ZeroMQ.cpp:96
bool start()
Definition ZeroMQ.cpp:43
ZeroMQ(const zmq::socket_type &type, const std::string &addr, bool isBind)
Definition ZeroMQ.cpp:32
bool isSuccessful
Indicates if processing was successful for this connection.
std::chrono::high_resolution_clock::time_point processingTimeEnd
Processing time end.
std::chrono::high_resolution_clock::time_point processingTimeStart
Processing time start.