#include #include #include "CEthCamera.h" #ifdef _DEBUG #define DEB( x ) x #else #define DEB( x ) #endif CEthCamera::CEthCamera( const std::string &local_interface, port_t local_port ) : _nudp_req( false ), _local_port( local_port ), _remote_port( DEF_REMOTE_PORT ), _refresh_time( DEF_REFRESH_TIME ), _data_timeout( DEF_DATA_TIMEOUT ), _cmd_timeout( 1000*DEF_COMMAND_TIMEOUT ), _cmd_retries( DEF_COMMAND_RETRIES ), _opened( false ), _data_taking( false ), _log( NULL ), _err_log( NULL ) { DEB( printf( "CEthCamera constructor:\n" ); ) _h = new SocketHandler(); if ( !_h ) throw "CEthCamera: handler not created."; _comm = new NudpSocket( *_h ); if ( !_comm ) throw "CEthCamera: socket not created."; if ( !_comm->u2ip( local_interface, _local_interface ) ) throw "CEthCamera: local interface syntax error."; if ( _comm->Bind( _local_interface, _local_port ) == -1 ) throw "CEthCamera: cannot bind to local ip:port"; _h->Add( _comm ); pthread_mutex_init( &_nudp_mutex, NULL ); pthread_mutex_init( &_wait_mutex, NULL ); pthread_cond_init( &_data_done, NULL ); pthread_cond_init( &_nudp_free, NULL ); DEB( printf( "CEthCamera created at %s.\n\n", GetLocalAddrStr().c_str() ); ) } CEthCamera::~CEthCamera( void ) { Log( "CEthCamera destructor:\n" ); if ( _opened ) { if ( !Close() ) { LogErr( "CEthCamera still in use. Deleting anyway.\n\n" ); delete _refresh; }; } pthread_mutex_destroy( &_nudp_mutex ); pthread_mutex_destroy( &_wait_mutex ); pthread_cond_destroy( &_data_done ); pthread_cond_destroy( &_nudp_free ); _h->Remove( _comm ); delete _comm; delete _h; if ( _log ) { Log( "CEthCamera deleted.\n\n" ); fprintf( _log, "STOP LOGGING.\n\n" ); fclose( _log ); } if ( _err_log ) { fprintf( _err_log, "STOP LOGGING.\n\n" ); fclose( _err_log ); } } bool CEthCamera::Open( const std::string &remote_host, port_t remote_port ) { Log( "CEthCamera::Open():\n" ); if ( _opened ) { LogErr( "Connection must be closed before reopening.\n\n" ); return false; } if ( !_comm->u2ip( remote_host, _remote_host ) ) { LogErr( "Remote host syntax error.\n\n" ); return false; } _remote_port = remote_port; try { _refresh = new RefreshThread( this, _refresh_time ); } catch ( char const* msg ) { LogErr( msg ); LogErr( "\nConnection not estabilished.\n\n" ); return false; } if ( _refresh->Start() != 0 ) { LogErr( "Refresh thread not started.\n\n" ); delete _refresh; return false; } if ( !_comm->Open( _remote_host, _remote_port ) ) { LogErr( "Cannot open connection to remote host.\n\n" ); return false; } int rc = pthread_mutex_lock( &_wait_mutex ); if ( rc ) { LogErr( "CEthCamera::Open():\nCannot lock _wait_mutex.\n\n" ); } Log( "CEthCamera::Open():\nWaiting for BkgListener().\n\n" ); _stop_bkg = false; int code = pthread_create( &_bkg_thread_id, NULL, CEthCamera::BkgListener, this ); if ( code != 0 ) { rc = pthread_mutex_unlock( &_wait_mutex ); if ( rc ) { LogErr( "CEthCamera::Open():\nCannot unlock _wait_mutex.\n\n" ); } LogErr( "Cannot start listener thread.\n\n" ); return false; } pthread_cond_wait( &_data_done, &_wait_mutex ); Log( "CEthCamera::Open():\nBkgListener() start signaled.\n\n" ); rc = pthread_mutex_unlock( &_wait_mutex ); if ( rc ) { LogErr( "CEthCamera::Open():\nCannot unlock _wait_mutex.\n\n" ); } char buf[128]; sprintf( buf, "Connection to %s is opened.\n\n", GetRemoteAddrStr().c_str() ); Log( buf ); return true; } bool CEthCamera::Close( void ) { if ( !_opened ) return true; WaitForRawData(); if ( !_comm->Suspend() ) { LogErr( "CEthCamera::Close():\nCommunication port still in use.\n\n" ); return false; } _stop_bkg = true; pthread_join( _bkg_thread_id, NULL ); if ( _opened ) LogErr( "CEthCamera::Close():\nBkgListener did not change camera state.\n\n" ); delete _refresh; Log( "CEthCamera::Close():\nConnection closed.\n\n" ); return true; } bool CEthCamera::SendCmd( const char src_number[4], const char* src_data, const size_t src_data_length, const char src_top, char* dst_number, char* dst_data, size_t* dst_data_length, char* dst_top ) { unsigned int n = (unsigned char)src_number[3]; n = (n << 8) + (unsigned char)src_number[2]; n = (n << 8) + (unsigned char)src_number[1]; n = (n << 8) + (unsigned char)src_number[0]; return SendCmd( n, src_data, src_data_length, src_top, (unsigned int*)dst_number, dst_data, dst_data_length, dst_top ); } bool CEthCamera::SendCmd( const unsigned int src_number, const char* src_data, const size_t src_data_length, const char src_top, unsigned int* dst_number, char* dst_data, size_t* dst_data_length, char* dst_top ) { char log_buf[128]; sprintf( log_buf, "CEthCamera::SendCmd() (%s) number field = %.8X:\n", GetLocalAddrStr().c_str(), htonl(src_number) ); Log( log_buf ); if ( !_opened ) { sprintf( log_buf, "CEthCamera::SendCmd() (number field = %.8X):\nConnection closed. Command not sent.\n\n", htonl(src_number) ); LogErr( log_buf ); return false; } if ( !_refresh->IsAlive() ) { sprintf( log_buf, "CEthCamera::SendCmd() (number field = %.8X):\nRefresh is stopped. Command not sent.\n\n", htonl(src_number) ); LogErr( "Refresh is stopped. Command not sent.\n\n" ); return false; } char state, top; bool got_packet, repeat = true; int retries = _cmd_retries; bool cmd0x08 = false; if ( (src_number & 0xFF) == 0x08 ) cmd0x08 = true; if ( _data_taking && cmd0x08 ) { LogErr( "Cannot send 0x08 command while data taking.\n\n" ); return false; } _nudp_req = true; // listener will free the socket int rc = pthread_mutex_lock( &_nudp_mutex ); _nudp_req = false; if ( rc ) { sprintf( log_buf, "CEthCamera::SendCmd() (number field = %.8X):\nCannot lock access to NUDP socket.\n\n", htonl(src_number) ); LogErr( log_buf ); return false; } while ( (retries-- >= 0) && repeat ) { if ( !_comm->SendCmd( src_number, src_data, src_data_length, src_top ) ) { if ( cmd0x08 ) _comm->StopData(); sprintf( log_buf, "CEthCamera::SendCmd() (number field = %.8X):\nCommand not sent.\n\n", htonl(src_number) ); LogErr( log_buf ); break; } do { _h->Select( 0, _cmd_timeout ); got_packet = _comm->GotPacket(); state = _comm->GetState(); top = _comm->GetLastTop(); if ( got_packet && (top == 0x07) && (state & NUDP_DATA) ) { Log( "RAW data packet while waiting for command response (not a problem).\n" ); } } while ( got_packet && (top == 0x07) ); if ( state & NUDP_WAITING ) { if ( cmd0x08 ) _comm->EmergencyDataStandBy(); _comm->StopWaiting(); sprintf( log_buf, "CEthCamera::SendCmd() (number field = %.8X):\nTimeout.\n\n", htonl(src_number) ); LogErr( log_buf ); } else if ( state & NUDP_CMD_ERR ) { if ( cmd0x08 ) _comm->EmergencyDataStandBy(); sprintf( log_buf, "CEthCamera::SendCmd() (number field = %.8X):\nError in camera response.\n\n", htonl(src_number) ); LogErr( log_buf ); } else if ( (state & 0x0F) == NUDP_READY ) { repeat = false; // Go into the "data taking" mode only if transfer is not finished yet! if ( cmd0x08 ) { if ( _comm->GetLastRawNumber() < NUDP_DATA_PACKETS-1 ) { if ( _comm->GetState() & NUDP_DATA ) _data_taking = true; else LogErr( "CEthCamera::SendCmd() (0x08 command):\nUnexpected state!\n\n" ); } else Log( "CEthCamera::SendCmd() (0x08 command):\nWarning: transfer has finished before camera answer!\n(image received)\n\n" ); } sprintf( log_buf, "CEthCamera::SendCmd() (number field = %.8X):\nOK.\n\n", htonl(src_number) ); Log( log_buf ); } else { sprintf( log_buf, "CEthCamera::SendCmd() (number field = %.8X):\nUnexpected behaviour!\n\n", htonl(src_number) ); LogErr( log_buf ); } } // request handled: unlock the mutex and signal it to the listener rc = pthread_mutex_unlock( &_nudp_mutex ); if ( rc ) { LogErr( "Cannot unlock access to NUDP socket.\n\n" ); return false; } rc = pthread_cond_signal( &_nudp_free ); if ( rc ) { LogErr( "CEthCamera::SendCmd():\nCannot signal NUDP socket is free.\n\n" ); } if ( repeat ) { if ( cmd0x08 ) _comm->StopData(); LogErr( "CEthCamera::SendCmd():\nLost contact, closing connection.\n\n" ); Close(); return false; } return _comm->GetResponse( dst_number, dst_data, dst_data_length, dst_top ); } bool CEthCamera::GetLastResponse( char* dst_number, char* dst_data, size_t* dst_data_length, char* dst_top ) { return _comm->GetResponse( (unsigned int*)dst_number, dst_data, dst_data_length, dst_top ); } bool CEthCamera::GetLastResponse( unsigned int* dst_number, char* dst_data, size_t* dst_data_length, char* dst_top ) { return _comm->GetResponse( dst_number, dst_data, dst_data_length, dst_top ); } bool CEthCamera::SetRawDataBuffer( char* buf_addr ) { if ( _data_taking ) { LogErr( "CEthCamera::SetRawDataBuffer():\nCannot change RAW data buffer while data taking.\n\n" ); return false; } return _comm->PrepareForData( buf_addr ); } bool CEthCamera::TransferDone( void ) { int rc = pthread_mutex_unlock( &_nudp_mutex ); if ( rc ) { LogErr( "CEthCamera::TransferDone():\nCannot unlock access to NUDP socket.\n\n" ); } char status[4]; size_t stat_len; unsigned int ans = 0; bool cmd_ok = SendCmd( 0x0A, NULL, 0, 0, &ans, status, &stat_len ); rc = pthread_mutex_lock( &_nudp_mutex ); if ( rc ) { LogErr( "CEthCamera::TransferDone():\nCannot lock access to NUDP socket.\n\n" ); } if ( !cmd_ok ) { LogErr( "CEthCamera::TransferDone():\nCommand 0x0A failed.\n\n" ); return true; } char log_buf[80]; if ( status[1] & 0x10 ) { Log( "CEthCamera::TransferDone():\nTransfer is finished.\n\n" ); return true; } else { sprintf( log_buf, "CEthCamera::TransferDone():\nTransfer NOT finished (status:%X).\n\n", status[1] ); Log( log_buf ); return false; } } void* CEthCamera::BkgListener( void* pthis ) { CEthCamera* pt = (CEthCamera*)pthis; char top, log_buf[128]; int rc, data_count, data_timeout = 0, timeout_count = 0; rc = pthread_mutex_lock( &(pt->_nudp_mutex) ); if ( rc ) { pt->LogErr( "CEthCamera::BkgListener():\nCannot lock access to NUDP socket.\n\n" ); } // ********************************************************* // The only way out from this point is at the end of method! // ********************************************************* pt->_opened = true; rc = pthread_mutex_lock( &(pt->_wait_mutex) ); if ( rc ) { pt->LogErr( "CEthCamera::BkgListener():\nCannot lock _wait_mutex.\n\n" ); } rc = pthread_mutex_unlock( &(pt->_wait_mutex) ); if ( rc ) { pt->LogErr( "CEthCamera::BkgListener():\nCannot unlock _wait_mutex.\n\n" ); } pt->Log( "CEthCamera::BkgListener():\nSignaling start.\n\n" ); rc = pthread_cond_signal( &(pt->_data_done) ); if ( rc ) { pt->LogErr( "CEthCamera::BkgListener():\nSignaling failed.\n\n" ); } pt->Log( "CEthCamera::BkgListener():\nStarted.\n\n" ); while ( !pt->_stop_bkg ) { // if SendCmd() requests sending command - free the socket and wait if ( pt->_nudp_req ) rc = pthread_cond_wait( &(pt->_nudp_free), &(pt->_nudp_mutex) ); if ( pt->_data_taking ) { data_count = pt->_comm->GetDataCount(); pt->_h->Select( 0, 1000*DEF_WAIT_TIME ); rc = pthread_mutex_lock( &(pt->_wait_mutex) ); if ( rc ) { pt->LogErr( "CEthCamera::BkgListener():\nCannot lock _wait_mutex.\n\n" ); } if ( pt->_comm->GotPacket() ) { top = pt->_comm->GetLastTop(); if ( (top == 0x07) && (pt->_comm->GetDataCount() > data_count) ) { if ( pt->_log ) { sprintf( log_buf, "CEthCamera::BkgListener():\nRAW data packet received (dt=%dms).\n\n", data_timeout ); pt->Log( log_buf ); } data_timeout = 0; timeout_count = 0; if ( pt->_comm->GetLastRawNumber() == NUDP_DATA_PACKETS-1 ) // pt->_comm->AllDataReceived() { pt->_data_taking = false; rc = pthread_mutex_unlock( &(pt->_wait_mutex) ); if ( rc ) { pt->LogErr( "CEthCamera::BkgListener():\nCannot unlock _wait_mutex.\n\n" ); } rc = pthread_cond_signal( &(pt->_data_done) ); if ( rc ) { pt->LogErr( "CEthCamera::BkgListener():\nSignaling failed.\n\n" ); } pt->Log( "CEthCamera::BkgListener():\nLast RAW data packet received.\n\n" ); } } else if ( top != 0x07) { pt->LogErr( "CEthCamera::BkgListener():\nType of packet != 0x07 while waiting for RAW data (not a problem for a driver, but should not happen).\n\n" ); } } else { data_timeout += DEF_WAIT_TIME; if ( data_timeout >= pt->_data_timeout ) { pt->LogErr( "CEthCamera::BkgListener():\nRAW data timeout.\n\n" ); if ( pt->TransferDone() || (timeout_count >= DEF_DATA_RETRIES) ) { pt->_comm->StopData(); pt->_data_taking = false; data_timeout = 0; timeout_count = 0; rc = pthread_mutex_unlock( &(pt->_wait_mutex) ); if ( rc ) { pt->LogErr( "CEthCamera::BkgListener():\nCannot unlock _wait_mutex.\n\n" ); } rc = pthread_cond_signal( &(pt->_data_done) ); if ( rc ) { pt->LogErr( "CEthCamera::BkgListener():\nSignaling failed.\n\n" ); } } else { pt->LogErr( "CEthCamera::BkgListener():\nStill waiting for data...\n\n" ); data_timeout = 0; timeout_count++; } } } rc = pthread_mutex_unlock( &(pt->_wait_mutex) ); if ( rc ) { pt->LogErr( "CEthCamera::BkgListener():\nCannot unlock _wait_mutex.\n\n" ); } } else pt->_h->Select( 0, 20000 ); } // ******************************************************************** // BkgListener (and the connection to the camera) is considered CLOSED. // ******************************************************************** pt->_opened = false; rc = pthread_mutex_unlock( &(pt->_nudp_mutex) ); if ( rc ) { pt->LogErr( "CEthCamera::BkgListener():\nCannot unlock access to NUDP socket.\n\n" ); } pt->Log( "CEthCamera::BkgListener():\nStopped.\n\n" ); return NULL; } bool CEthCamera::WaitForRawData( void ) { pthread_mutex_lock( &_wait_mutex ); if ( _data_taking ) { Log( "CEthCamera::WaitForRawData():\nRAW data not ready, waiting.\n\n" ); pthread_cond_wait( &_data_done, &_wait_mutex ); Log( "CEthCamera::WaitForRawData():\nEnd of RAW data signaled.\n\n" ); } pthread_mutex_unlock( &_wait_mutex ); return _comm->AllDataReceived(); } bool CEthCamera::FragmentReceivedByOffset( int offset ) { return _comm->FragmentReceivedByOffset( offset ); } bool CEthCamera::FragmentReceivedByIndex( int index ) { return _comm->FragmentReceivedByIndex( index ); } int CEthCamera::GetDataCount( void ) { return _comm->GetDataCount(); } bool CEthCamera::ReadMissing( char* buf_addr ) { if ( _comm->AllDataReceived() ) return true; size_t len; char log_buf[80]; int n = 0, n_missing = NUDP_DATA_PACKETS - _comm->GetDataCount(); for ( int i = 0, offset = 0; (n < n_missing) && (i < NUDP_DATA_PACKETS); i++, offset += NUDP_DATA_LENGTH ) { if ( !FragmentReceivedByIndex( i ) ) { if ( !SendCmd( offset>>1, NULL, 0, 6, NULL, buf_addr+offset, &len ) ) { sprintf( log_buf, "CEthCamera::ReadMissing():\nCannot retransmit packet #%d.\n\n", i ); LogErr( log_buf ); return false; } sprintf( log_buf, "CEthCamera::ReadMissing():\nPacket #%d retransmitted.\n\n", i ); LogErr( log_buf ); n++; } } return true; } void CEthCamera::SetRefreshTime( int sec ) { _refresh_time = sec; if ( _opened ) _refresh->SetTime( sec ); } void CEthCamera::SetDataTimeout( int msec ) { _data_timeout = msec; } void CEthCamera::SetCommandTimeout( int msec ) { _cmd_timeout = 1000*msec; } void CEthCamera::SetCommandRetries( int n ) { _cmd_retries = n; } char CEthCamera::GetCamState( void ) { char state = 0; if ( _opened ) state = CAM_CONNECTION_OPEN; else return state; if ( _refresh->IsAlive() ) state |= CAM_REFRESH_RUNNING; if ( _data_taking ) state |= CAM_DATA_TAKING; return state; } std::string CEthCamera::GetLocalAddrStr( void ) { std::string result; _comm->l2ip( _local_interface, result ); char port[6]; sprintf( port, "%u", _local_port ); result += ':'; result += port; return result; } std::string CEthCamera::GetRemoteAddrStr( void ) { std::string result; if ( _opened ) { _comm->l2ip( _remote_host, result ); char port[6]; sprintf( port, "%u", _remote_port ); result += ':'; result += port; } else result = "(connection closed)"; return result; } void CEthCamera::Log( const char* msg ) { if ( _log ) { time_t now; time( &now ); struct tm *t = gmtime( &now ); fprintf( _log, "%d:%d:%d: %s", t->tm_hour, t->tm_min, t->tm_sec, msg ); fflush( _log ); } DEB( printf( msg ); ) } void CEthCamera::LogErr( const char* msg ) { char tmsg[256]; time_t now; time( &now ); struct tm *t = gmtime( &now ); sprintf( tmsg, "%d:%d:%d: %s", t->tm_hour, t->tm_min, t->tm_sec, msg ); if ( _log ) { fprintf( _log, tmsg ); fflush( _log ); } if ( _err_log ) { fprintf( _err_log, tmsg ); fflush( _err_log ); } DEB( printf( msg ); ) } bool CEthCamera::SetLogFilename( const std::string &fname ) { if ( _log ) return false; _log = fopen( fname.c_str(), "a+t" ); if ( !_log ) return false; if ( !_comm->SetLogFile( _log ) ) { fclose( _log ); _log = NULL; return false; } fprintf( _log, "START LOGGING.\n\n" ); return true; } bool CEthCamera::SetErrLogFilename( const std::string &fname ) { if ( _err_log ) return false; _err_log = fopen( fname.c_str(), "a+t" ); if ( !_err_log ) return false; if ( !_comm->SetErrLogFile( _err_log ) ) { fclose( _err_log ); _err_log = NULL; return false; } fprintf( _err_log, "START LOGGING.\n\n" ); return true; } RefreshThread::RefreshThread( CEthCamera* owner, int t_sec ): _rcmd( 0xFC ), _owner( owner ), _running( false ), _stop( false ), _t_sec( t_sec ) { _handler = new SocketHandler(); if ( !_handler ) throw "RefreshThread: handler not created."; _socket = new NudpSocket( *_handler ); if ( !_socket ) throw "RefreshThread: socket not created."; if ( _owner->_log ) _socket->SetLogFile( _owner->_log ); if ( _owner->_err_log ) _socket->SetLogFile( _owner->_err_log ); port_t refresh_port = _owner->GetLocalPort() + 1; if ( _socket->Bind( _owner->GetLocalInterface(), refresh_port ) == -1 ) throw "RefreshThread: cannot bind to local ip:port."; _handler->Add( _socket ); if ( !_socket->Open( owner->GetRemoteHost(), owner->GetRemotePort() ) ) throw "RefreshThread: cannot open connection to remote host:port."; _owner->Log( "RefreshThread created.\n\n" ); } RefreshThread::~RefreshThread( void ) { Stop(); _handler->Remove( _socket ); delete _socket; delete _handler; _owner->Log( "RefreshThread deleted.\n\n" ); } int RefreshThread::Start( void ) { if ( _running ) return 0; _running = true; _stop = false; _owner->Log( "RefreshThread::Start():\n" ); int code = pthread_create( &_thread_id, NULL, RefreshThread::EntryPoint, this ); if ( !code ) _owner->Log( "Started.\n\n" ); else _owner->Log( "pthread_create() error.\n\n" ); return code; } void RefreshThread::Stop( void ) { if ( !_running ) return; _owner->Log( "RefreshThread::Stop():\n" ); _stop = true; pthread_join( _thread_id, NULL ); _owner->Log( "Stopped.\n\n" ); } void* RefreshThread::EntryPoint( void* pthis ) { RefreshThread* pt = (RefreshThread*)pthis; char state; bool repeat; int retries, counter = 0; while ( !pt->_stop ) { if ( (counter++ >= pt->_t_sec) && !pt->_owner->_data_taking ) { pt->_owner->Log( "RefreshThread:\nCamera watchdog refresh.\n\n" ); repeat = true; retries = pt->_owner->_cmd_retries; while ( (retries-- >= 0) && repeat ) { pt->_socket->SendCmd( pt->_rcmd ); pt->_handler->Select( 0, pt->_owner->_cmd_timeout ); state = pt->_socket->GetState(); if ( state & NUDP_WAITING ) { pt->_socket->StopWaiting(); pt->_owner->LogErr( "RefreshThread:\nTimeout.\n\n" ); } else if ( state & NUDP_CMD_ERR ) { pt->_owner->LogErr( "RefreshThread:\nError in camera response.\n\n" ); } else if ( (state & 0x0F) == NUDP_READY ) { repeat = false; pt->_owner->Log( "RefreshThread:\nOK.\n\n" ); } else { pt->_owner->LogErr( "RefreshThread:\nUnexpected behaviour!\n\n" ); } } if ( repeat ) { pt->_owner->LogErr( "RefreshThread:\nLost contact, closing connection.\n" ); break; } counter = 0; } sleep( 1 ); } pt->_socket->Suspend(); pt->_running = false; return NULL; }