Repo-Init
 
Loading...
Searching...
No Matches
ZeroMQServer Class Reference

#include <ZeroMQServer.hpp>

Inheritance diagram for ZeroMQServer:
Collaboration diagram for ZeroMQServer:

Public Member Functions

 ZeroMQServer (const std::string &hostAddr, std::shared_ptr< std::atomic_flag > checkFlag, const std::shared_ptr< prometheus::Registry > &reg=nullptr, const std::string &prependName="")
 
 ZeroMQServer (const ZeroMQServer &)=delete
 Copy constructor.
 
 ZeroMQServer (ZeroMQServer &&)=delete
 Move constructor.
 
ZeroMQServeroperator= (ZeroMQServer)=delete
 Copy assignment operator.
 
ZeroMQServeroperator= (ZeroMQServer &&)=delete
 Move assignment operator.
 
bool initialise ()
 
void shutdown ()
 Closes the ZeroMQ Server.
 
 ~ZeroMQServer () override
 
void messageCallback (FPTR_MessageCallback func)
 
FPTR_MessageCallback messageCallback () const
 

Private Member Functions

void update ()
 Processes new messages.
 
void threadFunc (const std::stop_token &stopToken) noexcept
 Main thread function.
 
- Private Member Functions inherited from ZeroMQ
 ZeroMQ (const zmq::socket_type &type, const std::string &addr, bool isBind)
 
 ZeroMQ (const std::shared_ptr< zmq::context_t > &ctx, const zmq::socket_type &type, const std::string &addr, bool isBind)
 
 ZeroMQ (const ZeroMQ &)=delete
 Copy constructor.
 
 ZeroMQ (ZeroMQ &&)=delete
 Move constructor.
 
ZeroMQoperator= (ZeroMQ)=delete
 Copy assignment operator.
 
ZeroMQoperator= (ZeroMQ &&)=delete
 Move assignment operator.
 
bool start ()
 
void stop ()
 
std::vector< zmq::message_t > recvMessages ()
 
size_t sendMessages (std::vector< zmq::message_t > &msg)
 
const std::unique_ptr< zmq::socket_t > & getSocket () const
 
const std::shared_ptr< zmq::context_t > & getContext () const
 
const std::string & getAddress () const
 
 ~ZeroMQ ()
 
- Private Member Functions inherited from ZeroMQMonitor
 ZeroMQMonitor ()=default
 Constructor.
 
 ZeroMQMonitor (const ZeroMQMonitor &)=delete
 Copy constructor.
 
 ZeroMQMonitor (ZeroMQMonitor &&)=delete
 Move constructor.
 
ZeroMQMonitoroperator= (ZeroMQMonitor)=delete
 Copy assignment operator.
 
ZeroMQMonitoroperator= (ZeroMQMonitor &&)=delete
 Move assignment operator.
 
void startMonitoring (zmq::socket_t *socket, const std::string &monitorAddress)
 
int getPeerCount () const
 
void stopMonitoring ()
 
void testInternals ()
 
 ~ZeroMQMonitor () override
 

Private Attributes

std::shared_ptr< std::atomic_flag > _checkFlag
 
std::unique_ptr< ZeroMQStats_stats
 
FPTR_MessageCallback _m_messageCallback
 
std::unique_ptr< std::jthread > _serverThread
 

Detailed Description

Definition at line 11 of file ZeroMQServer.hpp.

Constructor & Destructor Documentation

◆ ZeroMQServer() [1/3]

ZeroMQServer::ZeroMQServer ( const std::string & hostAddr,
std::shared_ptr< std::atomic_flag > checkFlag,
const std::shared_ptr< prometheus::Registry > & reg = nullptr,
const std::string & prependName = "" )

Constructor for server

Parameters
[in]hostAddrHost address to connect. Can be anything supported by ZeroMQ reply socket
[in]checkFlagFlag to check if the server is running
[in]regPrometheus registry for stats
[in]prependNamePrefix for Prometheus stats

Definition at line 73 of file ZeroMQServer.cpp.

75 : ZeroMQ(zmq::socket_type::rep, hostAddr, true), _checkFlag(std::move(checkFlag))
76{
77 if (reg)
78 {
79 _stats = std::make_unique<ZeroMQStats>(reg, prependName);
80 }
81
82 startMonitoring(getSocket().get(), std::format("{}{}{}", "inproc://", constHasher(hostAddr.c_str()), ".rep"));
83}
constexpr size_t constHasher(const char *s)
Definition Hasher.hpp:13
void startMonitoring(zmq::socket_t *socket, const std::string &monitorAddress)
std::unique_ptr< ZeroMQStats > _stats
std::shared_ptr< std::atomic_flag > _checkFlag
const std::unique_ptr< zmq::socket_t > & getSocket() const
Definition ZeroMQ.hpp:90
ZeroMQ(const zmq::socket_type &type, const std::string &addr, bool isBind)
Definition ZeroMQ.cpp:32
Here is the call graph for this function:
Here is the caller graph for this function:

◆ ZeroMQServer() [2/3]

ZeroMQServer::ZeroMQServer ( const ZeroMQServer & )
delete

Copy constructor.

Here is the call graph for this function:

◆ ZeroMQServer() [3/3]

ZeroMQServer::ZeroMQServer ( ZeroMQServer && )
delete

Move constructor.

Here is the call graph for this function:

◆ ~ZeroMQServer()

