Repo-Init
 
Loading...
Searching...
No Matches
ZeroMQMonitor.cpp
Go to the documentation of this file.
2
3#include <spdlog/spdlog.h>
4
5constexpr int EVENT_CHECK_TIMEOUT_MS = 100;
6
8{
9 while (!_shouldStop._M_i)
10 {
11 check_event(EVENT_CHECK_TIMEOUT_MS);
12 }
13}
14
15void ZeroMQMonitor::on_event(const std::string &messageStr, int level, const char *addr)
16{
17 switch (level)
18 {
19 case spdlog::level::debug:
20 spdlog::debug("{} {}", messageStr, addr == nullptr ? "" : addr);
21 break;
22 case spdlog::level::info:
23 spdlog::info("{} {}", messageStr, addr == nullptr ? "" : addr);
24 break;
25 case spdlog::level::warn:
26 spdlog::warn("{} {}", messageStr, addr == nullptr ? "" : addr);
27 break;
28 default:
29 break;
30 }
31}
32
33void ZeroMQMonitor::on_monitor_started() { on_event("Monitor started", spdlog::level::info); }
34
35void ZeroMQMonitor::on_event_connected(const zmq_event_t & /*unused*/, const char *addr_)
36{
37 _peerCount.fetch_add(1);
38 on_event("Connected", spdlog::level::info, addr_);
39}
40
41void ZeroMQMonitor::on_event_connect_delayed(const zmq_event_t & /*unused*/, const char *addr_)
42{
43 on_event("Connect delayed", spdlog::level::debug, addr_);
44}
45
46void ZeroMQMonitor::on_event_connect_retried(const zmq_event_t & /*unused*/, const char *addr_)
47{
48 on_event("Connect retried", spdlog::level::debug, addr_);
49}
50
51void ZeroMQMonitor::on_event_listening(const zmq_event_t & /*unused*/, const char *addr_)
52{
53 on_event("Listening", spdlog::level::debug, addr_);
54}
55
56void ZeroMQMonitor::on_event_bind_failed(const zmq_event_t & /*unused*/, const char *addr_)
57{
58 on_event("Bind failed", spdlog::level::warn, addr_);
59}
60
61void ZeroMQMonitor::on_event_accepted(const zmq_event_t & /*unused*/, const char *addr_)
62{
63 on_event("Accepted", spdlog::level::info, addr_);
64}
65
66void ZeroMQMonitor::on_event_accept_failed(const zmq_event_t & /*unused*/, const char *addr_)
67{
68 on_event("Accept failed", spdlog::level::warn, addr_);
69}
70
71void ZeroMQMonitor::on_event_closed(const zmq_event_t & /*unused*/, const char *addr_)
72{
73 on_event("Closed", spdlog::level::debug, addr_);
74}
75
76void ZeroMQMonitor::on_event_close_failed(const zmq_event_t & /*unused*/, const char *addr_)
77{
78 on_event("Close failed", spdlog::level::warn, addr_);
79}
80
81void ZeroMQMonitor::on_event_disconnected(const zmq_event_t & /*unused*/, const char *addr_)
82{
83 _peerCount.fetch_sub(1);
84 on_event("Disconnected", spdlog::level::info, addr_);
85}
86
87#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0) || \
88 (defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3))
89void ZeroMQMonitor::on_event_handshake_failed_no_detail(const zmq_event_t & /*unused*/, const char *addr_)
90{
91 on_event("Handshake failed (no detail)", spdlog::level::warn, addr_);
92}
93
94void ZeroMQMonitor::on_event_handshake_failed_protocol(const zmq_event_t & /*unused*/, const char *addr_)
95{
96 on_event("Handshake failed (protocol)", spdlog::level::warn, addr_);
97}
98
99void ZeroMQMonitor::on_event_handshake_failed_auth(const zmq_event_t & /*unused*/, const char *addr_)
100{
101 on_event("Handshake failed (auth)", spdlog::level::warn, addr_);
102}
103
104void ZeroMQMonitor::on_event_handshake_succeeded(const zmq_event_t & /*unused*/, const char *addr_)
105{
106 on_event("Handshake succeeded", spdlog::level::info, addr_);
107}
108
109#elif defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1)
110void ZeroMQMonitor::on_event_handshake_failed(const zmq_event_t & /*unused*/, const char *addr_)
111{
112 on_event("Handshake failed", spdlog::level::warn, addr_);
113}
114
115void ZeroMQMonitor::on_event_handshake_succeed(const zmq_event_t & /*unused*/, const char *addr_)
116{
117 on_event("Handshake succeed", spdlog::level::info, addr_);
118}
119#endif
120
121void ZeroMQMonitor::on_event_unknown(const zmq_event_t & /*unused*/, const char *addr_)
122{
123 on_event("Unknown event", spdlog::level::warn, addr_);
124}
125
126void ZeroMQMonitor::startMonitoring(zmq::socket_t *socket, const std::string &monitorAddress)
127{
128 if (socket == nullptr)
129 {
130 throw std::invalid_argument("ZeroMQ socket to monitor is nullptr");
131 }
132
133 init(*socket, monitorAddress);
134 _monitorThread = std::make_unique<std::thread>(&ZeroMQMonitor::threadFunc, this);
135}
136
138{
139 const std::vector<std::function<void()>> testFunctions = {
140 [this]() { on_event_connected(zmq_event_t{}, "test_address"); },
141 [this]() { on_event_connect_delayed(zmq_event_t{}, "test_address"); },
142 [this]() { on_event_connect_retried(zmq_event_t{}, "test_address"); },
143 [this]() { on_event_listening(zmq_event_t{}, "test_address"); },
144 [this]() { on_event_bind_failed(zmq_event_t{}, "test_address"); },
145 [this]() { on_event_accepted(zmq_event_t{}, "test_address"); },
146 [this]() { on_event_accept_failed(zmq_event_t{}, "test_address"); },
147 [this]() { on_event_closed(zmq_event_t{}, "test_address"); },
148 [this]() { on_event_close_failed(zmq_event_t{}, "test_address"); },
149 [this]() { on_event_disconnected(zmq_event_t{}, "test_address"); },
150#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0) || \
151 (defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3))
152 [this]() { on_event_handshake_failed_no_detail(zmq_event_t{}, "test_address"); },
153 [this]() { on_event_handshake_failed_protocol(zmq_event_t{}, "test_address"); },
154 [this]() { on_event_handshake_failed_auth(zmq_event_t{}, "test_address"); },
155 [this]() { on_event_handshake_succeeded(zmq_event_t{}, "test_address"); },
156#elif defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1)
157 [this]() { on_event_handshake_failed(zmq_event_t{}, "test_address"); },
158 [this]() { on_event_handshake_succeed(zmq_event_t{}, "test_address"); },
159#endif
160 [this]() { on_event_unknown(zmq_event_t{}, "test_address"); }};
161
162 for (const auto &testFunc : testFunctions)
163 {
164 testFunc();
165 }
166}
167
169{
170 _shouldStop.test_and_set();
171#ifdef ZMQ_EVENT_MONITOR_STOPPED
172 abort();
173#endif
174
175 // Join the thread
176 if (_monitorThread && _monitorThread->joinable())
177 {
178 _monitorThread->join();
179 _monitorThread.reset();
180 }
181
182 spdlog::info("Monitor stopped");
183}
constexpr int EVENT_CHECK_TIMEOUT_MS
void on_event_unknown(const zmq_event_t &, const char *addr_) override
void on_event_connect_retried(const zmq_event_t &, const char *addr_) override
void on_event_disconnected(const zmq_event_t &, const char *addr_) override
void on_event_bind_failed(const zmq_event_t &, const char *addr_) override
void on_event_connect_delayed(const zmq_event_t &, const char *addr_) override
void on_event_listening(const zmq_event_t &, const char *addr_) override
void on_event_closed(const zmq_event_t &, const char *addr_) override
void startMonitoring(zmq::socket_t *socket, const std::string &monitorAddress)
void on_event_connected(const zmq_event_t &, const char *addr_) override
void on_event_handshake_succeeded(const zmq_event_t &, const char *addr_) override
void on_event_accepted(const zmq_event_t &, const char *addr_) override
std::unique_ptr< std::thread > _monitorThread
std::atomic_int _peerCount
void on_event_handshake_failed_auth(const zmq_event_t &, const char *addr_) override
void on_event_accept_failed(const zmq_event_t &, const char *addr_) override
static void on_event(const std::string &messageStr, int level, const char *addr=nullptr)
void on_monitor_started() override
void on_event_handshake_failed_no_detail(const zmq_event_t &, const char *addr_) override
void on_event_handshake_failed_protocol(const zmq_event_t &, const char *addr_) override
std::atomic_flag _shouldStop
void on_event_close_failed(const zmq_event_t &, const char *addr_) override