#include <ZeroMQServer.hpp>
Definition at line 11 of file ZeroMQServer.hpp.
◆ ZeroMQServer() [1/3]
| ZeroMQServer::ZeroMQServer |
( |
const std::string & | hostAddr, |
|
|
std::shared_ptr< std::atomic_flag > | checkFlag, |
|
|
const std::shared_ptr< prometheus::Registry > & | reg = nullptr, |
|
|
const std::string & | prependName = "" ) |
Constructor for server
- Parameters
-
| [in] | hostAddr | Host address to bind to. Can be any endpoint supported by a ZeroMQ reply (REP) socket |
| [in] | checkFlag | Flag to check if the server is running |
| [in] | reg | Prometheus registry for stats |
| [in] | prependName | Prefix for Prometheus stats |
Definition at line 73 of file ZeroMQServer.cpp.
75 :
ZeroMQ(zmq::socket_type::rep, hostAddr,
true),
_checkFlag(std::move(checkFlag))
76{
77 if (reg)
78 {
79 _stats = std::make_unique<ZeroMQStats>(reg, prependName);
80 }
81
83}
constexpr size_t constHasher(const char *s)
void startMonitoring(zmq::socket_t *socket, const std::string &monitorAddress)
std::unique_ptr< ZeroMQStats > _stats
std::shared_ptr< std::atomic_flag > _checkFlag
const std::unique_ptr< zmq::socket_t > & getSocket() const
ZeroMQ(const zmq::socket_type &type, const std::string &addr, bool isBind)
◆ ZeroMQServer() [2/3]
◆ ZeroMQServer() [3/3]
◆ ~ZeroMQServer()
| ZeroMQServer::~ZeroMQServer |
( |
| ) |
|
|
inline override |
Destructor for server
Definition at line 64 of file ZeroMQServer.hpp.
void shutdown()
Closes the ZeroMQ Server.
◆ initialise()
| bool ZeroMQServer::initialise |
( |
| ) |
|
Initializes a new ZeroMQ server
- Returns
- true if initialized
- false otherwise
Definition at line 85 of file ZeroMQServer.cpp.
86{
88 {
90 return true;
91 }
92 return false;
93}
void threadFunc(const std::stop_token &stopToken) noexcept
Main thread function.
std::unique_ptr< std::jthread > _serverThread
◆ messageCallback() [1/2]
Gets the message callback function
- Returns
- The message callback function
Definition at line 76 of file ZeroMQServer.hpp.
FPTR_MessageCallback _m_messageCallback
◆ messageCallback() [2/2]
Sets the message callback function
- Parameters
-
| [in] | func | The message callback function to be set |
Definition at line 70 of file ZeroMQServer.hpp.
◆ operator=() [1/2]
Move assignment operator.
◆ operator=() [2/2]
Copy assignment operator.
◆ shutdown()
| void ZeroMQServer::shutdown |
( |
| ) |
|
◆ threadFunc()
| void ZeroMQServer::threadFunc |
( |
const std::stop_token & | stopToken | ) |
|
|
private noexcept |
Main thread function.
Definition at line 52 of file ZeroMQServer.cpp.
53{
54 spdlog::info("ZeroMQ server started");
55 while (!stopToken.stop_requested())
56 {
57 try
58 {
61 {
63 }
64 }
65 catch (const std::exception &e)
66 {
67 spdlog::error("ZeroMQ server failed: {}", e.what());
68 }
69 }
70 spdlog::info("ZeroMQ server stopped");
71}
void update()
Processes new messages.
◆ update()
| void ZeroMQServer::update |
( |
| ) |
|
|
private |
Processes new messages.
Definition at line 27 of file ZeroMQServer.cpp.
28{
30
31 if (!recvMsgs.empty())
32 {
33 std::vector<zmq::message_t> replyMsgs;
34
35 ZeroMQServerStats serverStats;
38
39 if (
size_t nSentMsg =
sendMessages(replyMsgs); nSentMsg != replyMsgs.size())
40 {
41 spdlog::warn("Can't send whole reply: Sent messages {} / {}", nSentMsg, replyMsgs.size());
42 }
44
46 {
47 _stats->consumeStats(recvMsgs, replyMsgs, serverStats);
48 }
49 }
50}
FPTR_MessageCallback messageCallback() const
std::vector< zmq::message_t > recvMessages()
size_t sendMessages(std::vector< zmq::message_t > &msg)
bool isSuccessful
Indicates if processing was successful for this connection.
std::chrono::high_resolution_clock::time_point processingTimeEnd
Processing time end.
std::chrono::high_resolution_clock::time_point processingTimeStart
Processing time start.
◆ _checkFlag
| std::shared_ptr<std::atomic_flag> ZeroMQServer::_checkFlag |
|
private |
◆ _m_messageCallback
◆ _serverThread
| std::unique_ptr<std::jthread> ZeroMQServer::_serverThread |
|
private |
◆ _stats
The documentation for this class was generated from the following files: