From 74c0d1cb2178d11359c96b4fa1653d7dbdace930 Mon Sep 17 00:00:00 2001 From: David Hale Date: Mon, 9 Mar 2026 18:37:52 -0700 Subject: [PATCH 1/3] introduces a new messaging library in message.h (just structure at this point) --- camerad/camera_server.cpp | 9 +++++++ camerad/camera_server.h | 7 +++++ camerad/camerad.cpp | 27 ++++++-------------- camerad/camerad.h | 2 ++ common/message.h | 54 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 80 insertions(+), 19 deletions(-) create mode 100644 common/message.h diff --git a/camerad/camera_server.cpp b/camerad/camera_server.cpp index a5aa045..5f610a3 100644 --- a/camerad/camera_server.cpp +++ b/camerad/camera_server.cpp @@ -19,6 +19,8 @@ namespace Camera { id_pool(N_THREADS), cmd_num(0) { + message_server.set_handler( std::bind(&Camera::Server::dispatch, this, std::placeholders::_1) ); + interface=Camera::Interface::create(); // factory funcion creates the appropriate interface type interface->set_server(this); // pointer back to this Server instance } @@ -82,6 +84,13 @@ namespace Camera { /***** Camera::Server::exit_cleanly *****************************************/ + void Server::start() { + const std::string function("Camera::Server::start"); + std::string ret = message_server.test("killroy was here"); + logwrite(function, ret); + } + + /***** Camera::Server::block_main *******************************************/ /** * @brief main function for blocking connection thread diff --git a/camerad/camera_server.h b/camerad/camera_server.h index ad5d9e7..cae5c6f 100644 --- a/camerad/camera_server.h +++ b/camerad/camera_server.h @@ -19,6 +19,7 @@ #include "utilities.h" #include "network.h" #include "camerad_commands.h" +#include "message.h" namespace Camera { @@ -29,6 +30,8 @@ namespace Camera { Server(); ~Server(); + Message::Server message_server; + std::unique_ptr interface; int blkport; @@ -43,6 +46,10 @@ namespace Camera { void exit_cleanly(); void block_main(std::shared_ptr socket); void doit(Network::TcpSocket sock); + + void start(); + + std::string dispatch(const std::string &message) { logwrite("Camera::Server::dispatch", "foo"); return "bar"; } }; } diff --git a/camerad/camerad.cpp b/camerad/camerad.cpp index 02a173c..11c2f45 100644 --- a/camerad/camerad.cpp +++ b/camerad/camerad.cpp @@ -54,27 +54,16 @@ int main( int argc, char** argv ) { exit(1); } - // dynamically create a new listening socket and thread - // to handle each connection request +//Message::Server test; + + // start the ZMQ router // - Network::TcpSocket sock_block(camerad.blkport, true, -1, 0); // instantiate a TcpSocket with blocking port - if ( sock_block.Listen() < 0 ) { - std::cerr << "ERROR could not create listening socket\n"; - exit(1); + try { + camerad.start(); } - - while ( true ) { - auto newid = camerad.id_pool.get_next_number(); // get next available number from the pool - { - std::lock_guard lock(camerad.sock_block_mutex); - camerad.socklist[newid] = std::make_shared(sock_block); // create a new socket - camerad.socklist[newid]->id = newid; // update the id of the copied socket - camerad.socklist[newid]->Accept(); // accept connections on the new socket - } - - // Create a new thread to handle the connection - // - std::thread( &Camera::Server::block_main, &camerad, camerad.socklist[newid] ).detach(); + catch (const std::exception &e) { + logwrite(function, "ERROR starting server: "+std::string(e.what())); + exit(1); } for (;;) pause(); // main thread suspends diff --git a/camerad/camerad.h b/camerad/camerad.h index 20f64c0..f581ef1 100644 --- a/camerad/camerad.h +++ b/camerad/camerad.h @@ -28,4 +28,6 @@ #include "common.h" #include "logentry.h" +#include "message.h" + int main(int argc, char** argv); ///< the main function diff --git a/common/message.h b/common/message.h new file mode 100644 index 0000000..317106b --- /dev/null +++ b/common/message.h @@ -0,0 +1,54 @@ +/* + * message.h + * + */ +#pragma once +#include + +namespace Message { + + using MessageHandler = std::function; + + class Server { + private: + zmq::context_t context_; + zmq::socket_t router_; + zmq::socket_t pull_; + + MessageHandler handler_; + + std::atomic isrunning{false}; + + std::string inproc_address; + std::thread poll_thread; + int zmqport; + + public: + Server() : router_(context_, ZMQ_ROUTER), pull_(context_, ZMQ_PULL) {} + + std::string test(const std::string &message) { std::cerr << message << "\n"; return handler_(message); } + void set_handler(MessageHandler handler) { handler_ = std::move(handler); } + void start(); + void stop(); + void poll_loop() { + zmq::message_t routing_frame; + zmq::message_t messageid_frame; + zmq::message_t payload_frame; + + std::string routing( static_cast(routing_frame.data()), routing_frame.size() ); + std::string messageid( static_cast(messageid_frame.data()), messageid_frame.size() ); + std::string payload( static_cast(payload_frame.data()), payload_frame.size() ); + } + + }; + + class Client { + }; + + class xPubSub { + }; + + class xPubSubHandler { + }; + +} From a60e0bf0b9c32e9a08bf83e38d26361bcfc368fa Mon Sep 17 00:00:00 2001 From: David Hale Date: Tue, 10 Mar 2026 18:16:54 -0700 Subject: [PATCH 2/3] mostly finished! (Client/Server part) --- camerad/camera_server.cpp | 115 +++------- camerad/camera_server.h | 8 +- camerad/camerad.cpp | 8 +- common/message.h | 427 ++++++++++++++++++++++++++++++++++++-- 4 files changed, 439 insertions(+), 119 deletions(-) diff --git a/camerad/camera_server.cpp b/camerad/camera_server.cpp index 5f610a3..23ec655 100644 --- a/camerad/camera_server.cpp +++ b/camerad/camera_server.cpp @@ -19,7 +19,9 @@ namespace Camera { id_pool(N_THREADS), cmd_num(0) { - message_server.set_handler( std::bind(&Camera::Server::dispatch, this, std::placeholders::_1) ); + message_server.set_handler( std::bind(&Camera::Server::dispatch, this, + std::placeholders::_1, + std::placeholders::_2) ); interface=Camera::Interface::create(); // factory funcion creates the appropriate interface type interface->set_server(this); // pointer back to this Server instance @@ -78,19 +80,13 @@ namespace Camera { void Server::exit_cleanly() { const std::string function("Camera::Server::exit_cleanly"); this->interface->disconnect_controller(); + this->message_server.stop(); logwrite(function, "server exiting"); exit(EXIT_SUCCESS); } /***** Camera::Server::exit_cleanly *****************************************/ - void Server::start() { - const std::string function("Camera::Server::start"); - std::string ret = message_server.test("killroy was here"); - logwrite(function, ret); - } - - /***** Camera::Server::block_main *******************************************/ /** * @brief main function for blocking connection thread @@ -102,7 +98,6 @@ namespace Camera { */ void Server::block_main( std::shared_ptr sock ) { this->threads_active.fetch_add(1); // atomically increment threads_busy counter - this->doit(*sock); sock->Close(); this->threads_active.fetch_sub(1); // atomically increment threads_busy counter this->id_pool.release_number( sock->id ); @@ -118,84 +113,27 @@ namespace Camera { * @param[in] sock Network::TcpSocket socket object * */ - void Server::doit( Network::TcpSocket sock ) { - const std::string function("Camera::Server::doit"); - std::stringstream message; + std::string Server::dispatch(const std::string &message, Message::AckFunction send_ack) { + const std::string function("Camera::Server::dispatch"); std::string cmd, args; bool connection_open=true; long ret; - while (connection_open) { - // wait (poll) connected socket for incomming data - // - int pollret; - if ( ( pollret=sock.Poll() ) <= 0 ) { - if (pollret==0) { - message.str(""); message << "Poll timeout on fd " << sock.getfd() << " thread " << sock.id; - logwrite(function, message.str()); - } - if (pollret <0) { - message.str(""); message << "Poll error on fd " << sock.getfd() << " thread " << sock.id << ": " << strerror(errno); - logwrite(function, message.str()); - } - break; // this will close the connection - } - - // Data available, now read from connected socket... - // - std::string sbuf; - char delim='\n'; - if ( ( ret=sock.Read( sbuf, delim ) ) <= 0 ) { - if (ret<0) { // could be an actual read error - message.str(""); message << "Read error on fd " << sock.getfd() << ": " << strerror(errno); - logwrite(function, message.str()); - } - if (ret==-2) { - message.str(""); message << "timeout reading from fd " << sock.getfd(); - logwrite( function, message.str() ); - } - break; // Breaking out of the while loop will close the connection. - // This probably means that the client has terminated abruptly, - // having sent FIN but not stuck around long enough - // to accept CLOSE and give the LAST_ACK. - } - - // convert the input buffer into a string and remove any trailing linefeed - // and carriage return - // - sbuf.erase(std::remove(sbuf.begin(), sbuf.end(), '\r' ), sbuf.end()); - sbuf.erase(std::remove(sbuf.begin(), sbuf.end(), '\n' ), sbuf.end()); + std::size_t cmd_sep = message.find_first_of(" "); // find the first space, which separates command from argument list - if (sbuf.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out - - try { - std::size_t cmd_sep = sbuf.find_first_of(" "); // find the first space, which separates command from argument list - - cmd = sbuf.substr(0, cmd_sep); // cmd is everything up until that space - - if (cmd.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out + cmd = message.substr(0, cmd_sep); // cmd is everything up until that space if (cmd_sep == std::string::npos) { // If no space was found, args.clear(); // then the arg list is empty, } else { - args= sbuf.substr(cmd_sep+1); // otherwise args is everything after that space. + args= message.substr(cmd_sep+1); // otherwise args is everything after that space. } - if ( ++this->cmd_num == INT_MAX ) this->cmd_num = 0; - - message.str(""); message << "thread " << sock.id << " received command on fd " << sock.getfd() - << " (" << this->cmd_num << ") : " << cmd << " " << args; - logwrite(function, message.str()); - } - catch (const std::exception &e) { - message.str(""); message << "ERROR parsing command from thread " << sock.id << " fd " << sock.getfd() - << " (" << this->cmd_num << ") \"" << cmd << " " << args << "\" : " - << e.what(); - logwrite(function, message.str()); - cmd = "_EXCEPTION_"; // skips command processing below - } + std::ostringstream oss; + oss << "received command " << ++cmd_id << ": " << message; + logwrite(function, oss.str()); // // Process commands here @@ -204,9 +142,6 @@ namespace Camera { ret = NOTHING; std::string retstring; - if ( cmd == "_EXCEPTION_" ) { // skips checking commands - } - else if ( cmd == "-h" || cmd == "--help" || cmd == "help" || cmd == "?" ) { retstring="camera { } [...]\n"; retstring.append( " where is one of:\n" ); @@ -241,8 +176,15 @@ namespace Camera { } else if ( cmd == CAMERAD_EXIT ) { + // Start a thread that waits before sending SIGTERM. + // The wait allows the reply to propagate back to the client + // so that it has a clean exit. + // + std::thread([]() { std::this_thread::sleep_for(std::chrono::milliseconds(250)); + kill(getpid(), SIGTERM); + }).detach(); + retstring="goodbye"; ret = NO_ERROR; - exit_cleanly(); } else if ( cmd == CAMERAD_EXPTIME ) { @@ -322,8 +264,7 @@ namespace Camera { // unknown commands generate an error // else { - message.str(""); message << "ERROR unknown command: " << cmd; - logwrite(function, message.str()); + logwrite(function, "ERROR unknown command: "+cmd); ret = ERROR; } @@ -337,23 +278,15 @@ namespace Camera { if ( ret != HELP && ret != JSON ) retstring.append( ret == NO_ERROR ? "DONE" : "ERROR" ); if ( ret == JSON ) { - message.str(""); message << "command (" << this->cmd_num << ") reply with JSON message"; - logwrite( function, message.str() ); + logwrite(function, "command ("+std::to_string(cmd_id)+") reply with JSON message"); } else if ( ! retstring.empty() && ret != HELP ) { retstring.append( "\n" ); - message.str(""); message << "command (" << this->cmd_num << ") reply: " << retstring; - logwrite( function, message.str() ); + logwrite(function, "command ("+std::to_string(cmd_id)+") reply: "+retstring); } - - if ( sock.Write( retstring ) < 0 ) connection_open=false; } - - if (!sock.isblocking()) break; // Non-blocking connection exits immediately. - // Keep blocking connection open for interactive session. - } - return; + return retstring; } /***** Camera::Server::doit *************************************************/ diff --git a/camerad/camera_server.h b/camerad/camera_server.h index cae5c6f..482ca2c 100644 --- a/camerad/camera_server.h +++ b/camerad/camera_server.h @@ -12,6 +12,8 @@ //#include //#include #include +#include +#include //#include #include "common.h" @@ -26,6 +28,8 @@ namespace Camera { const int N_THREADS=10; class Server { + private: + std::atomic cmd_id{0}; public: Server(); ~Server(); @@ -47,9 +51,7 @@ namespace Camera { void block_main(std::shared_ptr socket); void doit(Network::TcpSocket sock); - void start(); - - std::string dispatch(const std::string &message) { logwrite("Camera::Server::dispatch", "foo"); return "bar"; } + std::string dispatch(const std::string &message, Message::AckFunction send_ack); }; } diff --git a/camerad/camerad.cpp b/camerad/camerad.cpp index 11c2f45..236ad2e 100644 --- a/camerad/camerad.cpp +++ b/camerad/camerad.cpp @@ -54,15 +54,13 @@ int main( int argc, char** argv ) { exit(1); } -//Message::Server test; - - // start the ZMQ router + // start the ZMQ Message Server // try { - camerad.start(); + camerad.message_server.start(5555); } catch (const std::exception &e) { - logwrite(function, "ERROR starting server: "+std::string(e.what())); + logwrite(function, "ERROR starting ZMQ message server: "+std::string(e.what())); exit(1); } diff --git a/common/message.h b/common/message.h index 317106b..995cc8b 100644 --- a/common/message.h +++ b/common/message.h @@ -1,48 +1,435 @@ -/* - * message.h +/** + * @file message.h + * @brief generic ZMQ messaging library + * @details This contains a Server class for accepting messages from processes, + * a Client class for sending messages to processes, and a + * PublisherSubscriber class for broadcasting and accepting + * one-to-many messages. The Client/Server classes use the ROUTER/DEALER + * pattern and the PubSub class uses (surprise!) the PUB/SUB pattern. + * This uses the header-only C++ bindings for libzmq. + * @author David Hale + * @date 2026-Mar-09 * */ + #pragma once + #include +#include +#include +#include +#include +#include +#include +#include +#include +#include namespace Message { - using MessageHandler = std::function; + /// callback passed to the message handler + using AckFunction = std::function; + /// registered message handler + using MessageHandler = std::function; + + const std::string MSG_SERVICE_ADDRESS = "tcp://127.0.0.1"; + + const std::string MSG_ACK = "ACK"; + const std::string MSG_DONE = "DONE"; + const std::string MSG_ERR = "ERROR"; + const std::string MSG_PING = "PING"; + const std::string MSG_PONG = "PONG"; + + constexpr int HEARTBEAT_INTERVAL_MS = 2000; ///< send heartbeat at this rate + constexpr int HEARTBEAT_TIMEOUT_MS = 6000; ///< drop if silent this long + constexpr int HEARTBEAT_TIME_TO_LIVE_MS = 6000; ///< maximum age for receiver message + /***** Message::timestamp ***************************************************/ + /** + * @brief returns a timestamp string for logging to stderr + */ + inline std::string timestamp() { + auto now = std::chrono::system_clock::now(); + std::time_t time = std::chrono::system_clock::to_time_t(now); + std::tm tm = *std::gmtime(&time); + char buf[20]; + std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &tm); + return std::string(buf); + } + + /***** Message::Server ******************************************************/ + /** + * @brief defines a server that accepts messages, calls a message handler, + * and returns replies + */ class Server { private: - zmq::context_t context_; - zmq::socket_t router_; - zmq::socket_t pull_; + zmq::context_t context; + zmq::socket_t router; + zmq::socket_t pull; - MessageHandler handler_; + MessageHandler handler; ///< user-provided callback function to handle messages std::atomic isrunning{false}; - std::string inproc_address; + std::string inproc_address{"inproc://replies"}; ///< in-process memory pipe std::thread poll_thread; - int zmqport; + int zmqport{-1}; public: - Server() : router_(context_, ZMQ_ROUTER), pull_(context_, ZMQ_PULL) {} + Server() : router(context, ZMQ_ROUTER), pull(context, ZMQ_PULL) {} + + /***** Message::Server::get_isrunning ***********************************/ + bool get_isrunning() const { return isrunning; } + + /***** Message::Server::set_handler *************************************/ + /** + * @brief registers the message handling function + */ + void set_handler(MessageHandler h) { handler = std::move(h); } + + /***** Message::Server::start *******************************************/ + /** + * @brief binds the router and pull sockets and gets the polling thread started + * @details The pull socket is how I receive data back from the worker thread. + * @throws std::runtime_error + */ + void start(int port) { + if (isrunning) throw std::runtime_error("Message::Server::start already running"); + if (port>0) zmqport=port; else throw std::runtime_error("invalid port '"+std::to_string(port)+"'"); + router.set(zmq::sockopt::linger, 0); + router.set(zmq::sockopt::heartbeat_ivl, HEARTBEAT_INTERVAL_MS); + router.set(zmq::sockopt::heartbeat_timeout, HEARTBEAT_TIMEOUT_MS); + router.set(zmq::sockopt::heartbeat_ttl, HEARTBEAT_TIME_TO_LIVE_MS); + + router.bind(Message::MSG_SERVICE_ADDRESS+":"+std::to_string(zmqport)); - std::string test(const std::string &message) { std::cerr << message << "\n"; return handler_(message); } - void set_handler(MessageHandler handler) { handler_ = std::move(handler); } - void start(); - void stop(); + pull.bind(inproc_address); + + isrunning=true; + + std::cerr << timestamp() << " (Message::Server::start) messaging server started on port " << zmqport << "\n"; + + poll_thread = std::thread(&Server::poll_loop, this); + } + + /***** Message::Server::stop ********************************************/ + /** + * @brief signals the poll loop to stop and waits for the thread to exit + */ + void stop() { + isrunning=false; + if (poll_thread.joinable()) poll_thread.join(); + std::cerr << timestamp() << " (Message::Server::stop) messaging server on port " << zmqport << " stopped\n"; + } + + private: + /***** Message::Server::poll_loop ***************************************/ + /** + * @brief watches the ZMQ sockets and spawns worker to handle incoming message + */ void poll_loop() { - zmq::message_t routing_frame; - zmq::message_t messageid_frame; - zmq::message_t payload_frame; + zmq::pollitem_t items[] = { + { router.handle(), 0, ZMQ_POLLIN, 0 }, ///< incoming messages from clients + { pull.handle(), 0, ZMQ_POLLIN, 0 } ///< completed replies from worker threads + }; + + while (isrunning) { + // wait up to 100ms for activity on either item + zmq::poll(items, 2, std::chrono::milliseconds(100)); - std::string routing( static_cast(routing_frame.data()), routing_frame.size() ); - std::string messageid( static_cast(messageid_frame.data()), messageid_frame.size() ); - std::string payload( static_cast(payload_frame.data()), payload_frame.size() ); + // a dealer sent a message + if (items[0].revents & ZMQ_POLLIN) { + zmq::message_t routing_frame, messageid_frame, payload_frame; + + // received the frames from the router + auto frame1 = router.recv(routing_frame, zmq::recv_flags::none); + auto frame2 = router.recv(messageid_frame, zmq::recv_flags::none); + auto frame3 = router.recv(payload_frame, zmq::recv_flags::none); + + if (!frame1 || !frame2 || !frame3) { + std::cerr << timestamp() << " (Message::Server::poll_loop) ERROR receiving ZMQ message frames from router port " << zmqport << "\n"; + continue; + } + + // pass that message to a worker thread + std::string routing ( static_cast(routing_frame.data()), routing_frame.size() ); + std::string messageid( static_cast(messageid_frame.data()), messageid_frame.size() ); + std::string payload ( static_cast(payload_frame.data()), payload_frame.size() ); + + std::thread(&Message::Server::worker, this, routing, messageid, payload).detach(); + } + + // the worker thread has pushed back a reply + if (items[1].revents & ZMQ_POLLIN) { + zmq::message_t routing_frame, messageid_frame, reply_frame; + + // received the frames from the worker + auto frame1 = pull.recv(routing_frame, zmq::recv_flags::none); + auto frame2 = pull.recv(messageid_frame, zmq::recv_flags::none); + auto frame3 = pull.recv(reply_frame, zmq::recv_flags::none); + + if (!frame1 || !frame2 || !frame3) { + std::cerr << timestamp() << " (Message::Server::poll_loop) ERROR receiving ZMQ reply frames from worker\n"; + continue; + } + + // push them to the router + router.send(routing_frame, zmq::send_flags::sndmore); + router.send(messageid_frame, zmq::send_flags::sndmore); + router.send(reply_frame, zmq::send_flags::none); + } + } } + /***** Message::Server::worker ******************************************/ + /** + * @brief process incoming message using supplied callback handler + * @param[in] routing ZMQ-internal routing identity, identifies DEALER + * so ROUTER can route reply to the correct client + * @param[in] messageid client-generated message ID echoed back so the + * client can match reply with request + * @param[in] payload the message from the client (command, etc.) + */ + void worker(const std::string &routing, const std::string &messageid, const std::string &payload) { + + // intercept PING and reply PONG + if (payload==Message::MSG_PING) { + push_reply(routing, messageid, Message::MSG_PONG); + return; + } + + AckFunction ack = std::bind(&Server::push_reply, this, routing, messageid, std::placeholders::_1); + + // call my message handler function + // this is an external function that must have been registered + // + std::string reply = handler ? handler(payload, ack) : "ERROR: no handler registered"; + + push_reply(routing, messageid, reply); + } + + /***** Message::Server::push_reply **************************************/ + /** + * @brief delivers reply to poll_loop + * @details This creates a temporary socket connected to an in-process + * memory pipe. + * @param[in] routing the DEALER's routing identity (ZMQ-internal) + * @param[in] messageid message ID frame (client-generated and owned) + * @param[in] reply reply string to deliver + */ + void push_reply(const std::string &routing, const std::string &messageid, const std::string &reply) { + // create a transient ZMQ push socket + // + zmq::socket_t push(context, ZMQ_PUSH); + push.set(zmq::sockopt::linger, 0); + + // connect to the inproc address + push.connect(inproc_address); + + // push the frames back to the router, matching the reply to the router and messageid + push.send(zmq::buffer(routing), zmq::send_flags::sndmore); + push.send(zmq::buffer(messageid), zmq::send_flags::sndmore); + push.send(zmq::buffer(reply), zmq::send_flags::none); + } }; + /***** Message::Client ******************************************************/ + /** + * @brief defines a DEALER client for sending messages to ROUTER server + */ class Client { + private: + struct PendingRequest { + std::optional> ack_promise; + std::promise complete_promise; + }; + + zmq::context_t context; + std::string endpoint; + std::thread recv_thread; + std::atomic isconnected{false}; + std::mutex send_mtx; + std::mutex pending_mtx; + + zmq::socket_t sock; + std::unordered_map pending; + + std::atomic messageid{0}; + + public: + Client() : sock(context, ZMQ_DEALER) {} + + ~Client() { disconnect(); } + + struct RequestHandle { + RequestHandle() = default; + RequestHandle(RequestHandle&&) = default; + RequestHandle& operator=(RequestHandle&&) = default; + RequestHandle(const RequestHandle&) = delete; + RequestHandle& operator=(const RequestHandle&) = delete; + + bool expects_ack{false}; + std::future ack_future; + std::future complete_future; + + std::string wait_ack(int timeout_ms=3000) { + if (!ack_future.valid()) throw std::runtime_error("Message::Client::wait_ack ACK not expected"); + if (ack_future.wait_for(std::chrono::milliseconds(timeout_ms))==std::future_status::timeout) { + throw std::runtime_error("Message::Client::wait_ack timeout"); + } + return ack_future.get(); + } + + std::string wait_complete(int timeout_ms=60000) { + if (!complete_future.valid()) throw std::runtime_error("Message::Client::wait_complete invalid future"); + if (complete_future.wait_for(std::chrono::milliseconds(timeout_ms))==std::future_status::timeout) { + throw std::runtime_error("Message::Client::wait_complete timeout"); + } + return complete_future.get(); + } + }; + + /***** Message::Client::connect *****************************************/ + /** + * @brief connects this ZMQ DEALER to remote ROUTER socket + * @param[in] endpoint_in name:port of the ROUTER Server + * @throws std::runtime_error + */ + void connect(const std::string &endpoint_in) { + if (isconnected) throw std::runtime_error("Message::Client::connect already connected"); + endpoint = endpoint_in; + isconnected = true; + sock.set(zmq::sockopt::linger, 0); + sock.set(zmq::sockopt::heartbeat_ivl, HEARTBEAT_INTERVAL_MS); + sock.set(zmq::sockopt::heartbeat_timeout, HEARTBEAT_TIMEOUT_MS); + sock.connect(endpoint); + + recv_thread = std::thread(&Client::recv_loop, this); + } + + /***** Message::Client::disconnect **************************************/ + /** + * @brief disconnects from ZMQ ROUTER socket + */ + void disconnect() { + isconnected=false; + if (recv_thread.joinable()) recv_thread.join(); + } + + /***** Message::Client::reconnect ***************************************/ + /** + * @brief reconnect to last ZMQ ROUTER socket + */ + void reconnect() { + disconnect(); + connect(endpoint); + } + + /***** Message::Client::send ********************************************/ + /** + * @brief send a message and return a handle to wait for reply + * @details This returns immediately with a handle containing two futures. + * Call handle.wait_complete() to block until server signals + * completion (DONE|ERROR). + * If an ACK is desired, pass sendack=true and call handle.wait_ack() + * to confirm receipt, then handle.wait_complete(). + * @param[in] message message string to send to server + * @param[in] sendack false|true to allow sending immediate ACK + * @return RequestHandle complete_future and optionally ack_future + * @throws std::runtime_error + */ + RequestHandle send(const std::string &message, bool sendack=false) { + if (endpoint.empty()) throw std::runtime_error("Message::Client::send not connected"); + + const std::string id = std::to_string(messageid.fetch_add(1, std::memory_order_relaxed)); + + RequestHandle handle; + { + std::lock_guard lock(pending_mtx); + auto &req = pending[id]; + if (sendack) { + req.ack_promise.emplace(); + handle.ack_future = req.ack_promise->get_future(); + } + + handle.expects_ack = sendack; + handle.complete_future = req.complete_promise.get_future(); + } + + { + std::lock_guard lock(send_mtx); + sock.send(zmq::buffer(id), zmq::send_flags::sndmore); + sock.send(zmq::buffer(message), zmq::send_flags::none); + } + + return handle; + } + + /***** Message::Client::is_alive ****************************************/ + /** + * @brief sends a PING to the server + * @param[in] timeout_ms override default timeout + * @return true|false + */ + bool is_alive(int timeout_ms=1000) { + try { + auto handle = send(Message::MSG_PING); + return handle.wait_complete(timeout_ms).find(Message::MSG_PONG) != std::string::npos; + } + catch(...) { return false; } + } + + private: + /***** Message::Client::recv_loop ***************************************/ + /** + * @brief receives replies and fulfils pending promises + * @details run as a thread + */ + void recv_loop() { + zmq::pollitem_t item = { sock.handle(), 0, ZMQ_POLLIN, 0 }; + + while (isconnected) { + if (zmq::poll(&item, 1, std::chrono::milliseconds(100))==0) continue; + + zmq::message_t messageid_frame, reply_frame; + auto frame1 = sock.recv(messageid_frame, zmq::recv_flags::none); + auto frame2 = sock.recv(reply_frame, zmq::recv_flags::none); + + if (!frame1 || !frame2) { + std::cerr << timestamp() << " (Message::Client::recv_loop) ERROR receiving ZMQ reply frames from " << endpoint << "\n"; + continue; + } + + std::string id (static_cast(messageid_frame.data()), messageid_frame.size()); + std::string reply (static_cast(reply_frame.data()), reply_frame.size()); + + bool is_ack = (reply==Message::MSG_ACK); + bool is_complete = (reply.find(Message::MSG_PONG) != std::string::npos || + reply.find(Message::MSG_DONE) != std::string::npos || + reply.find(Message::MSG_ERR) != std::string::npos); + + std::lock_guard lock(pending_mtx); + + auto it = pending.find(id); + if (it==pending.end()) continue; + + auto &req = it->second; + + if (is_ack && req.ack_promise.has_value()) { + req.ack_promise->set_value(reply); + } + else + if (is_complete) { + req.complete_promise.set_value(reply); + pending.erase(it); + } + else { + std::cerr << timestamp() << " (Message::Client::recv_loop) unrecognized reply '" + << reply << "' for id " << id << " ignored\n"; + } + } + } }; class xPubSub { From 18f96da2086f45092e027fd6d14d8022f48db345 Mon Sep 17 00:00:00 2001 From: David Hale Date: Wed, 11 Mar 2026 09:15:34 -0700 Subject: [PATCH 3/3] removes unneeded Network::TcpSocket stuff use re-entrant safe gmtime in Message::timestamp --- camerad/CMakeLists.txt | 2 +- camerad/camera_server.cpp | 47 +++++++++++++++++---------------------- camerad/camera_server.h | 17 +++----------- camerad/camerad.cpp | 11 +-------- common/message.h | 12 ++++++---- 5 files changed, 33 insertions(+), 56 deletions(-) diff --git a/camerad/CMakeLists.txt b/camerad/CMakeLists.txt index 1fe239b..ce07f56 100644 --- a/camerad/CMakeLists.txt +++ b/camerad/CMakeLists.txt @@ -127,6 +127,7 @@ list (APPEND INTERFACE_SOURCES ) add_library(${INTERFACE_TARGET} ${INTERFACE_SOURCES}) target_link_libraries(${INTERFACE_TARGET} + network common ) target_include_directories(${INTERFACE_TARGET} PUBLIC ${INTERFACE_INCLUDES}) @@ -169,7 +170,6 @@ add_executable(camerad # link everything # ---------------------------------------------------------------------------- target_link_libraries(camerad - network utilities logentry ${INTERFACE_TARGET} diff --git a/camerad/camera_server.cpp b/camerad/camera_server.cpp index 23ec655..0da9958 100644 --- a/camerad/camera_server.cpp +++ b/camerad/camera_server.cpp @@ -15,8 +15,7 @@ namespace Camera { * */ Server::Server() : - blkport(-1), - id_pool(N_THREADS), + cmd_port(-1), cmd_num(0) { message_server.set_handler( std::bind(&Camera::Server::dispatch, this, @@ -55,10 +54,10 @@ namespace Camera { // iterate through each row in config file for (int row=0; row < interface->configfile.n_rows; row++) { - // BLKPORT - if (interface->configfile.param[row]=="BLKPORT") { + // CMDPORT + if (interface->configfile.param[row]=="CMDPORT") { try { - this->blkport = std::stoi( interface->configfile.arg[row] ); + cmd_port = std::stoi( interface->configfile.arg[row] ); } catch (const std::exception &e) { std::ostringstream oss; @@ -68,6 +67,19 @@ namespace Camera { } } } + + // start the ZMQ Message Server + // + if (cmd_port<1) throw std::runtime_error("CMDPORT not configured"); + try { + if (message_server.get_isrunning()) message_server.stop(); + message_server.start(cmd_port); + } + catch (const std::exception &e) { + logwrite(function, "ERROR starting ZMQ message server: "+std::string(e.what())); + exit(1); + } + } /***** Camera::Server::configure_server *************************************/ @@ -87,25 +99,6 @@ namespace Camera { /***** Camera::Server::exit_cleanly *****************************************/ - /***** Camera::Server::block_main *******************************************/ - /** - * @brief main function for blocking connection thread - * @param[in] sock shared pointer to Network::TcpSocket socket object - * - * accepts a socket connection and processes the request by - * calling function doit() - * - */ - void Server::block_main( std::shared_ptr sock ) { - this->threads_active.fetch_add(1); // atomically increment threads_busy counter - sock->Close(); - this->threads_active.fetch_sub(1); // atomically increment threads_busy counter - this->id_pool.release_number( sock->id ); - return; - } - /***** Camera::Server::block_main *******************************************/ - - /***** Camera::Server::doit *************************************************/ /** * @brief the workhorse of each thread connection @@ -132,7 +125,7 @@ namespace Camera { } std::ostringstream oss; - oss << "received command " << ++cmd_id << ": " << message; + oss << "received command " << ++cmd_num << ": " << message; logwrite(function, oss.str()); // @@ -278,12 +271,12 @@ namespace Camera { if ( ret != HELP && ret != JSON ) retstring.append( ret == NO_ERROR ? "DONE" : "ERROR" ); if ( ret == JSON ) { - logwrite(function, "command ("+std::to_string(cmd_id)+") reply with JSON message"); + logwrite(function, "command ("+std::to_string(cmd_num)+") reply with JSON message"); } else if ( ! retstring.empty() && ret != HELP ) { retstring.append( "\n" ); - logwrite(function, "command ("+std::to_string(cmd_id)+") reply: "+retstring); + logwrite(function, "command ("+std::to_string(cmd_num)+") reply: "+retstring); } } return retstring; diff --git a/camerad/camera_server.h b/camerad/camera_server.h index 482ca2c..15b11fa 100644 --- a/camerad/camera_server.h +++ b/camerad/camera_server.h @@ -25,11 +25,11 @@ namespace Camera { - const int N_THREADS=10; - class Server { private: - std::atomic cmd_id{0}; + std::atomic cmd_num; + int cmd_port; + public: Server(); ~Server(); @@ -38,19 +38,8 @@ namespace Camera { std::unique_ptr interface; - int blkport; - - NumberPool id_pool; - std::map> socklist; - std::mutex sock_block_mutex; - std::atomic threads_active; - std::atomic cmd_num; - void configure_server(); void exit_cleanly(); - void block_main(std::shared_ptr socket); - void doit(Network::TcpSocket sock); - std::string dispatch(const std::string &message, Message::AckFunction send_ack); }; } diff --git a/camerad/camerad.cpp b/camerad/camerad.cpp index 236ad2e..efd261d 100644 --- a/camerad/camerad.cpp +++ b/camerad/camerad.cpp @@ -51,16 +51,7 @@ int main( int argc, char** argv ) { } catch (const std::exception &e) { logwrite(function, "ERROR configuring system: "+std::string(e.what())); - exit(1); - } - - // start the ZMQ Message Server - // - try { - camerad.message_server.start(5555); - } - catch (const std::exception &e) { - logwrite(function, "ERROR starting ZMQ message server: "+std::string(e.what())); + camerad.message_server.stop(); exit(1); } diff --git a/common/message.h b/common/message.h index 995cc8b..e89a987 100644 --- a/common/message.h +++ b/common/message.h @@ -50,11 +50,15 @@ namespace Message { */ inline std::string timestamp() { auto now = std::chrono::system_clock::now(); + auto us = std::chrono::duration_cast(now.time_since_epoch())%1000000; std::time_t time = std::chrono::system_clock::to_time_t(now); - std::tm tm = *std::gmtime(&time); + std::tm tm; + if (gmtime_r(&time, &tm)==NULL) { perror("gmtime_r failed"); return ""; } char buf[20]; - std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &tm); - return std::string(buf); + std::strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S", &tm); + char out[28]; + std::snprintf(out, sizeof(out), "%s.%06ld", buf, us.count()); + return std::string(out); } /***** Message::Server ******************************************************/ @@ -204,7 +208,7 @@ namespace Message { // call my message handler function // this is an external function that must have been registered // - std::string reply = handler ? handler(payload, ack) : "ERROR: no handler registered"; + std::string reply = handler ? handler(payload, ack) : "no handler registered"; push_reply(routing, messageid, reply); }