Repo-Init
 
Loading...
Searching...
No Matches
ZeroMQServer.cpp
Go to the documentation of this file.
2
3#include "Version.h"
5#include "utils/Hasher.hpp"
6
7#include <spdlog/spdlog.h>
8
9constexpr uint32_t LOG_LEVEL_ID = ('L' | ('O' << 8) | ('G' << 16) | ('L' << 24));
10constexpr uint32_t VERSION_INFO_ID = ('V' | ('E' << 8) | ('R' << 16) | ('I' << 24));
11constexpr uint32_t PING_PONG_ID = ('P' | ('I' << 8) | ('N' << 16) | ('G' << 24));
12constexpr uint32_t STATUS_CHECK_ID = ('S' | ('C' << 8) | ('H' << 16) | ('K' << 24));
13/* ################################################################################### */
14/* ############################# MAKE MODIFICATIONS HERE ############################# */
15/* ################################################################################### */
16
17/* ################################################################################### */
18/* ################################ END MODIFICATIONS ################################ */
19/* ################################################################################### */
20
22{
23 auto recvMsgs = recvMessages();
24
25 if (!recvMsgs.empty())
26 {
27 std::vector<zmq::message_t> replyMsgs;
28
29 ZeroMQServerStats serverStats;
30 serverStats.processingTimeStart = std::chrono::high_resolution_clock::now();
31 serverStats.isSuccessful = messageCallback() && messageCallback()(recvMsgs, replyMsgs);
32
33 size_t nSentMsg = sendMessages(replyMsgs);
34 if (nSentMsg != replyMsgs.size())
35 {
36 spdlog::warn("Can't send whole reply: Sent messages {} / {}", nSentMsg, replyMsgs.size());
37 }
38 serverStats.processingTimeEnd = std::chrono::high_resolution_clock::now();
39
40 if (_stats)
41 {
42 _stats->consumeStats(recvMsgs, replyMsgs, serverStats);
43 }
44 }
45}
46
48{
49 spdlog::info("ZeroMQ server started");
50 while (!_shouldStop._M_i)
51 {
52 try
53 {
54 update();
55 if (_checkFlag)
56 {
57 _checkFlag->test_and_set();
58 }
59 }
60 catch (const std::exception &e)
61 {
62 spdlog::error("ZeroMQ server failed: {}", e.what());
63 }
64 }
65 spdlog::info("ZeroMQ server stopped");
66}
67
68ZeroMQServer::ZeroMQServer(const std::string &hostAddr, std::shared_ptr<std::atomic_flag> checkFlag,
69 const std::shared_ptr<prometheus::Registry> &reg, const std::string &prependName)
70 : ZeroMQ(zmq::socket_type::rep, hostAddr, true), _checkFlag(std::move(checkFlag))
71{
72 if (reg)
73 {
74 _stats = std::make_unique<ZeroMQStats>(reg, prependName);
75 }
76
77 startMonitoring(getSocket().get(), "inproc://" + std::to_string(constHasher(hostAddr.c_str())) + ".rep");
78}
79
81{
82 _shouldStop.clear();
83
84 if (start())
85 {
86 _serverThread = std::make_unique<std::thread>(&ZeroMQServer::threadFunc, this);
87 return true;
88 }
89 return false;
90}
91
93{
94 _shouldStop.test_and_set();
95 if (_serverThread && _serverThread->joinable())
96 {
97 _serverThread->join();
98 _serverThread.reset();
99 }
100
101 stop();
102}
103
104bool ZeroMQServerMessageCallback(const std::vector<zmq::message_t> &recvMsgs, std::vector<zmq::message_t> &replyMsgs)
105{
106 spdlog::trace("Received {} messages", recvMsgs.size());
107 replyMsgs.clear();
108
109 std::string replyBody;
110 int reply = ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL;
111 switch (*(static_cast<const uint64_t *>(recvMsgs[0].data())))
112 {
113 case LOG_LEVEL_ID: {
114 if (recvMsgs.size() != 2)
115 {
116 spdlog::error("Received unknown number of messages for log level change");
117 break;
118 }
119
120 spdlog::warn("Log level change request received");
121 const auto receivedMsg = std::string(static_cast<const char *>(recvMsgs[1].data()), recvMsgs[1].size());
122
123 if (receivedMsg == "v")
124 {
125 spdlog::set_level(spdlog::level::info);
126 }
127 if (receivedMsg == "vv")
128 {
129 spdlog::set_level(spdlog::level::debug);
130 }
131 if (receivedMsg == "vvv")
132 {
133 spdlog::set_level(spdlog::level::trace);
134 }
135 reply = ZMQ_EVENT_HANDSHAKE_SUCCEEDED;
136 break;
137 }
138 case VERSION_INFO_ID: {
139 if (recvMsgs.size() != 1)
140 {
141 spdlog::error("Received unknown number of messages for version information");
142 break;
143 }
144
145 reply = ZMQ_EVENT_HANDSHAKE_SUCCEEDED;
146 replyBody = PROJECT_FULL_VERSION_STRING;
147 break;
148 }
149 case PING_PONG_ID: {
150 if (recvMsgs.size() != 1)
151 {
152 spdlog::error("Received unknown number of messages for ping");
153 break;
154 }
155
156 reply = ZMQ_EVENT_HANDSHAKE_SUCCEEDED;
157 replyBody = "PONG";
158 break;
159 }
160 case STATUS_CHECK_ID: {
161 if (recvMsgs.size() != 1)
162 {
163 spdlog::error("Received unknown number of messages for status check");
164 break;
165 }
166
167 reply = ZMQ_EVENT_HANDSHAKE_SUCCEEDED;
168
169 std::ostringstream oss;
170 oss << "{";
171 for (const auto &entry : vCheckFlag)
172 {
173 oss << "\"" << entry.first << "\":" << (entry.second->_M_i ? "1," : "0,");
174 }
175 replyBody = oss.str();
176 replyBody.replace(replyBody.size() - 1, 1, "}");
177 break;
178 }
179 /* ################################################################################### */
180 /* ############################# MAKE MODIFICATIONS HERE ############################# */
181 /* ################################################################################### */
182
183 /* ################################################################################### */
184 /* ################################ END MODIFICATIONS ################################ */
185 /* ################################################################################### */
186 default:
187 spdlog::error("Unknown command received from control");
188 break;
189 }
190
191 // Prepare reply
192 replyMsgs.emplace_back(&reply, sizeof(reply));
193 replyMsgs.emplace_back(replyBody.c_str(), replyBody.size());
194
195 return reply == ZMQ_EVENT_HANDSHAKE_SUCCEEDED;
196}
std::vector< std::pair< std::string, std::shared_ptr< std::atomic_flag > > > vCheckFlag
Global variable to check if the servers are running.
constexpr size_t constHasher(const char *s)
Definition Hasher.hpp:13
constexpr uint32_t PING_PONG_ID
constexpr uint32_t VERSION_INFO_ID
constexpr uint32_t LOG_LEVEL_ID
constexpr uint32_t STATUS_CHECK_ID
bool ZeroMQServerMessageCallback(const std::vector< zmq::message_t > &recvMsgs, std::vector< zmq::message_t > &replyMsgs)
void startMonitoring(zmq::socket_t *socket, const std::string &monitorAddress)
std::atomic_flag _shouldStop
void update()
Processes new messages.
std::unique_ptr< ZeroMQStats > _stats
std::shared_ptr< std::atomic_flag > _checkFlag
void threadFunc() noexcept
Main thread function.
FPTR_MessageCallback messageCallback() const
ZeroMQServer(const std::string &hostAddr, std::shared_ptr< std::atomic_flag > checkFlag, const std::shared_ptr< prometheus::Registry > &reg=nullptr, const std::string &prependName="")
std::unique_ptr< std::thread > _serverThread
void shutdown()
Closes the ZeroMQ Server.
void stop()
Definition ZeroMQ.cpp:63
std::vector< zmq::message_t > recvMessages()
Definition ZeroMQ.cpp:81
const std::unique_ptr< zmq::socket_t > & getSocket() const
Definition ZeroMQ.hpp:90
size_t sendMessages(std::vector< zmq::message_t > &msg)
Definition ZeroMQ.cpp:96
bool start()
Definition ZeroMQ.cpp:43
bool isSuccessful
Indicates if processing was successful for this connection.
std::chrono::high_resolution_clock::time_point processingTimeEnd
Processing time end.
std::chrono::high_resolution_clock::time_point processingTimeStart
Processing time start.