Repo-Init
 
All Classes Namespaces Files Functions Variables Typedefs Macros Pages
Loading...
Searching...
No Matches
ZeroMQMonitor.cpp
Go to the documentation of this file.
2
3#include <spdlog/spdlog.h>
4
5constexpr int EVENT_CHECK_TIMEOUT_MS = 100;
6
8{
9 while (!_shouldStop._M_i)
10 {
11 check_event(EVENT_CHECK_TIMEOUT_MS);
12 }
13}
14
15void ZeroMQMonitor::on_event(const std::string &messageStr, int level, const char *addr)
16{
17 switch (level)
18 {
19 case spdlog::level::trace:
20 spdlog::trace("{} {}", messageStr, addr == nullptr ? "" : addr);
21 break;
22 case spdlog::level::debug:
23 spdlog::debug("{} {}", messageStr, addr == nullptr ? "" : addr);
24 break;
25 case spdlog::level::info:
26 spdlog::info("{} {}", messageStr, addr == nullptr ? "" : addr);
27 break;
28 case spdlog::level::warn:
29 spdlog::warn("{} {}", messageStr, addr == nullptr ? "" : addr);
30 break;
31 case spdlog::level::err:
32 spdlog::error("{} {}", messageStr, addr == nullptr ? "" : addr);
33 break;
34 case spdlog::level::critical:
35 spdlog::critical("{} {}", messageStr, addr == nullptr ? "" : addr);
36 break;
37 default:
38 spdlog::warn("Unknown log level {} {} {}", messageStr, addr == nullptr ? "" : addr, level);
39 break;
40 }
41}
42
43void ZeroMQMonitor::on_monitor_started() { on_event("Monitor started", spdlog::level::info); }
44
45void ZeroMQMonitor::on_event_connected(const zmq_event_t & /*unused*/, const char *addr_)
46{
47 _peerCount.fetch_add(1);
48 on_event("Connected", spdlog::level::info, addr_);
49}
50
51void ZeroMQMonitor::on_event_connect_delayed(const zmq_event_t & /*unused*/, const char *addr_)
52{
53 on_event("Connect delayed", spdlog::level::debug, addr_);
54}
55
56void ZeroMQMonitor::on_event_connect_retried(const zmq_event_t & /*unused*/, const char *addr_)
57{
58 on_event("Connect retried", spdlog::level::debug, addr_);
59}
60
61void ZeroMQMonitor::on_event_listening(const zmq_event_t & /*unused*/, const char *addr_)
62{
63 on_event("Listening", spdlog::level::debug, addr_);
64}
65
66void ZeroMQMonitor::on_event_bind_failed(const zmq_event_t & /*unused*/, const char *addr_)
67{
68 on_event("Bind failed", spdlog::level::warn, addr_);
69}
70
71void ZeroMQMonitor::on_event_accepted(const zmq_event_t & /*unused*/, const char *addr_)
72{
73 on_event("Accepted", spdlog::level::info, addr_);
74}
75
76void ZeroMQMonitor::on_event_accept_failed(const zmq_event_t & /*unused*/, const char *addr_)
77{
78 on_event("Accept failed", spdlog::level::warn, addr_);
79}
80
81void ZeroMQMonitor::on_event_closed(const zmq_event_t & /*unused*/, const char *addr_)
82{
83 on_event("Closed", spdlog::level::debug, addr_);
84}
85
86void ZeroMQMonitor::on_event_close_failed(const zmq_event_t & /*unused*/, const char *addr_)
87{
88 on_event("Close failed", spdlog::level::warn, addr_);
89}
90
91void ZeroMQMonitor::on_event_disconnected(const zmq_event_t & /*unused*/, const char *addr_)
92{
93 _peerCount.fetch_sub(1);
94 on_event("Disconnected", spdlog::level::info, addr_);
95}
96
97#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0) || \
98 (defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3))
99void ZeroMQMonitor::on_event_handshake_failed_no_detail(const zmq_event_t & /*unused*/, const char *addr_)
100{
101 on_event("Handshake failed (no detail)", spdlog::level::warn, addr_);
102}
103
104void ZeroMQMonitor::on_event_handshake_failed_protocol(const zmq_event_t & /*unused*/, const char *addr_)
105{
106 on_event("Handshake failed (protocol)", spdlog::level::warn, addr_);
107}
108
109void ZeroMQMonitor::on_event_handshake_failed_auth(const zmq_event_t & /*unused*/, const char *addr_)
110{
111 on_event("Handshake failed (auth)", spdlog::level::warn, addr_);
112}
113
114void ZeroMQMonitor::on_event_handshake_succeeded(const zmq_event_t & /*unused*/, const char *addr_)
115{
116 on_event("Handshake succeeded", spdlog::level::info, addr_);
117}
118
119#elif defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1)
120void ZeroMQMonitor::on_event_handshake_failed(const zmq_event_t & /*unused*/, const char *addr_)
121{
122 on_event("Handshake failed", spdlog::level::warn, addr_);
123}
124
125void ZeroMQMonitor::on_event_handshake_succeed(const zmq_event_t & /*unused*/, const char *addr_)
126{
127 on_event("Handshake succeed", spdlog::level::info, addr_);
128}
129#endif
130
131void ZeroMQMonitor::on_event_unknown(const zmq_event_t & /*unused*/, const char *addr_)
132{
133 on_event("Unknown event", spdlog::level::warn, addr_);
134}
135
136void ZeroMQMonitor::startMonitoring(zmq::socket_t *socket, const std::string &monitorAddress)
137{
138 if (socket == nullptr)
139 {
140 throw std::invalid_argument("ZeroMQ socket to monitor is nullptr");
141 }
142
143 init(*socket, monitorAddress);
144 _monitorThread = std::make_unique<std::thread>(&ZeroMQMonitor::threadFunc, this);
145}
146
148{
149 _shouldStop.test_and_set();
150#ifdef ZMQ_EVENT_MONITOR_STOPPED
151 abort();
152#endif
153
154 // Join the thread
155 if (_monitorThread && _monitorThread->joinable())
156 {
157 _monitorThread->join();
158 _monitorThread.reset();
159 }
160
161 spdlog::info("Monitor stopped");
162}
constexpr int EVENT_CHECK_TIMEOUT_MS
void on_event_unknown(const zmq_event_t &, const char *addr_) override
void on_event_connect_retried(const zmq_event_t &, const char *addr_) override
void on_event_disconnected(const zmq_event_t &, const char *addr_) override
void on_event_bind_failed(const zmq_event_t &, const char *addr_) override
void on_event_connect_delayed(const zmq_event_t &, const char *addr_) override
void on_event_listening(const zmq_event_t &, const char *addr_) override
void on_event_closed(const zmq_event_t &, const char *addr_) override
void startMonitoring(zmq::socket_t *socket, const std::string &monitorAddress)
void on_event_connected(const zmq_event_t &, const char *addr_) override
void on_event_handshake_succeeded(const zmq_event_t &, const char *addr_) override
void on_event_accepted(const zmq_event_t &, const char *addr_) override
std::unique_ptr< std::thread > _monitorThread
std::atomic_int _peerCount
void on_event_handshake_failed_auth(const zmq_event_t &, const char *addr_) override
void on_event_accept_failed(const zmq_event_t &, const char *addr_) override
static void on_event(const std::string &messageStr, int level, const char *addr=nullptr)
void on_monitor_started() override
void on_event_handshake_failed_no_detail(const zmq_event_t &, const char *addr_) override
void on_event_handshake_failed_protocol(const zmq_event_t &, const char *addr_) override
std::atomic_flag _shouldStop
void on_event_close_failed(const zmq_event_t &, const char *addr_) override