Repo-Init
 
Loading...
Searching...
No Matches
ZeroMQServer.cpp
Go to the documentation of this file.
2
3#include "Version.h"
5#include "utils/Hasher.hpp"
6
7#include <spdlog/spdlog.h>
8
namespace
{
/**
 * @brief Packs four characters into a 32-bit command identifier.
 *
 * The first character occupies the least significant byte and the fourth the most
 * significant one. Each character is widened through `unsigned char` so that no sign
 * extension can leak into the neighbouring bytes.
 *
 * @param[in] c0 Character stored in bits 0-7
 * @param[in] c1 Character stored in bits 8-15
 * @param[in] c2 Character stored in bits 16-23
 * @param[in] c3 Character stored in bits 24-31
 * @return Packed identifier
 */
constexpr uint32_t makeCommandId(char c0, char c1, char c2, char c3)
{
    return static_cast<uint32_t>(static_cast<unsigned char>(c0)) |
           (static_cast<uint32_t>(static_cast<unsigned char>(c1)) << 8) |
           (static_cast<uint32_t>(static_cast<unsigned char>(c2)) << 16) |
           (static_cast<uint32_t>(static_cast<unsigned char>(c3)) << 24);
}
} // namespace

/// Identifier of the log level change command ("LOGL")
constexpr uint32_t LOG_LEVEL_ID = makeCommandId('L', 'O', 'G', 'L');
/// Identifier of the version information command ("VERI")
constexpr uint32_t VERSION_INFO_ID = makeCommandId('V', 'E', 'R', 'I');
/// Identifier of the ping command ("PING")
constexpr uint32_t PING_PONG_ID = makeCommandId('P', 'I', 'N', 'G');
/// Identifier of the status check command ("SCHK")
constexpr uint32_t STATUS_CHECK_ID = makeCommandId('S', 'C', 'H', 'K');
17/* ################################################################################### */
18/* ############################# MAKE MODIFICATIONS HERE ############################# */
19/* ################################################################################### */
20
21/* ################################################################################### */
22/* ################################ END MODIFICATIONS ################################ */
23/* ################################################################################### */
24
26{
27 auto recvMsgs = recvMessages();
28
29 if (!recvMsgs.empty())
30 {
31 std::vector<zmq::message_t> replyMsgs;
32
33 ZeroMQServerStats serverStats;
34 serverStats.processingTimeStart = std::chrono::high_resolution_clock::now();
35 serverStats.isSuccessful = messageCallback() && messageCallback()(recvMsgs, replyMsgs);
36
37 if (size_t nSentMsg = sendMessages(replyMsgs); nSentMsg != replyMsgs.size())
38 {
39 spdlog::warn("Can't send whole reply: Sent messages {} / {}", nSentMsg, replyMsgs.size());
40 }
41 serverStats.processingTimeEnd = std::chrono::high_resolution_clock::now();
42
43 if (_stats)
44 {
45 _stats->consumeStats(recvMsgs, replyMsgs, serverStats);
46 }
47 }
48}
49
51{
52 spdlog::info("ZeroMQ server started");
53 while (!_shouldStop._M_i)
54 {
55 try
56 {
57 update();
58 if (_checkFlag)
59 {
60 _checkFlag->test_and_set();
61 }
62 }
63 catch (const std::exception &e)
64 {
65 spdlog::error("ZeroMQ server failed: {}", e.what());
66 }
67 }
68 spdlog::info("ZeroMQ server stopped");
69}
70
71ZeroMQServer::ZeroMQServer(const std::string &hostAddr, std::shared_ptr<std::atomic_flag> checkFlag,
72 const std::shared_ptr<prometheus::Registry> &reg, const std::string &prependName)
73 : ZeroMQ(zmq::socket_type::rep, hostAddr, true), _checkFlag(std::move(checkFlag))
74{
75 if (reg)
76 {
77 _stats = std::make_unique<ZeroMQStats>(reg, prependName);
78 }
79
80 startMonitoring(getSocket().get(), "inproc://" + std::to_string(constHasher(hostAddr.c_str())) + ".rep");
81}
82
84{
85 _shouldStop.clear();
86
87 if (start())
88 {
89 _serverThread = std::make_unique<std::thread>(&ZeroMQServer::threadFunc, this);
90 return true;
91 }
92 return false;
93}
94
96{
97 _shouldStop.test_and_set();
98 if (_serverThread && _serverThread->joinable())
99 {
100 _serverThread->join();
101 _serverThread.reset();
102 }
103
104 stop();
105}
106
107bool ZeroMQServerMessageCallback(const std::vector<zmq::message_t> &recvMsgs, std::vector<zmq::message_t> &replyMsgs)
108{
109 spdlog::trace("Received {} messages", recvMsgs.size());
110 replyMsgs.clear();
111
112 std::string replyBody;
113 int reply = ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL;
114 switch (*(static_cast<const uint64_t *>(recvMsgs[0].data())))
115 {
116 case LOG_LEVEL_ID: {
117 if (recvMsgs.size() != 2)
118 {
119 spdlog::error("Received unknown number of messages for log level change");
120 break;
121 }
122
123 spdlog::warn("Log level change request received");
124 const auto receivedMsg = std::string(static_cast<const char *>(recvMsgs[1].data()), recvMsgs[1].size());
125
126 if (receivedMsg == "v")
127 {
128 spdlog::set_level(spdlog::level::info);
129 }
130 if (receivedMsg == "vv")
131 {
132 spdlog::set_level(spdlog::level::debug);
133 }
134 if (receivedMsg == "vvv")
135 {
136 spdlog::set_level(spdlog::level::trace);
137 }
138 reply = ZMQ_EVENT_HANDSHAKE_SUCCEEDED;
139 break;
140 }
141 case VERSION_INFO_ID: {
142 if (recvMsgs.size() != 1)
143 {
144 spdlog::error("Received unknown number of messages for version information");
145 break;
146 }
147
148 reply = ZMQ_EVENT_HANDSHAKE_SUCCEEDED;
149 replyBody = PROJECT_FULL_VERSION_STRING;
150 break;
151 }
152 case PING_PONG_ID: {
153 if (recvMsgs.size() != 1)
154 {
155 spdlog::error("Received unknown number of messages for ping");
156 break;
157 }
158
159 reply = ZMQ_EVENT_HANDSHAKE_SUCCEEDED;
160 replyBody = "PONG";
161 break;
162 }
163 case STATUS_CHECK_ID: {
164 if (recvMsgs.size() != 1)
165 {
166 spdlog::error("Received unknown number of messages for status check");
167 break;
168 }
169
170 reply = ZMQ_EVENT_HANDSHAKE_SUCCEEDED;
171
172 std::ostringstream oss;
173 oss << "{";
174 for (const auto &[process, statusFlag] : vCheckFlag)
175 {
176 oss << "\"" << process << "\":" << (statusFlag->_M_i ? "1," : "0,");
177 }
178 replyBody = oss.str();
179 replyBody.replace(replyBody.size() - 1, 1, "}");
180 break;
181 }
182 /* ################################################################################### */
183 /* ############################# MAKE MODIFICATIONS HERE ############################# */
184 /* ################################################################################### */
185
186 /* ################################################################################### */
187 /* ################################ END MODIFICATIONS ################################ */
188 /* ################################################################################### */
189 default:
190 spdlog::error("Unknown command received from control");
191 break;
192 }
193
194 // Prepare reply
195 replyMsgs.emplace_back(&reply, sizeof(reply));
196 replyMsgs.emplace_back(replyBody.c_str(), replyBody.size());
197
198 return reply == ZMQ_EVENT_HANDSHAKE_SUCCEEDED;
199}
std::vector< std::pair< std::string, std::shared_ptr< std::atomic_flag > > > vCheckFlag
Global variable to check if the servers are running.
constexpr size_t constHasher(const char *s)
Definition Hasher.hpp:13
constexpr uint32_t PING_PONG_ID
constexpr uint32_t VERSION_INFO_ID
constexpr uint32_t LOG_LEVEL_ID
constexpr uint32_t STATUS_CHECK_ID
bool ZeroMQServerMessageCallback(const std::vector< zmq::message_t > &recvMsgs, std::vector< zmq::message_t > &replyMsgs)
void startMonitoring(zmq::socket_t *socket, const std::string &monitorAddress)
std::atomic_flag _shouldStop
void update()
Processes new messages.
std::unique_ptr< ZeroMQStats > _stats
std::shared_ptr< std::atomic_flag > _checkFlag
void threadFunc() noexcept
Main thread function.
FPTR_MessageCallback messageCallback() const
ZeroMQServer(const std::string &hostAddr, std::shared_ptr< std::atomic_flag > checkFlag, const std::shared_ptr< prometheus::Registry > &reg=nullptr, const std::string &prependName="")
std::unique_ptr< std::thread > _serverThread
void shutdown()
Closes the ZeroMQ Server.
void stop()
Definition ZeroMQ.cpp:63
std::vector< zmq::message_t > recvMessages()
Definition ZeroMQ.cpp:81
const std::unique_ptr< zmq::socket_t > & getSocket() const
Definition ZeroMQ.hpp:90
size_t sendMessages(std::vector< zmq::message_t > &msg)
Definition ZeroMQ.cpp:96
bool start()
Definition ZeroMQ.cpp:43
bool isSuccessful
Indicates if processing was successful for this connection.
std::chrono::high_resolution_clock::time_point processingTimeEnd
Processing time end.
std::chrono::high_resolution_clock::time_point processingTimeStart
Processing time start.