Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion camerad/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ list (APPEND INTERFACE_SOURCES
)
add_library(${INTERFACE_TARGET} ${INTERFACE_SOURCES})
target_link_libraries(${INTERFACE_TARGET}
network
common
)
target_include_directories(${INTERFACE_TARGET} PUBLIC ${INTERFACE_INCLUDES})
Expand Down Expand Up @@ -169,7 +170,6 @@ add_executable(camerad
# link everything
# ----------------------------------------------------------------------------
target_link_libraries(camerad
network
utilities
logentry
${INTERFACE_TARGET}
Expand Down
149 changes: 42 additions & 107 deletions camerad/camera_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ namespace Camera {
*
*/
Server::Server() :
blkport(-1),
id_pool(N_THREADS),
cmd_port(-1),
cmd_num(0)
{
message_server.set_handler( std::bind(&Camera::Server::dispatch, this,
std::placeholders::_1,
std::placeholders::_2) );

interface=Camera::Interface::create(); // factory funcion creates the appropriate interface type
interface->set_server(this); // pointer back to this Server instance
}
Expand Down Expand Up @@ -51,10 +54,10 @@ namespace Camera {
// iterate through each row in config file
for (int row=0; row < interface->configfile.n_rows; row++) {

// BLKPORT
if (interface->configfile.param[row]=="BLKPORT") {
// CMDPORT
if (interface->configfile.param[row]=="CMDPORT") {
try {
this->blkport = std::stoi( interface->configfile.arg[row] );
cmd_port = std::stoi( interface->configfile.arg[row] );
}
catch (const std::exception &e) {
std::ostringstream oss;
Expand All @@ -64,6 +67,19 @@ namespace Camera {
}
}
}

// start the ZMQ Message Server
//
if (cmd_port<1) throw std::runtime_error("CMDPORT not configured");
try {
if (message_server.get_isrunning()) message_server.stop();
message_server.start(cmd_port);
}
catch (const std::exception &e) {
logwrite(function, "ERROR starting ZMQ message server: "+std::string(e.what()));
exit(1);
}

}
/***** Camera::Server::configure_server *************************************/

Expand All @@ -76,117 +92,41 @@ namespace Camera {
void Server::exit_cleanly() {
const std::string function("Camera::Server::exit_cleanly");
this->interface->disconnect_controller();
this->message_server.stop();
logwrite(function, "server exiting");
exit(EXIT_SUCCESS);
}
/***** Camera::Server::exit_cleanly *****************************************/


/***** Camera::Server::block_main *******************************************/
/**
* @brief main function for blocking connection thread
* @param[in] sock shared pointer to Network::TcpSocket socket object
*
* accepts a socket connection and processes the request by
* calling function doit()
*
*/
void Server::block_main( std::shared_ptr<Network::TcpSocket> sock ) {
this->threads_active.fetch_add(1); // atomically increment threads_busy counter
this->doit(*sock);
sock->Close();
this->threads_active.fetch_sub(1); // atomically increment threads_busy counter
this->id_pool.release_number( sock->id );
return;
}
/***** Camera::Server::block_main *******************************************/


/***** Camera::Server::doit *************************************************/
/**
* @brief the workhorse of each thread connection
* @details incoming commands are parsed here and acted upon
* @param[in] sock Network::TcpSocket socket object
*
*/
void Server::doit( Network::TcpSocket sock ) {
const std::string function("Camera::Server::doit");
std::stringstream message;
std::string Server::dispatch(const std::string &message, Message::AckFunction send_ack) {
const std::string function("Camera::Server::dispatch");
std::string cmd, args;
bool connection_open=true;
long ret;

while (connection_open) {

// wait (poll) connected socket for incomming data
//
int pollret;
if ( ( pollret=sock.Poll() ) <= 0 ) {
if (pollret==0) {
message.str(""); message << "Poll timeout on fd " << sock.getfd() << " thread " << sock.id;
logwrite(function, message.str());
}
if (pollret <0) {
message.str(""); message << "Poll error on fd " << sock.getfd() << " thread " << sock.id << ": " << strerror(errno);
logwrite(function, message.str());
}
break; // this will close the connection
}
std::size_t cmd_sep = message.find_first_of(" "); // find the first space, which separates command from argument list

// Data available, now read from connected socket...
//
std::string sbuf;
char delim='\n';
if ( ( ret=sock.Read( sbuf, delim ) ) <= 0 ) {
if (ret<0) { // could be an actual read error
message.str(""); message << "Read error on fd " << sock.getfd() << ": " << strerror(errno);
logwrite(function, message.str());
}
if (ret==-2) {
message.str(""); message << "timeout reading from fd " << sock.getfd();
logwrite( function, message.str() );
}
break; // Breaking out of the while loop will close the connection.
// This probably means that the client has terminated abruptly,
// having sent FIN but not stuck around long enough
// to accept CLOSE and give the LAST_ACK.
}

// convert the input buffer into a string and remove any trailing linefeed
// and carriage return
//
sbuf.erase(std::remove(sbuf.begin(), sbuf.end(), '\r' ), sbuf.end());
sbuf.erase(std::remove(sbuf.begin(), sbuf.end(), '\n' ), sbuf.end());

if (sbuf.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out

try {
std::size_t cmd_sep = sbuf.find_first_of(" "); // find the first space, which separates command from argument list

cmd = sbuf.substr(0, cmd_sep); // cmd is everything up until that space

if (cmd.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out
cmd = message.substr(0, cmd_sep); // cmd is everything up until that space

if (cmd_sep == std::string::npos) { // If no space was found,
args.clear(); // then the arg list is empty,
}
else {
args= sbuf.substr(cmd_sep+1); // otherwise args is everything after that space.
args= message.substr(cmd_sep+1); // otherwise args is everything after that space.
}

if ( ++this->cmd_num == INT_MAX ) this->cmd_num = 0;

message.str(""); message << "thread " << sock.id << " received command on fd " << sock.getfd()
<< " (" << this->cmd_num << ") : " << cmd << " " << args;
logwrite(function, message.str());
}
catch (const std::exception &e) {
message.str(""); message << "ERROR parsing command from thread " << sock.id << " fd " << sock.getfd()
<< " (" << this->cmd_num << ") \"" << cmd << " " << args << "\" : "
<< e.what();
logwrite(function, message.str());
cmd = "_EXCEPTION_"; // skips command processing below
}
std::ostringstream oss;
oss << "received command " << ++cmd_num << ": " << message;
logwrite(function, oss.str());

//
// Process commands here
Expand All @@ -195,9 +135,6 @@ namespace Camera {
ret = NOTHING;
std::string retstring;

if ( cmd == "_EXCEPTION_" ) { // skips checking commands
}
else
if ( cmd == "-h" || cmd == "--help" || cmd == "help" || cmd == "?" ) {
retstring="camera { <CMD> } [<ARG>...]\n";
retstring.append( " where <CMD> is one of:\n" );
Expand Down Expand Up @@ -232,8 +169,15 @@ namespace Camera {
}
else
if ( cmd == CAMERAD_EXIT ) {
// Start a thread that waits before sending SIGTERM.
// The wait allows the reply to propagate back to the client
// so that it has a clean exit.
//
std::thread([]() { std::this_thread::sleep_for(std::chrono::milliseconds(250));
kill(getpid(), SIGTERM);
}).detach();
retstring="goodbye";
ret = NO_ERROR;
exit_cleanly();
}
else
if ( cmd == CAMERAD_EXPTIME ) {
Expand Down Expand Up @@ -313,8 +257,7 @@ namespace Camera {
// unknown commands generate an error
//
else {
message.str(""); message << "ERROR unknown command: " << cmd;
logwrite(function, message.str());
logwrite(function, "ERROR unknown command: "+cmd);
ret = ERROR;
}

Expand All @@ -328,23 +271,15 @@ namespace Camera {
if ( ret != HELP && ret != JSON ) retstring.append( ret == NO_ERROR ? "DONE" : "ERROR" );

if ( ret == JSON ) {
message.str(""); message << "command (" << this->cmd_num << ") reply with JSON message";
logwrite( function, message.str() );
logwrite(function, "command ("+std::to_string(cmd_num)+") reply with JSON message");
}
else
if ( ! retstring.empty() && ret != HELP ) {
retstring.append( "\n" );
message.str(""); message << "command (" << this->cmd_num << ") reply: " << retstring;
logwrite( function, message.str() );
logwrite(function, "command ("+std::to_string(cmd_num)+") reply: "+retstring);
}

if ( sock.Write( retstring ) < 0 ) connection_open=false;
}

if (!sock.isblocking()) break; // Non-blocking connection exits immediately.
// Keep blocking connection open for interactive session.
}
return;
return retstring;
}
/***** Camera::Server::doit *************************************************/

Expand Down
22 changes: 10 additions & 12 deletions camerad/camera_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,35 @@
//#include <atomic>
//#include <mutex>
#include <limits.h>
#include <signal.h>
#include <sys/types.h>
//#include <json.hpp>

#include "common.h"
#include "camera_interface.h"
#include "utilities.h"
#include "network.h"
#include "camerad_commands.h"
#include "message.h"

namespace Camera {

const int N_THREADS=10;

class Server {
private:
std::atomic<int> cmd_num;
int cmd_port;

public:
Server();
~Server();

std::unique_ptr<Interface> interface;
Message::Server message_server;

int blkport;

NumberPool id_pool;
std::map<int, std::shared_ptr<Network::TcpSocket>> socklist;
std::mutex sock_block_mutex;
std::atomic<int> threads_active;
std::atomic<int> cmd_num;
std::unique_ptr<Interface> interface;

void configure_server();
void exit_cleanly();
void block_main(std::shared_ptr<Network::TcpSocket> socket);
void doit(Network::TcpSocket sock);
std::string dispatch(const std::string &message, Message::AckFunction send_ack);
};
}

24 changes: 1 addition & 23 deletions camerad/camerad.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,32 +51,10 @@ int main( int argc, char** argv ) {
}
catch (const std::exception &e) {
logwrite(function, "ERROR configuring system: "+std::string(e.what()));
camerad.message_server.stop();
exit(1);
}

// dynamically create a new listening socket and thread
// to handle each connection request
//
Network::TcpSocket sock_block(camerad.blkport, true, -1, 0); // instantiate a TcpSocket with blocking port
if ( sock_block.Listen() < 0 ) {
std::cerr << "ERROR could not create listening socket\n";
exit(1);
}

while ( true ) {
auto newid = camerad.id_pool.get_next_number(); // get next available number from the pool
{
std::lock_guard<std::mutex> lock(camerad.sock_block_mutex);
camerad.socklist[newid] = std::make_shared<Network::TcpSocket>(sock_block); // create a new socket
camerad.socklist[newid]->id = newid; // update the id of the copied socket
camerad.socklist[newid]->Accept(); // accept connections on the new socket
}

// Create a new thread to handle the connection
//
std::thread( &Camera::Server::block_main, &camerad, camerad.socklist[newid] ).detach();
}

for (;;) pause(); // main thread suspends

return 0;
Expand Down
2 changes: 2 additions & 0 deletions camerad/camerad.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@
#include "common.h"
#include "logentry.h"

#include "message.h"

int main(int argc, char** argv); ///< the main function
Loading