Loading proto/Response.proto +7 −2 Original line number Diff line number Diff line Loading @@ -4,8 +4,13 @@ message Response { enum State { ACCEPTED = 0; REJECTED = 1; REQUEST_ACCEPTED = 0; ACCESS_DENY = 1; TABLE_NOT_EXPORTED = 2; METADATA_NOT_FOUND = 3; FILE_NOT_DOWNLOADED = 4; FILE_NOT_FOUND = 5; GENERIC_ERROR = 6; } required State state = 1; Loading src/Client.cpp +69 −60 Original line number Diff line number Diff line Loading @@ -13,7 +13,7 @@ namespace DataImporter_ns Client::Client(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_sp) : Tango::LogAdapter(deviceImpl_p), m_deviceImpl_p(deviceImpl_p), m_configuration_sp(configuration_sp), m_resolver(m_ioService), m_resetConnectionTimer(m_ioService), m_requestResponseTimer(m_ioService) m_resetConnectionTimer(m_ioService), m_listsUpdateTimer(m_ioService) { DEBUG_STREAM << "Client::Client()" << endl; Loading Loading @@ -63,6 +63,8 @@ void Client::start() m_work_sp.reset(new boost::asio::io_service::work(m_ioService)); m_thread_sp.reset(new boost::thread(boost::bind(&Client::run, this))); Client::startUpdateLists(); } //============================================================================== Loading @@ -72,6 +74,8 @@ void Client::stop() { DEBUG_STREAM << "Client::stop()" << endl; closeConnection(); m_ioService.stop(); m_work_sp.reset(); Loading Loading @@ -172,6 +176,60 @@ void Client::run() DEBUG_STREAM << "Client::run() Stopping" << endl; } //============================================================================== // Client::startUpdateLists() //============================================================================== void Client::startUpdateLists() { DEBUG_STREAM << "Client::startUpdateLists()" << endl; try { m_protocolManager_sp->updateNewFileList(); m_protocolManager_sp->updateFiledFileList(); writeState(Tango::ON); writeStatus("Looking for new files"); } catch(std::exception& ec) { ERROR_STREAM << "Client::startUpdateLists() " << ec.what() << endl; writeState(Tango::ALARM); writeStatus(ec.what()); } catch(...) { ERROR_STREAM << "Client::startUpdateLists() Unknown error" << endl; writeState(Tango::ALARM); writeStatus("Unknown error"); } handleUpdateLists(); } //============================================================================== // Client::handleUpdateLists() //============================================================================== void Client::handleUpdateLists() { DEBUG_STREAM << "Client::handleUpdateLists()" << endl; if(readState() != Tango::ALARM && (!m_protocolManager_sp->isNewFileListEmpty() || !m_protocolManager_sp->isFailedFileListEmpty())) { startResolve(); } else { m_listsUpdateTimer.expires_from_now( boost::posix_time::seconds(m_configuration_sp->getRefreshTime())); } } //============================================================================== // Client::startResolve() //============================================================================== Loading @@ -185,7 +243,7 @@ void Client::startResolve() INFO_STREAM << "Client::startResolve() " << infoStream.str() << endl; writeState(Tango::ON); writeState(Tango::RUNNING); writeStatus(infoStream.str()); boost::asio::ip::tcp::resolver::query query(m_configuration_sp->getRemoteHost(), Loading Loading @@ -216,7 +274,7 @@ void Client::handleResolve(const boost::system::error_code& errorCode, { ERROR_STREAM << "Client::handleResolve() " << errorCode.message() << endl; writeState(Tango::FAULT); writeState(Tango::ALARM); writeStatus(errorCode.message()); } } Loading @@ -236,7 +294,7 @@ void Client::handleWriteRequest(const boost::system::error_code& errorCode) { ERROR_STREAM << "Client::handleRequest() " << errorCode.message() << endl; writeState(Tango::FAULT); writeState(Tango::ALARM); writeStatus(errorCode.message()); } } Loading @@ -258,7 +316,7 @@ void Client::handleReadResponseHeader(const boost::system::error_code& errorCode { ERROR_STREAM << "Client::handleReadResponseHeader() " << errorCode.message() << endl; writeState(Tango::FAULT); writeState(Tango::ALARM); writeStatus(errorCode.message()); } } Loading @@ -274,60 +332,20 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) { try { ResponseSP response_sp(new Response); response_sp->ParseFromArray(&m_readBuff[HEADER_SIZE], m_readBuff.size() - HEADER_SIZE); m_protocolManager_sp->processResponse(response_sp); if(m_protocolManager_sp->isTransferRequest()) { std::string fileName = m_protocolManager_sp->getFileName(); int fileSize = m_protocolManager_sp->getFileSize(); INFO_STREAM << "Session::handleWriteResponse() transfer file " << fileName << " size " << fileSize << " from " << m_remoteEndpoint << endl; m_outputStreamSize = fileSize; if(m_outputStream.is_open()) m_outputStream.close(); m_outputStream.open(fileName.c_str(), std::ios::binary); if(m_outputStream) { startReadData(); } else { ERROR_STREAM << "Session::handleWriteResponse() Cannot open " << fileName << endl; } } else { m_requestResponseTimer.expires_from_now( boost::posix_time::seconds(m_configuration_sp->getRefreshTime())); m_requestResponseTimer.async_wait( boost::bind(&Client::startWriteRequest, this)); } //TODO: handle response } catch(std::exception& ec) { ERROR_STREAM << "Client::handleResponse() " << ec.what() << endl; writeState(Tango::FAULT); writeState(Tango::ALARM); writeStatus(ec.what()); } catch(...) { ERROR_STREAM << "Client::handleResponse() Unknown error" << endl; writeState(Tango::FAULT); writeState(Tango::ALARM); writeStatus("Unknown error"); } } Loading @@ -335,7 +353,7 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) { ERROR_STREAM << "Client::handleResponse() " << errorCode.message() << endl; writeState(Tango::FAULT); writeState(Tango::ALARM); writeStatus(errorCode.message()); } } Loading @@ -346,18 +364,11 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) void Client::handleReadData(const boost::system::error_code& errorCode, std::size_t bytes_transferred) { //DEBUG_STREAM << "Client::handleReadData()" << endl; if(!errorCode) { if(bytes_transferred>0) { m_outputStream.write(&m_fileBuff[0], (std::streamsize)bytes_transferred); /*/ INFO_STREAM << "Client::handleReadData() write " << m_outputStream.tellp() << "/" <<m_outputStreamSize << endl; */ } if(m_outputStream.tellp()<m_outputStreamSize) Loading Loading @@ -398,13 +409,11 @@ void Client::resetConnection() ERROR_STREAM << "Client::resetConnection() Connection timeout" << endl; m_resetConnectionTimer.expires_at(boost::posix_time::pos_infin); m_requestResponseTimer.expires_at(boost::posix_time::pos_infin); m_protocolManager_sp->resetProtocolStatus(); m_listsUpdateTimer.expires_at(boost::posix_time::pos_infin); closeConnection(); startResolve(); startUpdateLists(); } m_resetConnectionTimer.async_wait(boost::bind(&Client::resetConnection, this)); Loading src/Client.h +10 −3 Original line number Diff line number Diff line Loading @@ -39,9 +39,9 @@ public: //------------------------------------------------------------------------------ // [Public] Thread management methods //------------------------------------------------------------------------------ virtual void start() = 0; virtual void start(); virtual void stop() = 0; virtual void stop(); //------------------------------------------------------------------------------ // [Public] Read state and status methods Loading @@ -63,6 +63,13 @@ protected: //------------------------------------------------------------------------------ virtual void run(); //------------------------------------------------------------------------------ // [Protected] Lists update methods //------------------------------------------------------------------------------ virtual void startUpdateLists(); virtual void handleUpdateLists(); //------------------------------------------------------------------------------ // [Protected] Endpoint resolution methods //------------------------------------------------------------------------------ Loading Loading @@ -154,7 +161,7 @@ protected: boost::asio::deadline_timer m_resetConnectionTimer; //Request response timeout boost::asio::deadline_timer m_requestResponseTimer; boost::asio::deadline_timer m_listsUpdateTimer; //Header size on binary stream const unsigned HEADER_SIZE = 4; Loading src/DBManager.cpp +2 −2 Original line number Diff line number Diff line Loading @@ -209,7 +209,7 @@ void DBManager::removeFailedFile(int fileVersion, std::string fileName) //============================================================================== // DBManager::retrieveFailedFiles() //============================================================================== DBManager::FailedFileRowsetSP DBManager::retrieveFailedFiles(boost::posix_time::ptime ptime) DBManager::FailedFileRowsetSP DBManager::retrieveFailedFiles() throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveFailedFiles()" << endl; Loading @@ -219,7 +219,7 @@ DBManager::FailedFileRowsetSP DBManager::retrieveFailedFiles(boost::posix_time:: if(m_auxSession_sp->get_backend() == NULL) m_auxSession_sp->reconnect(); FailedFileRowsetSP failedFileRowset_sp(new FailedFileRowsetSP( FailedFileRowsetSP failedFileRowset_sp(new FailedFileRowset( m_auxSession_sp->prepare << "select file_version, file_name from " << m_configuration_sp->getAuxDatabaseSchema() << "." << m_configuration_sp->getAuxDatabaseFailedTable() Loading src/DBManager.h +1 −1 Original line number Diff line number Diff line Loading @@ -105,7 +105,7 @@ public: typedef boost::shared_ptr< FailedFileRowset > FailedFileRowsetSP; virtual FailedFileRowsetSP retrieveFailedFiles(boost::posix_time::ptime) virtual FailedFileRowsetSP retrieveFailedFiles() throw(soci::soci_error); protected: Loading Loading
proto/Response.proto +7 −2 Original line number Diff line number Diff line Loading @@ -4,8 +4,13 @@ message Response { enum State { ACCEPTED = 0; REJECTED = 1; REQUEST_ACCEPTED = 0; ACCESS_DENY = 1; TABLE_NOT_EXPORTED = 2; METADATA_NOT_FOUND = 3; FILE_NOT_DOWNLOADED = 4; FILE_NOT_FOUND = 5; GENERIC_ERROR = 6; } required State state = 1; Loading
src/Client.cpp +69 −60 Original line number Diff line number Diff line Loading @@ -13,7 +13,7 @@ namespace DataImporter_ns Client::Client(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_sp) : Tango::LogAdapter(deviceImpl_p), m_deviceImpl_p(deviceImpl_p), m_configuration_sp(configuration_sp), m_resolver(m_ioService), m_resetConnectionTimer(m_ioService), m_requestResponseTimer(m_ioService) m_resetConnectionTimer(m_ioService), m_listsUpdateTimer(m_ioService) { DEBUG_STREAM << "Client::Client()" << endl; Loading Loading @@ -63,6 +63,8 @@ void Client::start() m_work_sp.reset(new boost::asio::io_service::work(m_ioService)); m_thread_sp.reset(new boost::thread(boost::bind(&Client::run, this))); Client::startUpdateLists(); } //============================================================================== Loading @@ -72,6 +74,8 @@ void Client::stop() { DEBUG_STREAM << "Client::stop()" << endl; closeConnection(); m_ioService.stop(); m_work_sp.reset(); Loading Loading @@ -172,6 +176,60 @@ void Client::run() DEBUG_STREAM << "Client::run() Stopping" << endl; } //============================================================================== // Client::startUpdateLists() //============================================================================== void Client::startUpdateLists() { DEBUG_STREAM << "Client::startUpdateLists()" << endl; try { m_protocolManager_sp->updateNewFileList(); m_protocolManager_sp->updateFiledFileList(); writeState(Tango::ON); writeStatus("Looking for new files"); } catch(std::exception& ec) { ERROR_STREAM << "Client::startUpdateLists() " << ec.what() << endl; writeState(Tango::ALARM); writeStatus(ec.what()); } catch(...) { ERROR_STREAM << "Client::startUpdateLists() Unknown error" << endl; writeState(Tango::ALARM); writeStatus("Unknown error"); } handleUpdateLists(); } //============================================================================== // Client::handleUpdateLists() //============================================================================== void Client::handleUpdateLists() { DEBUG_STREAM << "Client::handleUpdateLists()" << endl; if(readState() != Tango::ALARM && (!m_protocolManager_sp->isNewFileListEmpty() || !m_protocolManager_sp->isFailedFileListEmpty())) { startResolve(); } else { m_listsUpdateTimer.expires_from_now( boost::posix_time::seconds(m_configuration_sp->getRefreshTime())); } } //============================================================================== // Client::startResolve() //============================================================================== Loading @@ -185,7 +243,7 @@ void Client::startResolve() INFO_STREAM << "Client::startResolve() " << infoStream.str() << endl; writeState(Tango::ON); writeState(Tango::RUNNING); writeStatus(infoStream.str()); boost::asio::ip::tcp::resolver::query query(m_configuration_sp->getRemoteHost(), Loading Loading @@ -216,7 +274,7 @@ void Client::handleResolve(const boost::system::error_code& errorCode, { ERROR_STREAM << "Client::handleResolve() " << errorCode.message() << endl; writeState(Tango::FAULT); writeState(Tango::ALARM); writeStatus(errorCode.message()); } } Loading @@ -236,7 +294,7 @@ void Client::handleWriteRequest(const boost::system::error_code& errorCode) { ERROR_STREAM << "Client::handleRequest() " << errorCode.message() << endl; writeState(Tango::FAULT); writeState(Tango::ALARM); writeStatus(errorCode.message()); } } Loading @@ -258,7 +316,7 @@ void Client::handleReadResponseHeader(const boost::system::error_code& errorCode { ERROR_STREAM << "Client::handleReadResponseHeader() " << errorCode.message() << endl; writeState(Tango::FAULT); writeState(Tango::ALARM); writeStatus(errorCode.message()); } } Loading @@ -274,60 +332,20 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) { try { ResponseSP response_sp(new Response); response_sp->ParseFromArray(&m_readBuff[HEADER_SIZE], m_readBuff.size() - HEADER_SIZE); m_protocolManager_sp->processResponse(response_sp); if(m_protocolManager_sp->isTransferRequest()) { std::string fileName = m_protocolManager_sp->getFileName(); int fileSize = m_protocolManager_sp->getFileSize(); INFO_STREAM << "Session::handleWriteResponse() transfer file " << fileName << " size " << fileSize << " from " << m_remoteEndpoint << endl; m_outputStreamSize = fileSize; if(m_outputStream.is_open()) m_outputStream.close(); m_outputStream.open(fileName.c_str(), std::ios::binary); if(m_outputStream) { startReadData(); } else { ERROR_STREAM << "Session::handleWriteResponse() Cannot open " << fileName << endl; } } else { m_requestResponseTimer.expires_from_now( boost::posix_time::seconds(m_configuration_sp->getRefreshTime())); m_requestResponseTimer.async_wait( boost::bind(&Client::startWriteRequest, this)); } //TODO: handle response } catch(std::exception& ec) { ERROR_STREAM << "Client::handleResponse() " << ec.what() << endl; writeState(Tango::FAULT); writeState(Tango::ALARM); writeStatus(ec.what()); } catch(...) { ERROR_STREAM << "Client::handleResponse() Unknown error" << endl; writeState(Tango::FAULT); writeState(Tango::ALARM); writeStatus("Unknown error"); } } Loading @@ -335,7 +353,7 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) { ERROR_STREAM << "Client::handleResponse() " << errorCode.message() << endl; writeState(Tango::FAULT); writeState(Tango::ALARM); writeStatus(errorCode.message()); } } Loading @@ -346,18 +364,11 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) void Client::handleReadData(const boost::system::error_code& errorCode, std::size_t bytes_transferred) { //DEBUG_STREAM << "Client::handleReadData()" << endl; if(!errorCode) { if(bytes_transferred>0) { m_outputStream.write(&m_fileBuff[0], (std::streamsize)bytes_transferred); /*/ INFO_STREAM << "Client::handleReadData() write " << m_outputStream.tellp() << "/" <<m_outputStreamSize << endl; */ } if(m_outputStream.tellp()<m_outputStreamSize) Loading Loading @@ -398,13 +409,11 @@ void Client::resetConnection() ERROR_STREAM << "Client::resetConnection() Connection timeout" << endl; m_resetConnectionTimer.expires_at(boost::posix_time::pos_infin); m_requestResponseTimer.expires_at(boost::posix_time::pos_infin); m_protocolManager_sp->resetProtocolStatus(); m_listsUpdateTimer.expires_at(boost::posix_time::pos_infin); closeConnection(); startResolve(); startUpdateLists(); } m_resetConnectionTimer.async_wait(boost::bind(&Client::resetConnection, this)); Loading
src/Client.h +10 −3 Original line number Diff line number Diff line Loading @@ -39,9 +39,9 @@ public: //------------------------------------------------------------------------------ // [Public] Thread management methods //------------------------------------------------------------------------------ virtual void start() = 0; virtual void start(); virtual void stop() = 0; virtual void stop(); //------------------------------------------------------------------------------ // [Public] Read state and status methods Loading @@ -63,6 +63,13 @@ protected: //------------------------------------------------------------------------------ virtual void run(); //------------------------------------------------------------------------------ // [Protected] Lists update methods //------------------------------------------------------------------------------ virtual void startUpdateLists(); virtual void handleUpdateLists(); //------------------------------------------------------------------------------ // [Protected] Endpoint resolution methods //------------------------------------------------------------------------------ Loading Loading @@ -154,7 +161,7 @@ protected: boost::asio::deadline_timer m_resetConnectionTimer; //Request response timeout boost::asio::deadline_timer m_requestResponseTimer; boost::asio::deadline_timer m_listsUpdateTimer; //Header size on binary stream const unsigned HEADER_SIZE = 4; Loading
src/DBManager.cpp +2 −2 Original line number Diff line number Diff line Loading @@ -209,7 +209,7 @@ void DBManager::removeFailedFile(int fileVersion, std::string fileName) //============================================================================== // DBManager::retrieveFailedFiles() //============================================================================== DBManager::FailedFileRowsetSP DBManager::retrieveFailedFiles(boost::posix_time::ptime ptime) DBManager::FailedFileRowsetSP DBManager::retrieveFailedFiles() throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveFailedFiles()" << endl; Loading @@ -219,7 +219,7 @@ DBManager::FailedFileRowsetSP DBManager::retrieveFailedFiles(boost::posix_time:: if(m_auxSession_sp->get_backend() == NULL) m_auxSession_sp->reconnect(); FailedFileRowsetSP failedFileRowset_sp(new FailedFileRowsetSP( FailedFileRowsetSP failedFileRowset_sp(new FailedFileRowset( m_auxSession_sp->prepare << "select file_version, file_name from " << m_configuration_sp->getAuxDatabaseSchema() << "." << m_configuration_sp->getAuxDatabaseFailedTable() Loading
src/DBManager.h +1 −1 Original line number Diff line number Diff line Loading @@ -105,7 +105,7 @@ public: typedef boost::shared_ptr< FailedFileRowset > FailedFileRowsetSP; virtual FailedFileRowsetSP retrieveFailedFiles(boost::posix_time::ptime) virtual FailedFileRowsetSP retrieveFailedFiles() throw(soci::soci_error); protected: Loading