Repo-Init
 
ZeroMQServer Class Reference

#include <ZeroMQServer.hpp>


Public Member Functions

 ZeroMQServer (const std::string &hostAddr, std::shared_ptr< std::atomic_flag > checkFlag, const std::shared_ptr< prometheus::Registry > &reg=nullptr, const std::string &prependName="")
 
 ZeroMQServer (const ZeroMQServer &)=delete
 Copy constructor.
 
 ZeroMQServer (ZeroMQServer &&)=delete
 Move constructor.
 
ZeroMQServer & operator= (ZeroMQServer)=delete
 Copy assignment operator.
 
ZeroMQServer & operator= (ZeroMQServer &&)=delete
 Move assignment operator.
 
bool initialise ()
 
void shutdown ()
 Closes the ZeroMQ Server.
 
 ~ZeroMQServer () override
 
void messageCallback (FPTR_MessageCallback func)
 
FPTR_MessageCallback messageCallback () const
 

Private Member Functions

void update ()
 Processes new messages.
 
void threadFunc () noexcept
 Main thread function.
 
- Private Member Functions inherited from ZeroMQ
 ZeroMQ (const zmq::socket_type &type, const std::string &addr, bool isBind)
 
 ZeroMQ (const std::shared_ptr< zmq::context_t > &ctx, const zmq::socket_type &type, const std::string &addr, bool isBind)
 
 ZeroMQ (const ZeroMQ &)=delete
 Copy constructor.
 
 ZeroMQ (ZeroMQ &&)=delete
 Move constructor.
 
ZeroMQ & operator= (ZeroMQ)=delete
 Copy assignment operator.
 
ZeroMQ & operator= (ZeroMQ &&)=delete
 Move assignment operator.
 
bool start ()
 
void stop ()
 
std::vector< zmq::message_t > recvMessages ()
 
size_t sendMessages (std::vector< zmq::message_t > &msg)
 
const std::unique_ptr< zmq::socket_t > & getSocket () const
 
const std::shared_ptr< zmq::context_t > & getContext () const
 
const std::string & getAddress () const
 
 ~ZeroMQ ()
 
- Private Member Functions inherited from ZeroMQMonitor
 ZeroMQMonitor ()=default
 Constructor.
 
 ZeroMQMonitor (const ZeroMQMonitor &)=delete
 Copy constructor.
 
 ZeroMQMonitor (ZeroMQMonitor &&)=delete
 Move constructor.
 
ZeroMQMonitor & operator= (ZeroMQMonitor)=delete
 Copy assignment operator.
 
ZeroMQMonitor & operator= (ZeroMQMonitor &&)=delete
 Move assignment operator.
 
void startMonitoring (zmq::socket_t *socket, const std::string &monitorAddress)
 
void stopMonitoring ()
 
 ~ZeroMQMonitor () override
 

Private Attributes

std::unique_ptr< std::thread > _serverThread
 
std::atomic_flag _shouldStop {false}
 
std::shared_ptr< std::atomic_flag > _checkFlag
 
std::unique_ptr< ZeroMQStats > _stats
 
FPTR_MessageCallback _m_messageCallback
 

Detailed Description

Definition at line 11 of file ZeroMQServer.hpp.

Constructor & Destructor Documentation

◆ ZeroMQServer() [1/3]

ZeroMQServer::ZeroMQServer ( const std::string & hostAddr,
std::shared_ptr< std::atomic_flag > checkFlag,
const std::shared_ptr< prometheus::Registry > & reg = nullptr,
const std::string & prependName = "" )

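Constructs the server. The underlying socket is a ZeroMQ reply (REP) socket opened in bind mode.

Parameters
[in] hostAddr     Host address to bind to. Can be any endpoint supported by a ZeroMQ reply socket
[in] checkFlag    Flag the server thread sets after each update cycle, so callers can check that the server is running
[in] reg          Prometheus registry for stats. Stats are collected only when this is non-null
[in] prependName  Prefix for Prometheus stat names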

Definition at line 71 of file ZeroMQServer.cpp.

    : ZeroMQ(zmq::socket_type::rep, hostAddr, true), _checkFlag(std::move(checkFlag))
{
    if (reg)
    {
        _stats = std::make_unique<ZeroMQStats>(reg, prependName);
    }

    startMonitoring(getSocket().get(), "inproc://" + std::to_string(constHasher(hostAddr.c_str())) + ".rep");
}
constexpr size_t constHasher(const char *s)
Definition Hasher.hpp:13
void startMonitoring(zmq::socket_t *socket, const std::string &monitorAddress)
std::unique_ptr< ZeroMQStats > _stats
std::shared_ptr< std::atomic_flag > _checkFlag
const std::unique_ptr< zmq::socket_t > & getSocket() const
Definition ZeroMQ.hpp:90
ZeroMQ(const zmq::socket_type &type, const std::string &addr, bool isBind)
Definition ZeroMQ.cpp:32
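
The constructor derives the monitor endpoint from a hash of the host address, so each bound address gets its own `inproc://` name. The body of `constHasher` lives in Hasher.hpp and is not shown on this page; the sketch below substitutes 64-bit FNV-1a purely as an assumption, and `fnv1aHash` and `monitorEndpoint` are hypothetical names.

```cpp
#include <cstdint>
#include <string>

// Stand-in for constHasher from Hasher.hpp, whose body is not shown on this
// page. FNV-1a (64-bit) is an assumption chosen only for illustration.
constexpr std::uint64_t fnv1aHash(const char *s)
{
    std::uint64_t hash = 14695981039346656037ULL; // FNV offset basis
    while (*s != '\0')
    {
        hash ^= static_cast<unsigned char>(*s++);
        hash *= 1099511628211ULL; // FNV prime
    }
    return hash;
}

// Hypothetical helper mirroring the expression passed to startMonitoring().
inline std::string monitorEndpoint(const std::string &hostAddr)
{
    return "inproc://" + std::to_string(fnv1aHash(hostAddr.c_str())) + ".rep";
}
```

Because the hash is a pure function of the address, two servers bound to the same address would map to the same monitor endpoint, while different addresses map to different names barring hash collisions.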

◆ ZeroMQServer() [2/3]

ZeroMQServer::ZeroMQServer ( const ZeroMQServer & )
delete

Copy constructor.

◆ ZeroMQServer() [3/3]

ZeroMQServer::ZeroMQServer ( ZeroMQServer && )
delete

Move constructor.
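
Deleting all four copy and move operations pins the object in place, which matters for a class whose worker thread holds a pointer to `this`. A minimal sketch of the same declarations on a hypothetical `Worker` class (not the real `ZeroMQServer`), checked at compile time with type traits:

```cpp
#include <type_traits>

// Hypothetical stand-in for a class that owns a thread bound to `this`.
class Worker
{
  public:
    Worker() = default;
    Worker(const Worker &) = delete;
    Worker(Worker &&) = delete;
    Worker &operator=(Worker) = delete;
    Worker &operator=(Worker &&) = delete;
};

// None of the copy or move operations is usable.
static_assert(!std::is_copy_constructible_v<Worker>, "copy construction is deleted");
static_assert(!std::is_move_constructible_v<Worker>, "move construction is deleted");
static_assert(!std::is_copy_assignable_v<Worker>, "copy assignment is deleted");
static_assert(!std::is_move_assignable_v<Worker>, "move assignment is deleted");
```

Code that needs to pass such an object around would typically hold it behind a `std::unique_ptr` or `std::shared_ptr` instead.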

◆ ~ZeroMQServer()

ZeroMQServer::~ZeroMQServer ( )
inline override

Destructor. Calls shutdown() to stop the server thread and close the connection.

Definition at line 66 of file ZeroMQServer.hpp.

{ shutdown(); }
void shutdown()
Closes the ZeroMQ Server.

Member Function Documentation

◆ initialise()

bool ZeroMQServer::initialise ( )

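Starts the underlying ZeroMQ connection and, on success, launches the server thread.

Returns
true if the connection started and the server thread was launched, false otherwise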

Definition at line 83 of file ZeroMQServer.cpp.

{
    _shouldStop.clear();

    if (start())
    {
        _serverThread = std::make_unique<std::thread>(&ZeroMQServer::threadFunc, this);
        return true;
    }
    return false;
}
std::atomic_flag _shouldStop
void threadFunc() noexcept
Main thread function.
std::unique_ptr< std::thread > _serverThread
bool start()
Definition ZeroMQ.cpp:43

◆ messageCallback() [1/2]

FPTR_MessageCallback ZeroMQServer::messageCallback ( ) const
inline nodiscard

Gets the message callback function

Returns
The message callback function

Definition at line 78 of file ZeroMQServer.hpp.

{ return _m_messageCallback; }
FPTR_MessageCallback _m_messageCallback

◆ messageCallback() [2/2]

void ZeroMQServer::messageCallback ( FPTR_MessageCallback func)
inline

Sets the message callback function

Parameters
[in] func  The message callback function to set

Definition at line 72 of file ZeroMQServer.hpp.

{ _m_messageCallback = std::move(func); }
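
The setter and getter share one name and are distinguished only by their parameter lists. A minimal sketch of that accessor pair on a hypothetical `CallbackHolder`; the `int(int)` callback signature is an assumption for illustration, since `FPTR_MessageCallback` is declared elsewhere:

```cpp
#include <functional>
#include <utility>

// Hypothetical holder mirroring the overloaded accessor pair.
class CallbackHolder
{
  public:
    using Callback = std::function<int(int)>; // assumed signature

    // Setter: takes the callable by value and moves it into the member.
    void callback(Callback func) { _callback = std::move(func); }

    // Getter: returns a copy of the stored callable (empty if never set).
    [[nodiscard]] Callback callback() const { return _callback; }

  private:
    Callback _callback;
};
```

With a `std::function`-like type, a getter that returns by value copies the callable on every call, so callers on a hot path may prefer to fetch it once and reuse the copy.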

◆ operator=() [1/2]

ZeroMQServer & ZeroMQServer::operator= ( ZeroMQServer && )
delete

Move assignment operator.

◆ operator=() [2/2]

ZeroMQServer & ZeroMQServer::operator= ( ZeroMQServer )
delete

Copy assignment operator.

◆ shutdown()

void ZeroMQServer::shutdown ( )

Closes the ZeroMQ Server.

Definition at line 95 of file ZeroMQServer.cpp.

{
    _shouldStop.test_and_set();
    if (_serverThread && _serverThread->joinable())
    {
        _serverThread->join();
        _serverThread.reset();
    }

    stop();
}
void stop()
Definition ZeroMQ.cpp:63
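
initialise() and shutdown() form a start/stop pair around one worker thread: clear the stop request and launch, then request a stop and join. A minimal sketch of that lifecycle with no networking, using a hypothetical `LoopRunner` class. It uses `std::atomic<bool>` instead of `std::atomic_flag`, and the guard against a second initialise() call is an addition that the listing above does not have.

```cpp
#include <atomic>
#include <chrono>
#include <memory>
#include <thread>

// Hypothetical stand-in for the start/stop lifecycle; it does no networking.
class LoopRunner
{
  public:
    LoopRunner() = default;
    LoopRunner(const LoopRunner &) = delete;
    LoopRunner &operator=(const LoopRunner &) = delete;
    ~LoopRunner() { shutdown(); }

    // Clears the stop request and launches the worker thread.
    bool initialise()
    {
        if (_thread)
        {
            return false; // already running (guard added in this sketch)
        }
        _shouldStop.store(false);
        _thread = std::make_unique<std::thread>(&LoopRunner::threadFunc, this);
        return true;
    }

    // Requests a stop, then waits for the worker. Safe to call repeatedly.
    void shutdown()
    {
        _shouldStop.store(true);
        if (_thread && _thread->joinable())
        {
            _thread->join();
            _thread.reset();
        }
    }

    bool running() const { return _thread != nullptr; }
    unsigned long iterations() const { return _iterations.load(); }

  private:
    void threadFunc() noexcept
    {
        while (!_shouldStop.load())
        {
            ++_iterations; // placeholder for the real per-cycle work
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

    std::unique_ptr<std::thread> _thread;
    std::atomic<bool> _shouldStop{false};
    std::atomic<unsigned long> _iterations{0};
};
```

Joining before releasing any resources the thread uses, as shutdown() does by calling stop() last, keeps the worker from touching a closed socket.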

◆ threadFunc()

void ZeroMQServer::threadFunc ( )
private noexcept

Main thread function.

Definition at line 50 of file ZeroMQServer.cpp.

{
    spdlog::info("ZeroMQ server started");
    while (!_shouldStop._M_i)
    {
        try
        {
            update();
            if (_checkFlag)
            {
                _checkFlag->test_and_set();
            }
        }
        catch (const std::exception &e)
        {
            spdlog::error("ZeroMQ server failed: {}", e.what());
        }
    }
    spdlog::info("ZeroMQ server stopped");
}
void update()
Processes new messages.
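
The loop condition reads `_shouldStop._M_i`, which is an internal member of libstdc++'s `std::atomic_flag` rather than part of the standard interface, so it is unlikely to compile against other standard libraries. Portable options include `std::atomic_flag::test()` (C++20) or a `std::atomic<bool>`. The sketch below uses the latter and keeps the same catch-and-continue structure; `runLoop` and `step` are hypothetical names, with `step` playing the role of update().

```cpp
#include <atomic>
#include <exception>
#include <functional>
#include <stdexcept>

// Hypothetical loop in the style of threadFunc(). Returns how many
// iterations threw an exception derived from std::exception.
inline int runLoop(std::atomic<bool> &shouldStop, const std::function<void()> &step) noexcept
{
    int failures = 0;
    while (!shouldStop.load())
    {
        try
        {
            step();
        }
        catch (const std::exception &)
        {
            ++failures; // the listing above logs the error here and keeps looping
        }
    }
    return failures;
}
```

As in the listing, only exceptions derived from `std::exception` are caught; anything else escaping a `noexcept` function terminates the program.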

◆ update()

void ZeroMQServer::update ( )
private

Processes new messages.

Definition at line 25 of file ZeroMQServer.cpp.

{
    auto recvMsgs = recvMessages();

    if (!recvMsgs.empty())
    {
        std::vector<zmq::message_t> replyMsgs;

        ZeroMQServerStats serverStats;
        serverStats.processingTimeStart = std::chrono::high_resolution_clock::now();
        serverStats.isSuccessful = messageCallback() && messageCallback()(recvMsgs, replyMsgs);

        if (size_t nSentMsg = sendMessages(replyMsgs); nSentMsg != replyMsgs.size())
        {
            spdlog::warn("Can't send whole reply: Sent messages {} / {}", nSentMsg, replyMsgs.size());
        }
        serverStats.processingTimeEnd = std::chrono::high_resolution_clock::now();

        if (_stats)
        {
            _stats->consumeStats(recvMsgs, replyMsgs, serverStats);
        }
    }
}
FPTR_MessageCallback messageCallback() const
std::vector< zmq::message_t > recvMessages()
Definition ZeroMQ.cpp:81
size_t sendMessages(std::vector< zmq::message_t > &msg)
Definition ZeroMQ.cpp:96
bool isSuccessful
Indicates if processing was successful for this connection.
std::chrono::high_resolution_clock::time_point processingTimeEnd
Processing time end.
std::chrono::high_resolution_clock::time_point processingTimeStart
Processing time start.
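
update() treats an unset callback as a failed request: the `&&` short-circuits, the reply vector stays empty, and the elapsed time is still recorded. A minimal sketch of that step under stated assumptions: the callback shape below is inferred from the call site, `std::string` stands in for `zmq::message_t`, `std::chrono::steady_clock` replaces `high_resolution_clock`, and `handleRequest` and `HandleResult` are hypothetical names.

```cpp
#include <chrono>
#include <functional>
#include <string>
#include <vector>

// Assumed shape of the callback; the real FPTR_MessageCallback is declared
// elsewhere and works on zmq::message_t.
using Message = std::string;
using MessageCallback = std::function<bool(const std::vector<Message> &, std::vector<Message> &)>;

struct HandleResult
{
    bool isSuccessful{false};
    std::vector<Message> replies;
    std::chrono::nanoseconds elapsed{0};
};

// Hypothetical single request/reply step in the style of update().
inline HandleResult handleRequest(const MessageCallback &callback, const std::vector<Message> &request)
{
    HandleResult result;
    const auto start = std::chrono::steady_clock::now();
    // An unset callback short-circuits to failure and leaves the reply empty.
    result.isSuccessful = callback && callback(request, result.replies);
    result.elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() - start);
    return result;
}
```

A callback under this assumed signature fills the reply vector and returns whether processing succeeded, for example an echo handler that copies the request into the reply and returns true.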

Member Data Documentation

◆ _checkFlag

std::shared_ptr<std::atomic_flag> ZeroMQServer::_checkFlag
private

Definition at line 18 of file ZeroMQServer.hpp.
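
Since the server thread sets this flag after every update cycle, a caller can use it as a heartbeat: clear the flag, wait, and see whether it was set again. A minimal sketch of such a probe, assuming the caller owns the other end of the shared pointer; `consumeHeartbeat` is a hypothetical helper, not part of this class.

```cpp
#include <atomic>
#include <memory>

// Hypothetical watchdog probe: returns true if the flag was set since the
// previous probe, then re-arms it by clearing.
inline bool consumeHeartbeat(const std::shared_ptr<std::atomic_flag> &flag)
{
    if (!flag)
    {
        return false; // no flag was shared with the server
    }
    const bool wasSet = flag->test_and_set(); // returns the previous value
    flag->clear();
    return wasSet;
}
```

A heartbeat that lands between `test_and_set()` and `clear()` is lost, so a probe interval well above the server's cycle time gives more reliable readings.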

◆ _m_messageCallback

FPTR_MessageCallback ZeroMQServer::_m_messageCallback
private

Definition at line 22 of file ZeroMQServer.hpp.

◆ _serverThread

std::unique_ptr<std::thread> ZeroMQServer::_serverThread
private

Definition at line 14 of file ZeroMQServer.hpp.

◆ _shouldStop

std::atomic_flag ZeroMQServer::_shouldStop {false}
private

Definition at line 16 of file ZeroMQServer.hpp.

{false};

◆ _stats

std::unique_ptr<ZeroMQStats> ZeroMQServer::_stats
private

Definition at line 20 of file ZeroMQServer.hpp.


The documentation for this class was generated from the following files: