Repo-Init
 
ZeroMQServer Class Reference

#include <ZeroMQServer.hpp>


Public Member Functions

 ZeroMQServer (const std::string &hostAddr, std::shared_ptr< std::atomic_flag > checkFlag, const std::shared_ptr< prometheus::Registry > &reg=nullptr, const std::string &prependName="")
 
 ZeroMQServer (const ZeroMQServer &)=delete
 Copy constructor.
 
 ZeroMQServer (ZeroMQServer &&)=delete
 Move constructor.
 
ZeroMQServer & operator= (ZeroMQServer)=delete
 Copy assignment operator.
 
ZeroMQServer & operator= (ZeroMQServer &&)=delete
 Move assignment operator.
 
bool initialise ()
 
void shutdown ()
 Closes the ZeroMQ Server.
 
 ~ZeroMQServer () override
 
void messageCallback (FPTR_MessageCallback func)
 
FPTR_MessageCallback messageCallback () const
 

Private Member Functions

void update ()
 Processes new messages.
 
void threadFunc () noexcept
 Main thread function.
 
- Private Member Functions inherited from ZeroMQ
 ZeroMQ (const zmq::socket_type &type, const std::string &addr, bool isBind)
 
 ZeroMQ (const std::shared_ptr< zmq::context_t > &ctx, const zmq::socket_type &type, const std::string &addr, bool isBind)
 
 ZeroMQ (const ZeroMQ &)=delete
 Copy constructor.
 
 ZeroMQ (ZeroMQ &&)=delete
 Move constructor.
 
ZeroMQ & operator= (ZeroMQ)=delete
 Copy assignment operator.
 
ZeroMQ & operator= (ZeroMQ &&)=delete
 Move assignment operator.
 
bool start ()
 
void stop ()
 
std::vector< zmq::message_t > recvMessages ()
 
size_t sendMessages (std::vector< zmq::message_t > &msg)
 
const std::unique_ptr< zmq::socket_t > & getSocket () const
 
const std::shared_ptr< zmq::context_t > & getContext () const
 
const std::string & getAddress () const
 
 ~ZeroMQ ()
 
- Private Member Functions inherited from ZeroMQMonitor
 ZeroMQMonitor ()=default
 Constructor.
 
 ZeroMQMonitor (const ZeroMQMonitor &)=delete
 Copy constructor.
 
 ZeroMQMonitor (ZeroMQMonitor &&)=delete
 Move constructor.
 
ZeroMQMonitor & operator= (ZeroMQMonitor)=delete
 Copy assignment operator.
 
ZeroMQMonitor & operator= (ZeroMQMonitor &&)=delete
 Move assignment operator.
 
void startMonitoring (zmq::socket_t *socket, const std::string &monitorAddress)
 
void stopMonitoring ()
 
 ~ZeroMQMonitor () override
 

Private Attributes

std::unique_ptr< std::thread > _serverThread
 
std::atomic_flag _shouldStop {false}
 
std::shared_ptr< std::atomic_flag > _checkFlag
 
std::unique_ptr< ZeroMQStats > _stats
 
FPTR_MessageCallback _m_messageCallback
 

Detailed Description

Definition at line 11 of file ZeroMQServer.hpp.

Constructor & Destructor Documentation

◆ ZeroMQServer() [1/3]

ZeroMQServer::ZeroMQServer ( const std::string & hostAddr,
std::shared_ptr< std::atomic_flag > checkFlag,
const std::shared_ptr< prometheus::Registry > & reg = nullptr,
const std::string & prependName = "" )

Constructor for server

Parameters
[in] hostAddr     Host address to bind to. Can be any endpoint supported by a ZeroMQ reply socket
[in] checkFlag    Flag to check if the server is running
[in] reg          Prometheus registry for stats
[in] prependName  Prefix for Prometheus stats

Definition at line 68 of file ZeroMQServer.cpp.

    : ZeroMQ(zmq::socket_type::rep, hostAddr, true), _checkFlag(std::move(checkFlag))
{
    if (reg)
    {
        _stats = std::make_unique<ZeroMQStats>(reg, prependName);
    }

    startMonitoring(getSocket().get(), "inproc://" + std::to_string(constHasher(hostAddr.c_str())) + ".rep");
}

Referenced symbols:
constexpr size_t constHasher(const char *s)
 Definition: Hasher.hpp:13
void startMonitoring(zmq::socket_t *socket, const std::string &monitorAddress)
std::unique_ptr< ZeroMQStats > _stats
std::shared_ptr< std::atomic_flag > _checkFlag
const std::unique_ptr< zmq::socket_t > & getSocket() const
 Definition: ZeroMQ.hpp:90
ZeroMQ(const zmq::socket_type &type, const std::string &addr, bool isBind)
 Definition: ZeroMQ.cpp:32
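
The constructor above names its monitor endpoint by hashing the host address, so each server gets its own inproc address. The following is a minimal sketch of that naming step only. The algorithm behind constHasher (Hasher.hpp) is not shown on this page, so sketchHash (64-bit FNV-1a) is a hypothetical stand-in, and monitorEndpoint is an illustrative helper name that does not exist in the project.

```cpp
#include <cstdint>
#include <string>

// Hypothetical stand-in for constHasher (the real algorithm is not shown here):
// 64-bit FNV-1a, usable at compile time when given a constant string.
constexpr std::uint64_t sketchHash(const char *s, std::uint64_t h = 14695981039346656037ULL)
{
    return (*s == '\0') ? h : sketchHash(s + 1, (h ^ static_cast<unsigned char>(*s)) * 1099511628211ULL);
}

// Illustrative helper: builds the endpoint in the same shape as the constructor,
// "inproc://" + decimal hash of the address + ".rep".
inline std::string monitorEndpoint(const std::string &hostAddr)
{
    return "inproc://" + std::to_string(sketchHash(hostAddr.c_str())) + ".rep";
}
```

With this scheme the same host address always maps to the same monitor endpoint, while addresses that hash differently get separate endpoints.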

◆ ZeroMQServer() [2/3]

ZeroMQServer::ZeroMQServer ( const ZeroMQServer & )
delete

Copy constructor.

◆ ZeroMQServer() [3/3]

ZeroMQServer::ZeroMQServer ( ZeroMQServer && )
delete

Move constructor.

◆ ~ZeroMQServer()

ZeroMQServer::~ZeroMQServer ( )
inline override

Destructor for the server

Definition at line 66 of file ZeroMQServer.hpp.

{ shutdown(); }

Referenced symbols:
void shutdown()
 Closes the ZeroMQ Server.

Member Function Documentation

◆ initialise()

bool ZeroMQServer::initialise ( )

Initializes a new ZeroMQ server

Returns
true if initialized
false otherwise

Definition at line 80 of file ZeroMQServer.cpp.

{
    _shouldStop.clear();

    if (start())
    {
        _serverThread = std::make_unique<std::thread>(&ZeroMQServer::threadFunc, this);
        return true;
    }
    return false;
}

Referenced symbols:
std::atomic_flag _shouldStop
void threadFunc() noexcept
 Main thread function.
std::unique_ptr< std::thread > _serverThread
bool start()
 Definition: ZeroMQ.cpp:43

◆ messageCallback() [1/2]

FPTR_MessageCallback ZeroMQServer::messageCallback ( ) const
inline nodiscard

Gets the message callback function

Returns
The message callback function

Definition at line 78 of file ZeroMQServer.hpp.

{ return _m_messageCallback; }

Referenced symbols:
FPTR_MessageCallback _m_messageCallback

◆ messageCallback() [2/2]

void ZeroMQServer::messageCallback ( FPTR_MessageCallback func)
inline

Sets the message callback function

Parameters
[in] func  The message callback function to be set

Definition at line 72 of file ZeroMQServer.hpp.

{ _m_messageCallback = std::move(func); }
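
The setter and getter pair can be illustrated with a small stand-alone sketch. The definition of FPTR_MessageCallback is not shown on this page. The sketch assumes it behaves like a std::function that receives the request frames, fills the reply frames, and returns true on success. SketchMessage, SketchCallback, SketchHandler and echoCallback are hypothetical names, and std::string stands in for zmq::message_t.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

using SketchMessage = std::string; // stand-in for zmq::message_t
// Assumed shape of FPTR_MessageCallback: fill the replies, return true on success.
using SketchCallback = std::function<bool(const std::vector<SketchMessage> &, std::vector<SketchMessage> &)>;

class SketchHandler
{
public:
    // Setter and getter overloads, mirroring messageCallback() on the server.
    void messageCallback(SketchCallback func) { _callback = std::move(func); }
    SketchCallback messageCallback() const { return _callback; }

    // An unset callback counts as a failed request and is never invoked.
    bool dispatch(const std::vector<SketchMessage> &recv, std::vector<SketchMessage> &reply) const
    {
        return _callback && _callback(recv, reply);
    }

private:
    SketchCallback _callback;
};

// Example callback: echoes every received frame back to the sender.
inline bool echoCallback(const std::vector<SketchMessage> &recv, std::vector<SketchMessage> &reply)
{
    reply.assign(recv.begin(), recv.end());
    return true;
}
```

Registering the callback before the server thread starts avoids replacing it while a request is being dispatched, since the member shown here is not synchronised.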

◆ operator=() [1/2]

ZeroMQServer & ZeroMQServer::operator= ( ZeroMQServer && )
delete

Move assignment operator.

◆ operator=() [2/2]

ZeroMQServer & ZeroMQServer::operator= ( ZeroMQServer )
delete

Copy assignment operator.

◆ shutdown()

void ZeroMQServer::shutdown ( )

Closes the ZeroMQ Server.

Definition at line 92 of file ZeroMQServer.cpp.

{
    _shouldStop.test_and_set();
    if (_serverThread && _serverThread->joinable())
    {
        _serverThread->join();
        _serverThread.reset();
    }

    stop();
}

Referenced symbols:
void stop()
 Definition: ZeroMQ.cpp:63
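
The initialise() and shutdown() pair follows a common start, request-stop, join pattern. Below is a minimal stand-alone sketch of that pattern using only the standard library. SketchWorker is a hypothetical class, not part of the project. It uses std::atomic<bool> for the stop request, and its initialise() refuses a second start, which is an addition of the sketch: as listed, the real initialise() has no such guard, so calling it twice without shutdown() would replace a joinable std::thread and terminate the program.

```cpp
#include <atomic>
#include <chrono>
#include <memory>
#include <thread>

// Illustrative worker (not the real ZeroMQServer) with the same start/stop shape.
class SketchWorker
{
public:
    // Resets the stop request, then launches the loop thread.
    // The "already running" guard is an addition of this sketch.
    bool initialise()
    {
        if (_thread) { return false; }
        _shouldStop.store(false);
        _thread.reset(new std::thread(&SketchWorker::threadFunc, this));
        return true;
    }

    // Requests stop, joins and releases the thread. Safe to call repeatedly.
    void shutdown()
    {
        _shouldStop.store(true);
        if (_thread && _thread->joinable())
        {
            _thread->join();
            _thread.reset();
        }
    }

    ~SketchWorker() { shutdown(); }

    unsigned long iterations() const { return _iterations.load(); }

private:
    void threadFunc() noexcept
    {
        while (!_shouldStop.load())
        {
            ++_iterations; // stand-in for update()
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

    std::unique_ptr<std::thread> _thread;
    std::atomic<bool> _shouldStop{false};
    std::atomic<unsigned long> _iterations{0};
};
```

Because shutdown() resets the thread pointer after joining, the destructor can call it unconditionally and the worker can be started again afterwards.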

◆ threadFunc()

void ZeroMQServer::threadFunc ( )
private noexcept

Main thread function.

Definition at line 47 of file ZeroMQServer.cpp.

{
    spdlog::info("ZeroMQ server started");
    // Note: _M_i is a libstdc++ internal member of std::atomic_flag;
    // std::atomic_flag::test() (C++20) is the portable way to read the flag.
    while (!_shouldStop._M_i)
    {
        try
        {
            update();
            if (_checkFlag)
            {
                _checkFlag->test_and_set();
            }
        }
        catch (const std::exception &e)
        {
            spdlog::error("ZeroMQ server failed: {}", e.what());
        }
    }
    spdlog::info("ZeroMQ server stopped");
}

Referenced symbols:
void update()
 Processes new messages.
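
The thread loop raises the shared check flag after every update() call, which lets another component observe that the server is still making progress. How callers consume the flag is not shown on this page. The following is a sketch of one possible watchdog-side helper, restricted to C++11 std::atomic_flag operations. consumeHeartbeat is a hypothetical name.

```cpp
#include <atomic>
#include <memory>

// Hypothetical watchdog helper: returns true if the flag was raised since the
// previous call, then lowers it so the next call only sees fresh activity.
inline bool consumeHeartbeat(const std::shared_ptr<std::atomic_flag> &flag)
{
    if (!flag) { return false; }
    // test_and_set() returns the previous state; it is the only C++11 way to read the flag.
    const bool wasSet = flag->test_and_set();
    // A heartbeat raised between the read above and this clear is lost, so the
    // polling period should be comfortably longer than one loop iteration.
    flag->clear();
    return wasSet;
}
```

Polling this helper at a fixed interval gives a simple liveness signal: a false result means the loop has not completed an iteration since the previous poll.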

◆ update()

void ZeroMQServer::update ( )
private

Processes new messages.

Definition at line 21 of file ZeroMQServer.cpp.

{
    auto recvMsgs = recvMessages();

    if (!recvMsgs.empty())
    {
        std::vector<zmq::message_t> replyMsgs;

        ZeroMQServerStats serverStats;
        serverStats.processingTimeStart = std::chrono::high_resolution_clock::now();
        serverStats.isSuccessful = messageCallback() && messageCallback()(recvMsgs, replyMsgs);

        size_t nSentMsg = sendMessages(replyMsgs);
        if (nSentMsg != replyMsgs.size())
        {
            spdlog::warn("Can't send whole reply: Sent messages {} / {}", nSentMsg, replyMsgs.size());
        }
        serverStats.processingTimeEnd = std::chrono::high_resolution_clock::now();

        if (_stats)
        {
            _stats->consumeStats(recvMsgs, replyMsgs, serverStats);
        }
    }
}

Referenced symbols:
FPTR_MessageCallback messageCallback() const
std::vector< zmq::message_t > recvMessages()
 Definition: ZeroMQ.cpp:81
size_t sendMessages(std::vector< zmq::message_t > &msg)
 Definition: ZeroMQ.cpp:96
bool isSuccessful
 Indicates if processing was successful for this connection.
std::chrono::high_resolution_clock::time_point processingTimeEnd
 Processing time end.
std::chrono::high_resolution_clock::time_point processingTimeStart
 Processing time start.
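
update() brackets the callback and the reply with two timestamps and records whether the callback succeeded. The sketch below isolates that bookkeeping. SketchStats and timeHandler are hypothetical names, and the handler is simplified to a function with no arguments. The sketch uses std::chrono::steady_clock rather than the high_resolution_clock of the listing, because steady_clock is guaranteed to be monotonic while high_resolution_clock may be an alias of system_clock. That is a substitution made for the example, not a description of the project.

```cpp
#include <chrono>
#include <functional>

// Illustrative stand-in for ZeroMQServerStats, with an added duration helper.
struct SketchStats
{
    std::chrono::steady_clock::time_point processingTimeStart;
    std::chrono::steady_clock::time_point processingTimeEnd;
    bool isSuccessful{false};

    // Elapsed processing time in milliseconds.
    double elapsedMs() const
    {
        return std::chrono::duration<double, std::milli>(processingTimeEnd - processingTimeStart).count();
    }
};

// Times a handler the way update() times the message callback:
// an empty handler is treated as a failure and is never invoked.
inline SketchStats timeHandler(const std::function<bool()> &handler)
{
    SketchStats stats;
    stats.processingTimeStart = std::chrono::steady_clock::now();
    stats.isSuccessful = handler && handler();
    stats.processingTimeEnd = std::chrono::steady_clock::now();
    return stats;
}
```

A collector in the role of ZeroMQStats could then feed elapsedMs() and isSuccessful into its metrics.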

Member Data Documentation

◆ _checkFlag

std::shared_ptr<std::atomic_flag> ZeroMQServer::_checkFlag
private

Definition at line 18 of file ZeroMQServer.hpp.

◆ _m_messageCallback

FPTR_MessageCallback ZeroMQServer::_m_messageCallback
private

Definition at line 22 of file ZeroMQServer.hpp.

◆ _serverThread

std::unique_ptr<std::thread> ZeroMQServer::_serverThread
private

Definition at line 14 of file ZeroMQServer.hpp.

◆ _shouldStop

std::atomic_flag ZeroMQServer::_shouldStop {false}
private

Definition at line 16 of file ZeroMQServer.hpp.

std::atomic_flag _shouldStop {false};

◆ _stats

std::unique_ptr<ZeroMQStats> ZeroMQServer::_stats
private

Definition at line 20 of file ZeroMQServer.hpp.


The documentation for this class was generated from the following files: