Repo-Init
 
Loading...
Searching...
No Matches
ZeroMQMonitor.cpp
Go to the documentation of this file.
2
3#include <spdlog/spdlog.h>
4
/// Value handed to check_event() on every polling iteration of the monitor thread
/// (the name indicates a timeout in milliseconds).
constexpr int EVENT_CHECK_TIMEOUT_MS{100};
6
8{
9 while (!_shouldStop._M_i)
10 {
11 check_event(EVENT_CHECK_TIMEOUT_MS);
12 }
13}
14
15void ZeroMQMonitor::on_event(const std::string &messageStr, int level, const char *addr)
16{
17 switch (level)
18 {
19 case spdlog::level::trace:
20 spdlog::trace("{} {}", messageStr, addr == nullptr ? "" : addr);
21 break;
22 case spdlog::level::debug:
23 spdlog::debug("{} {}", messageStr, addr == nullptr ? "" : addr);
24 break;
25 case spdlog::level::info:
26 spdlog::info("{} {}", messageStr, addr == nullptr ? "" : addr);
27 break;
28 case spdlog::level::warn:
29 spdlog::warn("{} {}", messageStr, addr == nullptr ? "" : addr);
30 break;
31 case spdlog::level::err:
32 spdlog::error("{} {}", messageStr, addr == nullptr ? "" : addr);
33 break;
34 case spdlog::level::critical:
35 spdlog::critical("{} {}", messageStr, addr == nullptr ? "" : addr);
36 break;
37 default:
38 spdlog::warn("Unknown log level {} {} {}", messageStr, addr == nullptr ? "" : addr, level);
39 break;
40 }
41}
42
43void ZeroMQMonitor::on_monitor_started() { on_event("Monitor started", spdlog::level::info); }
44
45void ZeroMQMonitor::on_event_connected(const zmq_event_t & /*unused*/, const char *addr_)
46{
47 on_event("Connected", spdlog::level::info, addr_);
48}
49
50void ZeroMQMonitor::on_event_connect_delayed(const zmq_event_t & /*unused*/, const char *addr_)
51{
52 on_event("Connect delayed", spdlog::level::debug, addr_);
53}
54
55void ZeroMQMonitor::on_event_connect_retried(const zmq_event_t & /*unused*/, const char *addr_)
56{
57 on_event("Connect retried", spdlog::level::debug, addr_);
58}
59
60void ZeroMQMonitor::on_event_listening(const zmq_event_t & /*unused*/, const char *addr_)
61{
62 on_event("Listening", spdlog::level::debug, addr_);
63}
64
65void ZeroMQMonitor::on_event_bind_failed(const zmq_event_t & /*unused*/, const char *addr_)
66{
67 on_event("Bind failed", spdlog::level::warn, addr_);
68}
69
70void ZeroMQMonitor::on_event_accepted(const zmq_event_t & /*unused*/, const char *addr_)
71{
72 on_event("Accepted", spdlog::level::info, addr_);
73}
74
75void ZeroMQMonitor::on_event_accept_failed(const zmq_event_t & /*unused*/, const char *addr_)
76{
77 on_event("Accept failed", spdlog::level::warn, addr_);
78}
79
80void ZeroMQMonitor::on_event_closed(const zmq_event_t & /*unused*/, const char *addr_)
81{
82 on_event("Closed", spdlog::level::debug, addr_);
83}
84
85void ZeroMQMonitor::on_event_close_failed(const zmq_event_t & /*unused*/, const char *addr_)
86{
87 on_event("Close failed", spdlog::level::warn, addr_);
88}
89
90void ZeroMQMonitor::on_event_disconnected(const zmq_event_t & /*unused*/, const char *addr_)
91{
92 on_event("Disconnected", spdlog::level::info, addr_);
93}
94
95#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0) || \
96 (defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3))
97void ZeroMQMonitor::on_event_handshake_failed_no_detail(const zmq_event_t & /*unused*/, const char *addr_)
98{
99 on_event("Handshake failed (no detail)", spdlog::level::warn, addr_);
100}
101
102void ZeroMQMonitor::on_event_handshake_failed_protocol(const zmq_event_t & /*unused*/, const char *addr_)
103{
104 on_event("Handshake failed (protocol)", spdlog::level::warn, addr_);
105}
106
107void ZeroMQMonitor::on_event_handshake_failed_auth(const zmq_event_t & /*unused*/, const char *addr_)
108{
109 on_event("Handshake failed (auth)", spdlog::level::warn, addr_);
110}
111
112void ZeroMQMonitor::on_event_handshake_succeeded(const zmq_event_t & /*unused*/, const char *addr_)
113{
114 on_event("Handshake succeeded", spdlog::level::info, addr_);
115}
116
117#elif defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1)
118void ZeroMQMonitor::on_event_handshake_failed(const zmq_event_t & /*unused*/, const char *addr_)
119{
120 on_event("Handshake failed", spdlog::level::warn, addr_);
121}
122
123void ZeroMQMonitor::on_event_handshake_succeed(const zmq_event_t & /*unused*/, const char *addr_)
124{
125 on_event("Handshake succeed", spdlog::level::info, addr_);
126}
127#endif
128
129void ZeroMQMonitor::on_event_unknown(const zmq_event_t & /*unused*/, const char *addr_)
130{
131 on_event("Unknown event", spdlog::level::warn, addr_);
132}
133
134void ZeroMQMonitor::startMonitoring(zmq::socket_t *socket, const std::string &monitorAddress)
135{
136 if (socket == nullptr)
137 {
138 throw std::invalid_argument("ZeroMQ socket to monitor is nullptr");
139 }
140
141 init(*socket, monitorAddress);
142 _monitorThread = std::make_unique<std::thread>(&ZeroMQMonitor::threadFunc, this);
143}
144
146{
147 _shouldStop.test_and_set();
148#ifdef ZMQ_EVENT_MONITOR_STOPPED
149 abort();
150#endif
151
152 // Join the thread
153 if (_monitorThread && _monitorThread->joinable())
154 {
155 _monitorThread->join();
156 _monitorThread.reset();
157 }
158
159 spdlog::info("Monitor stopped");
160}
constexpr int EVENT_CHECK_TIMEOUT_MS
void on_event_unknown(const zmq_event_t &, const char *addr_) override
void on_event_connect_retried(const zmq_event_t &, const char *addr_) override
void on_event_disconnected(const zmq_event_t &, const char *addr_) override
void on_event_bind_failed(const zmq_event_t &, const char *addr_) override
void on_event_connect_delayed(const zmq_event_t &, const char *addr_) override
void on_event_listening(const zmq_event_t &, const char *addr_) override
void on_event_closed(const zmq_event_t &, const char *addr_) override
void startMonitoring(zmq::socket_t *socket, const std::string &monitorAddress)
void on_event_connected(const zmq_event_t &, const char *addr_) override
void on_event_handshake_succeeded(const zmq_event_t &, const char *addr_) override
void on_event_accepted(const zmq_event_t &, const char *addr_) override
std::unique_ptr< std::thread > _monitorThread
void on_event_handshake_failed_auth(const zmq_event_t &, const char *addr_) override
void on_event_accept_failed(const zmq_event_t &, const char *addr_) override
static void on_event(const std::string &messageStr, int level, const char *addr=nullptr)
void on_monitor_started() override
void on_event_handshake_failed_no_detail(const zmq_event_t &, const char *addr_) override
void on_event_handshake_failed_protocol(const zmq_event_t &, const char *addr_) override
std::atomic_flag _shouldStop
void on_event_close_failed(const zmq_event_t &, const char *addr_) override