ZeroMQServer::~ZeroMQServer ( )
inlineoverride

Deconstructor for server

Definition at line 64 of file ZeroMQServer.hpp.

64{ shutdown(); }
void shutdown()
Closes the ZeroMQ Server.
Here is the call graph for this function:

Member Function Documentation

◆ initialise()

bool ZeroMQServer::initialise ( )

Initializes a new ZeroMQ server

Returns
true If initialized
false otherwise

Definition at line 85 of file ZeroMQServer.cpp.

86{
87 if (start())
88 {
89 _serverThread = std::make_unique<std::jthread>([this](const std::stop_token &sToken) { threadFunc(sToken); });
90 return true;
91 }
92 return false;
93}
void threadFunc(const std::stop_token &stopToken) noexcept
Main thread function.
std::unique_ptr< std::jthread > _serverThread
bool start()
Definition ZeroMQ.cpp:43
Here is the call graph for this function:

◆ messageCallback() [1/2]

FPTR_MessageCallback ZeroMQServer::messageCallback ( ) const
inlinenodiscard

Gets the message callback function

Returns
The message callback function

Definition at line 76 of file ZeroMQServer.hpp.

76{ return _m_messageCallback; }
FPTR_MessageCallback _m_messageCallback
Here is the caller graph for this function:

◆ messageCallback() [2/2]

void ZeroMQServer::messageCallback ( FPTR_MessageCallback func)
inline

Sets the message callback function

Parameters
[in]funcThe message callback function to be set

Definition at line 70 of file ZeroMQServer.hpp.

70{ _m_messageCallback = std::move(func); }

◆ operator=() [1/2]

ZeroMQServer & ZeroMQServer::operator= ( ZeroMQServer && )
delete

Move assignment operator.

Here is the call graph for this function:

◆ operator=() [2/2]

ZeroMQServer & ZeroMQServer::operator= ( ZeroMQServer )
delete

Copy assignment operator.

Here is the call graph for this function:

◆ shutdown()

void ZeroMQServer::shutdown ( )

Closes the ZeroMQ Server.

Definition at line 95 of file ZeroMQServer.cpp.

96{
97 if (_serverThread)
98 {
99 _serverThread.reset();
100 }
101
102 stop();
103}
void stop()
Definition ZeroMQ.cpp:63
Here is the call graph for this function:
Here is the caller graph for this function:

◆ threadFunc()

void ZeroMQServer::threadFunc ( const std::stop_token & stopToken)
privatenoexcept

Main thread function.

Definition at line 52 of file ZeroMQServer.cpp.

53{
54 spdlog::info("ZeroMQ server started");
55 while (!stopToken.stop_requested())
56 {
57 try
58 {
59 update();
60 if (_checkFlag)
61 {
62 _checkFlag->test_and_set();
63 }
64 }
65 catch (const std::exception &e)
66 {
67 spdlog::error("ZeroMQ server failed: {}", e.what());
68 }
69 }
70 spdlog::info("ZeroMQ server stopped");
71}
void update()
Processes new messages.
Here is the call graph for this function:
Here is the caller graph for this function:

◆ update()

void ZeroMQServer::update ( )
private

Processes new messages.

Definition at line 27 of file ZeroMQServer.cpp.

28{
29 auto recvMsgs = recvMessages();
30
31 if (!recvMsgs.empty())
32 {
33 std::vector<zmq::message_t> replyMsgs;
34
35 ZeroMQServerStats serverStats;
36 serverStats.processingTimeStart = std::chrono::high_resolution_clock::now();
37 serverStats.isSuccessful = messageCallback() && messageCallback()(recvMsgs, replyMsgs);
38
39 if (size_t nSentMsg = sendMessages(replyMsgs); nSentMsg != replyMsgs.size())
40 {
41 spdlog::warn("Can't send whole reply: Sent messages {} / {}", nSentMsg, replyMsgs.size());
42 }
43 serverStats.processingTimeEnd = std::chrono::high_resolution_clock::now();
44
45 if (_stats)
46 {
47 _stats->consumeStats(recvMsgs, replyMsgs, serverStats);
48 }
49 }
50}
FPTR_MessageCallback messageCallback() const
std::vector< zmq::message_t > recvMessages()
Definition ZeroMQ.cpp:81
size_t sendMessages(std::vector< zmq::message_t > &msg)
Definition ZeroMQ.cpp:96
bool isSuccessful
Indicates if processing was successful for this connection.
std::chrono::high_resolution_clock::time_point processingTimeEnd
Processing time end.
std::chrono::high_resolution_clock::time_point processingTimeStart
Processing time start.
Here is the call graph for this function:
Here is the caller graph for this function:

Member Data Documentation

◆ _checkFlag

std::shared_ptr<std::atomic_flag> ZeroMQServer::_checkFlag
private

Definition at line 14 of file ZeroMQServer.hpp.

◆ _m_messageCallback

FPTR_MessageCallback ZeroMQServer::_m_messageCallback
private

Definition at line 18 of file ZeroMQServer.hpp.

◆ _serverThread

std::unique_ptr<std::jthread> ZeroMQServer::_serverThread
private

Definition at line 20 of file ZeroMQServer.hpp.

◆ _stats

std::unique_ptr<ZeroMQStats> ZeroMQServer::_stats
private

Definition at line 16 of file ZeroMQServer.hpp.


The documentation for this class was generated from the following files: