Repo-Init
 
Loading...
Searching...
No Matches
ZeroMQ.cpp
Go to the documentation of this file.
1#include "zeromq/ZeroMQ.hpp"
2
3#include <iostream>
4#include <optional>
5
6#include <spdlog/spdlog.h>
7#include <zmq_addon.hpp>
8
9// ZeroMQ send/receive timeouts in milliseconds
10constexpr int ZEROMQ_MSG_TIMEOUT_MS = 1000;
11// ZeroMQ heartbeat timeout in milliseconds
12constexpr int ZEROMQ_HEARTBEAT_TIMEOUT_MS = 1000;
13
14void ZeroMQ::init(const std::shared_ptr<zmq::context_t> &ctx, const zmq::socket_type &type,
15 const std::string_view &addr, bool isBind)
16{
17 _contextPtr = ctx;
18 _socketAddr = addr;
19 _isBinded = isBind;
20 _isActive = false;
21
22 // Init ZMQ connection
23 _socketPtr = std::make_unique<zmq::socket_t>(*_contextPtr, type);
24 _socketPtr->set(zmq::sockopt::linger, 0);
25 _socketPtr->set(zmq::sockopt::sndtimeo, ZEROMQ_MSG_TIMEOUT_MS);
26 _socketPtr->set(zmq::sockopt::rcvtimeo, ZEROMQ_MSG_TIMEOUT_MS);
27 _socketPtr->set(zmq::sockopt::heartbeat_ivl, ZEROMQ_HEARTBEAT_TIMEOUT_MS);
28 _socketPtr->set(zmq::sockopt::heartbeat_ttl, ZEROMQ_HEARTBEAT_TIMEOUT_MS * 3);
29 _socketPtr->set(zmq::sockopt::heartbeat_timeout, ZEROMQ_HEARTBEAT_TIMEOUT_MS);
30}
31
32ZeroMQ::ZeroMQ(const zmq::socket_type &type, const std::string &addr, bool isBind)
33{
34 init(std::make_shared<zmq::context_t>(1), type, addr, isBind);
35}
36
37ZeroMQ::ZeroMQ(const std::shared_ptr<zmq::context_t> &ctx, const zmq::socket_type &type, const std::string &addr,
38 bool isBind)
39{
40 init(ctx, type, addr, isBind);
41}
42
44{
45 if (_isActive)
46 {
47 return false;
48 }
49
50 if (_isBinded)
51 {
53 }
54 else
55 {
56 _socketPtr->connect(_socketAddr);
57 }
58 _isActive = true;
59
60 return true;
61}
62
64{
65 if (!_isActive)
66 {
67 return;
68 }
69
70 if (_isBinded)
71 {
72 _socketPtr->unbind(_socketAddr);
73 }
74 else
75 {
76 _socketPtr->disconnect(_socketAddr);
77 }
78 _isActive = false;
79}
80
81std::vector<zmq::message_t> ZeroMQ::recvMessages()
82{
83 std::vector<zmq::message_t> recvMsgs;
84 if (!_isActive)
85 {
86 spdlog::warn("Connection needs to starting");
87 }
88 else
89 {
90 auto nMsgs = zmq::recv_multipart(*_socketPtr, std::back_inserter(recvMsgs));
91 spdlog::debug("Received {} messages", nMsgs.value_or(0));
92 }
93 return recvMsgs;
94}
95
96size_t ZeroMQ::sendMessages(std::vector<zmq::message_t> &msg)
97{
98 zmq::send_result_t res;
99 if (!_isActive)
100 {
101 spdlog::warn("Connection needs to starting");
102 }
103 else
104 {
105 res = zmq::send_multipart(*_socketPtr, msg);
106 }
107
108 return res.value_or(0);
109}
110
112{
113 try
114 {
115 stop();
116 }
117 catch (const std::exception &e)
118 {
119 try
120 {
121 spdlog::error("Error while stopping ZeroMQ connection {} ({})", _socketAddr, e.what());
122 }
123 catch (const std::exception &e2)
124 {
125 std::cerr << "Error while stopping ZeroMQ connection and logger for connection " << _socketAddr << " ("
126 << e.what() << ")" << '\n'
127 << e2.what() << '\n';
128 }
129 }
130}
constexpr int ZEROMQ_HEARTBEAT_TIMEOUT_MS
Definition ZeroMQ.cpp:12
constexpr int ZEROMQ_MSG_TIMEOUT_MS
Definition ZeroMQ.cpp:10
void stop()
Definition ZeroMQ.cpp:63
std::vector< zmq::message_t > recvMessages()
Definition ZeroMQ.cpp:81
void init(const std::shared_ptr< zmq::context_t > &ctx, const zmq::socket_type &type, const std::string_view &addr, bool isBind)
Definition ZeroMQ.cpp:14
size_t sendMessages(std::vector< zmq::message_t > &msg)
Definition ZeroMQ.cpp:96
std::string _socketAddr
Definition ZeroMQ.hpp:25
bool _isActive
Definition ZeroMQ.hpp:21
std::unique_ptr< zmq::socket_t > _socketPtr
Definition ZeroMQ.hpp:18
~ZeroMQ()
Definition ZeroMQ.cpp:111
bool _isBinded
Definition ZeroMQ.hpp:23
bool start()
Definition ZeroMQ.cpp:43
std::shared_ptr< zmq::context_t > _contextPtr
Definition ZeroMQ.hpp:16
ZeroMQ(const zmq::socket_type &type, const std::string &addr, bool isBind)
Definition ZeroMQ.cpp:32