9#include <spdlog/spdlog.h>
11constexpr uint32_t
LOG_LEVEL_ID = (
static_cast<uint32_t
>(
'L') | (
static_cast<uint32_t
>(
'O') << 8) |
12 (
static_cast<uint32_t
>(
'G') << 16) | (
static_cast<uint32_t
>(
'L') << 24));
13constexpr uint32_t
VERSION_INFO_ID = (
static_cast<uint32_t
>(
'V') | (
static_cast<uint32_t
>(
'E') << 8) |
14 (
static_cast<uint32_t
>(
'R') << 16) | (
static_cast<uint32_t
>(
'I') << 24));
15constexpr uint32_t
PING_PONG_ID = (
static_cast<uint32_t
>(
'P') | (
static_cast<uint32_t
>(
'I') << 8) |
16 (
static_cast<uint32_t
>(
'N') << 16) | (
static_cast<uint32_t
>(
'G') << 24));
17constexpr uint32_t
STATUS_CHECK_ID = (
static_cast<uint32_t
>(
'S') | (
static_cast<uint32_t
>(
'C') << 8) |
18 (
static_cast<uint32_t
>(
'H') << 16) | (
static_cast<uint32_t
>(
'K') << 24));
31 if (!recvMsgs.empty())
33 std::vector<zmq::message_t> replyMsgs;
39 if (
size_t nSentMsg =
sendMessages(replyMsgs); nSentMsg != replyMsgs.size())
41 spdlog::warn(
"Can't send whole reply: Sent messages {} / {}", nSentMsg, replyMsgs.size());
47 _stats->consumeStats(recvMsgs, replyMsgs, serverStats);
54 spdlog::info(
"ZeroMQ server started");
55 while (!stopToken.stop_requested())
65 catch (
const std::exception &e)
67 spdlog::error(
"ZeroMQ server failed: {}", e.what());
70 spdlog::info(
"ZeroMQ server stopped");
74 const std::shared_ptr<prometheus::Registry> ®,
const std::string &prependName)
75 :
ZeroMQ(zmq::socket_type::rep, hostAddr, true),
_checkFlag(std::move(checkFlag))
79 _stats = std::make_unique<ZeroMQStats>(reg, prependName);
107 spdlog::trace(
"Received {} messages", recvMsgs.size());
110 std::string replyBody;
111 int reply = ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL;
112 switch (*(
static_cast<const uint64_t *
>(recvMsgs[0].data())))
115 if (recvMsgs.size() != 2)
117 spdlog::error(
"Received unknown number of messages for log level change");
121 spdlog::warn(
"Log level change request received");
122 const auto receivedMsg = std::string(
static_cast<const char *
>(recvMsgs[1].data()), recvMsgs[1].size());
124 if (receivedMsg ==
"v")
126 spdlog::set_level(spdlog::level::info);
128 if (receivedMsg ==
"vv")
130 spdlog::set_level(spdlog::level::debug);
132 if (receivedMsg ==
"vvv")
134 spdlog::set_level(spdlog::level::trace);
136 reply = ZMQ_EVENT_HANDSHAKE_SUCCEEDED;
140 if (recvMsgs.size() != 1)
142 spdlog::error(
"Received unknown number of messages for version information");
146 reply = ZMQ_EVENT_HANDSHAKE_SUCCEEDED;
147 replyBody = PROJECT_FULL_VERSION_STRING;
151 if (recvMsgs.size() != 1)
153 spdlog::error(
"Received unknown number of messages for ping");
157 reply = ZMQ_EVENT_HANDSHAKE_SUCCEEDED;
162 if (recvMsgs.size() != 1)
164 spdlog::error(
"Received unknown number of messages for status check");
168 reply = ZMQ_EVENT_HANDSHAKE_SUCCEEDED;
170 std::ostringstream oss;
172 for (
const auto &[process, statusFlag] :
vCheckFlag)
174 oss <<
"\"" << process <<
"\":" << (statusFlag->_M_i ?
"1," :
"0,");
176 replyBody = oss.str();
177 replyBody.replace(replyBody.size() - 1, 1,
"}");
188 spdlog::error(
"Unknown command received from control");
193 replyMsgs.emplace_back(&reply,
sizeof(reply));
194 replyMsgs.emplace_back(replyBody.c_str(), replyBody.size());
196 return reply == ZMQ_EVENT_HANDSHAKE_SUCCEEDED;
std::vector< std::pair< std::string, std::shared_ptr< std::atomic_flag > > > vCheckFlag
Global variable to check if the servers are running.
constexpr size_t constHasher(const char *s)
constexpr uint32_t PING_PONG_ID
constexpr uint32_t VERSION_INFO_ID
constexpr uint32_t LOG_LEVEL_ID
constexpr uint32_t STATUS_CHECK_ID
bool ZeroMQServerMessageCallback(const std::vector< zmq::message_t > &recvMsgs, std::vector< zmq::message_t > &replyMsgs)
void startMonitoring(zmq::socket_t *socket, const std::string &monitorAddress)
void update()
Processes new messages.
std::unique_ptr< ZeroMQStats > _stats
std::shared_ptr< std::atomic_flag > _checkFlag
void threadFunc(const std::stop_token &stopToken) noexcept
Main thread function.
FPTR_MessageCallback messageCallback() const
std::unique_ptr< std::jthread > _serverThread
ZeroMQServer(const std::string &hostAddr, std::shared_ptr< std::atomic_flag > checkFlag, const std::shared_ptr< prometheus::Registry > ®=nullptr, const std::string &prependName="")
void shutdown()
Closes the ZeroMQ Server.
std::vector< zmq::message_t > recvMessages()
const std::unique_ptr< zmq::socket_t > & getSocket() const
size_t sendMessages(std::vector< zmq::message_t > &msg)
ZeroMQ(const zmq::socket_type &type, const std::string &addr, bool isBind)
bool isSuccessful
Indicates if processing was successful for this connection.
std::chrono::high_resolution_clock::time_point processingTimeEnd
Processing time end.
std::chrono::high_resolution_clock::time_point processingTimeStart
Processing time start.