Loading src/Client.cpp +7 −41 Original line number Diff line number Diff line Loading @@ -189,9 +189,7 @@ void Client::startUpdateLists() try { m_protocolManager_sp->updateNewList(); m_protocolManager_sp->updateFailedList(); m_protocolManager_sp->retrieveFiles(); writeState(Tango::ON); writeStatus("Database loop active"); Loading @@ -211,19 +209,9 @@ void Client::startUpdateLists() writeStatus("Unknown error"); } if(readState() != Tango::ALARM && m_protocolManager_sp->hasNextNewList()) if(readState() == Tango::ON && m_protocolManager_sp->hasFilesToTransfer()) { m_protocolManager_sp->setRecoveryMode(false); startResolve(); } else if(readState() != Tango::ALARM && m_protocolManager_sp->hasNextFailedList() && m_protocolManager_sp->isRecoveryTimeElapsed()) { m_protocolManager_sp->setRecoveryMode(true); startResolve(); } else Loading Loading @@ -449,21 +437,10 @@ void Client::onTransferCompleted(FileWrapper::SP fileWrapper_sp) try { if(!m_protocolManager_sp->getRecoveryMode()) m_protocolManager_sp->setNewFileTransfered(fileWrapper_sp); else m_protocolManager_sp->setFailedFileTransfered(fileWrapper_sp); if(m_protocolManager_sp->hasNextNewList()) { m_protocolManager_sp->setRecoveryMode(false); m_protocolManager_sp->setCurrentFileDownloaded(fileWrapper_sp); startWriteRequest(); } else if(m_protocolManager_sp->hasNextFailedList()) if(m_protocolManager_sp->hasNextFile()) { m_protocolManager_sp->setRecoveryMode(true); startWriteRequest(); } else Loading Loading @@ -498,21 +475,10 @@ void Client::onTransferFailed() try { if(!m_protocolManager_sp->getRecoveryMode()) m_protocolManager_sp->setNewFileFailed(); else m_protocolManager_sp->setFailedFileFailed(); m_protocolManager_sp->setCurrentFileFailed(); if(m_protocolManager_sp->hasNextNewList()) if(m_protocolManager_sp->hasNextFile()) { m_protocolManager_sp->setRecoveryMode(false); startWriteRequest(); } else if(m_protocolManager_sp->hasNextFailedList()) { m_protocolManager_sp->setRecoveryMode(true); startWriteRequest(); } else Loading src/Client.h +6 −6 Original line number Diff line number Diff line Loading @@ -174,12 +174,6 @@ protected: //File list update time boost::asio::deadline_timer m_listsUpdateTimer; //Header size on binary stream static const unsigned HEADER_SIZE = 4; //Buffer for binary data read from stream std::vector<boost::uint8_t> m_readBuff; //Tango state property mutex boost::mutex m_stateMutex; Loading @@ -195,6 +189,12 @@ protected: //Address and port of remote endpoint std::string m_remoteEndpoint; //Header size on binary stream static const unsigned HEADER_SIZE = 4; //Buffer for binary data read from stream std::vector<boost::uint8_t> m_readBuff; //Read buffer size static const boost::uint64_t BUFFER_SIZE = 40960; Loading src/DBManager.cpp +52 −48 Original line number Diff line number Diff line Loading @@ -145,49 +145,6 @@ DBManager::TransactionSP DBManager::getAuxTransaction() return transaction_sp; } //============================================================================== // DBManager::retrieveNewFiles() //============================================================================== DBManager::NewFileRowsetSP DBManager::retrieveNewFiles(boost::posix_time::ptime ptime) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveNewFiles()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_mainSession_sp->get_backend() == NULL) m_mainSession_sp->reconnect(); NewFileRowsetSP newFileRowset_sp(new NewFileRowset(m_mainSession_sp->prepare << "select file_version, file_name, update_time from " << m_configuration_sp->getDatabaseSchema() << "." << m_configuration_sp->getDatabaseTable() << " where update_time>'" << boost::posix_time::to_iso_string(ptime) << "' order by update_time asc")); return newFileRowset_sp; } //============================================================================== // DBManager::updateNewFilePath() //============================================================================== void DBManager::updateNewFilePath(std::string storagePath, std::string filePath, int fileVersion, std::string fileName) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::updateNewFilePath()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_mainSession_sp->get_backend() == NULL) m_mainSession_sp->reconnect(); *m_mainSession_sp << "update " << m_configuration_sp->getDatabaseSchema() << "." << m_configuration_sp->getDatabaseTable() << " set storage_path = :storagePath, file_path = :filePath " << " where file_version = :fileVersion and file_name like :fileName", soci::use(storagePath, "storagePath"), soci::use(filePath, "filePath"), soci::use(fileVersion, "fileVersion"), soci::use(fileName, "fileName"); } //============================================================================== // DBManager::retrieveLastTimestamp() //============================================================================== Loading Loading @@ -232,6 +189,49 @@ void DBManager::persistLastTimestamp(boost::posix_time::ptime ptime) << " last_timestamp='" << boost::posix_time::to_iso_string(ptime) << "'"; } //============================================================================== // DBManager::retrieveNewFiles() //============================================================================== DBManager::FileRowsetSP DBManager::retrieveNewFiles(boost::posix_time::ptime ptime) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveNewFiles()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_mainSession_sp->get_backend() == NULL) m_mainSession_sp->reconnect(); FileRowsetSP newFileRowset_sp(new FileRowset(m_mainSession_sp->prepare << "select storage_path, file_path, file_version, file_name, update_time " << "from " << m_configuration_sp->getDatabaseSchema() << "." << m_configuration_sp->getDatabaseTable() << " where update_time>'" << boost::posix_time::to_iso_string(ptime) << "' order by update_time asc")); return newFileRowset_sp; } //============================================================================== // DBManager::updateNewFilePath() //============================================================================== void DBManager::updateNewFilePath(std::string storagePath, std::string filePath, int fileVersion, std::string fileName) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::updateNewFilePath()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_mainSession_sp->get_backend() == NULL) m_mainSession_sp->reconnect(); *m_mainSession_sp << "update " << m_configuration_sp->getDatabaseSchema() << "." << m_configuration_sp->getDatabaseTable() << " set storage_path = :storagePath, file_path = :filePath " << " where file_version = :fileVersion and file_name like :fileName", soci::use(storagePath, "storagePath"), soci::use(filePath, "filePath"), soci::use(fileVersion, "fileVersion"), soci::use(fileName, "fileName"); } //============================================================================== // DBManager::addFailedFile() //============================================================================== Loading Loading @@ -276,7 +276,7 @@ void DBManager::removeFailedFile(int fileVersion, std::string fileName) //============================================================================== // DBManager::retrieveFailedFiles() //============================================================================== DBManager::FailedFileRowsetSP DBManager::retrieveFailedFiles() DBManager::FileRowsetSP DBManager::retrieveFailedFiles() throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveFailedFiles()" << endl; Loading @@ -286,10 +286,14 @@ DBManager::FailedFileRowsetSP DBManager::retrieveFailedFiles() if(m_auxSession_sp->get_backend() == NULL) m_auxSession_sp->reconnect(); FailedFileRowsetSP failedFileRowset_sp(new FailedFileRowset( m_auxSession_sp->prepare << "select file_version, file_name from " FileRowsetSP failedFileRowset_sp(new FileRowset( m_auxSession_sp->prepare << "select m.storage_path, m.file_path, " << " m.file_version, m.file_name, m.update_time from " << m_configuration_sp->getDatabaseSchema() << "." << m_configuration_sp->getDatabaseTable() << " as m join " << m_configuration_sp->getAuxDatabaseSchema() << "." << m_configuration_sp->getAuxDatabaseFailedTable() << m_configuration_sp->getAuxDatabaseFailedTable() << " as f " << "on f.file_version = m.file_version and f.file_name = m.file_name " << "where device_name like '" << m_deviceName << "'")); return failedFileRowset_sp; Loading src/DBManager.h +19 −22 Original line number Diff line number Diff line Loading @@ -77,28 +77,32 @@ public: TransactionSP getAuxTransaction(); //------------------------------------------------------------------------------ // [Public] New file method // [Public] Timestamp methods //------------------------------------------------------------------------------ typedef boost::tuple< boost::optional<int>, boost::optional<std::string>, boost::optional<std::tm> > NewFileRow; virtual boost::posix_time::ptime retrieveLastTimestamp() throw(soci::soci_error); typedef soci::rowset< NewFileRow > NewFileRowset; virtual void persistLastTimestamp(boost::posix_time::ptime) throw(soci::soci_error); typedef boost::shared_ptr< NewFileRowset > NewFileRowsetSP; //------------------------------------------------------------------------------ // [Public] File row set definition //------------------------------------------------------------------------------ typedef boost::tuple< boost::optional<std::string>, boost::optional<std::string>, boost::optional<int>, boost::optional<std::string>, boost::optional<std::tm> > FileRow; virtual NewFileRowsetSP retrieveNewFiles(boost::posix_time::ptime) throw(soci::soci_error); typedef soci::rowset< FileRow > FileRowset; virtual void updateNewFilePath(std::string, std::string, int, std::string) throw(soci::soci_error); typedef boost::shared_ptr< FileRowset > FileRowsetSP; //------------------------------------------------------------------------------ // [Public] Timestamp methods // [Public] New file method //------------------------------------------------------------------------------ virtual boost::posix_time::ptime retrieveLastTimestamp() virtual FileRowsetSP retrieveNewFiles(boost::posix_time::ptime) throw(soci::soci_error); virtual void persistLastTimestamp(boost::posix_time::ptime) virtual void updateNewFilePath(std::string, std::string, int, std::string) throw(soci::soci_error); //------------------------------------------------------------------------------ Loading @@ -110,14 +114,7 @@ public: virtual void removeFailedFile(int, std::string) throw(soci::soci_error); typedef boost::tuple< boost::optional<int>, boost::optional<std::string> > FailedFileRow; typedef soci::rowset< FailedFileRow > FailedFileRowset; typedef boost::shared_ptr< FailedFileRowset > FailedFileRowsetSP; virtual FailedFileRowsetSP retrieveFailedFiles() virtual FileRowsetSP retrieveFailedFiles() throw(soci::soci_error); protected: Loading src/PlainClient.cpp +1 −12 Original line number Diff line number Diff line Loading @@ -122,12 +122,7 @@ void PlainClient::startWriteRequest() try { RequestSP request_sp; if(!m_protocolManager_sp->getRecoveryMode()) request_sp = m_protocolManager_sp->createNewListRequest(); else request_sp = m_protocolManager_sp->createFailedListRequest(); RequestSP request_sp = m_protocolManager_sp->createtRequest(); boost::uint32_t bodySize = request_sp->ByteSize(); Loading @@ -150,12 +145,6 @@ void PlainClient::startWriteRequest() boost::bind(&PlainClient::handleWriteRequest, this, boost::asio::placeholders::error)); } catch(std::logic_error& ec) { WARN_STREAM << "PlainClient::startWriteRequest() " << ec.what() << endl; onTransferFailed(); } catch(std::runtime_error& ec) { ERROR_STREAM << "PlainClient::startWriteRequest() " << ec.what() << endl; Loading Loading
src/Client.cpp +7 −41 Original line number Diff line number Diff line Loading @@ -189,9 +189,7 @@ void Client::startUpdateLists() try { m_protocolManager_sp->updateNewList(); m_protocolManager_sp->updateFailedList(); m_protocolManager_sp->retrieveFiles(); writeState(Tango::ON); writeStatus("Database loop active"); Loading @@ -211,19 +209,9 @@ void Client::startUpdateLists() writeStatus("Unknown error"); } if(readState() != Tango::ALARM && m_protocolManager_sp->hasNextNewList()) if(readState() == Tango::ON && m_protocolManager_sp->hasFilesToTransfer()) { m_protocolManager_sp->setRecoveryMode(false); startResolve(); } else if(readState() != Tango::ALARM && m_protocolManager_sp->hasNextFailedList() && m_protocolManager_sp->isRecoveryTimeElapsed()) { m_protocolManager_sp->setRecoveryMode(true); startResolve(); } else Loading Loading @@ -449,21 +437,10 @@ void Client::onTransferCompleted(FileWrapper::SP fileWrapper_sp) try { if(!m_protocolManager_sp->getRecoveryMode()) m_protocolManager_sp->setNewFileTransfered(fileWrapper_sp); else m_protocolManager_sp->setFailedFileTransfered(fileWrapper_sp); if(m_protocolManager_sp->hasNextNewList()) { m_protocolManager_sp->setRecoveryMode(false); m_protocolManager_sp->setCurrentFileDownloaded(fileWrapper_sp); startWriteRequest(); } else if(m_protocolManager_sp->hasNextFailedList()) if(m_protocolManager_sp->hasNextFile()) { m_protocolManager_sp->setRecoveryMode(true); startWriteRequest(); } else Loading Loading @@ -498,21 +475,10 @@ void Client::onTransferFailed() try { if(!m_protocolManager_sp->getRecoveryMode()) m_protocolManager_sp->setNewFileFailed(); else m_protocolManager_sp->setFailedFileFailed(); m_protocolManager_sp->setCurrentFileFailed(); if(m_protocolManager_sp->hasNextNewList()) if(m_protocolManager_sp->hasNextFile()) { m_protocolManager_sp->setRecoveryMode(false); startWriteRequest(); } else if(m_protocolManager_sp->hasNextFailedList()) { m_protocolManager_sp->setRecoveryMode(true); startWriteRequest(); } else Loading
src/Client.h +6 −6 Original line number Diff line number Diff line Loading @@ -174,12 +174,6 @@ protected: //File list update time boost::asio::deadline_timer m_listsUpdateTimer; //Header size on binary stream static const unsigned HEADER_SIZE = 4; //Buffer for binary data read from stream std::vector<boost::uint8_t> m_readBuff; //Tango state property mutex boost::mutex m_stateMutex; Loading @@ -195,6 +189,12 @@ protected: //Address and port of remote endpoint std::string m_remoteEndpoint; //Header size on binary stream static const unsigned HEADER_SIZE = 4; //Buffer for binary data read from stream std::vector<boost::uint8_t> m_readBuff; //Read buffer size static const boost::uint64_t BUFFER_SIZE = 40960; Loading
src/DBManager.cpp +52 −48 Original line number Diff line number Diff line Loading @@ -145,49 +145,6 @@ DBManager::TransactionSP DBManager::getAuxTransaction() return transaction_sp; } //============================================================================== // DBManager::retrieveNewFiles() //============================================================================== DBManager::NewFileRowsetSP DBManager::retrieveNewFiles(boost::posix_time::ptime ptime) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveNewFiles()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_mainSession_sp->get_backend() == NULL) m_mainSession_sp->reconnect(); NewFileRowsetSP newFileRowset_sp(new NewFileRowset(m_mainSession_sp->prepare << "select file_version, file_name, update_time from " << m_configuration_sp->getDatabaseSchema() << "." << m_configuration_sp->getDatabaseTable() << " where update_time>'" << boost::posix_time::to_iso_string(ptime) << "' order by update_time asc")); return newFileRowset_sp; } //============================================================================== // DBManager::updateNewFilePath() //============================================================================== void DBManager::updateNewFilePath(std::string storagePath, std::string filePath, int fileVersion, std::string fileName) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::updateNewFilePath()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_mainSession_sp->get_backend() == NULL) m_mainSession_sp->reconnect(); *m_mainSession_sp << "update " << m_configuration_sp->getDatabaseSchema() << "." << m_configuration_sp->getDatabaseTable() << " set storage_path = :storagePath, file_path = :filePath " << " where file_version = :fileVersion and file_name like :fileName", soci::use(storagePath, "storagePath"), soci::use(filePath, "filePath"), soci::use(fileVersion, "fileVersion"), soci::use(fileName, "fileName"); } //============================================================================== // DBManager::retrieveLastTimestamp() //============================================================================== Loading Loading @@ -232,6 +189,49 @@ void DBManager::persistLastTimestamp(boost::posix_time::ptime ptime) << " last_timestamp='" << boost::posix_time::to_iso_string(ptime) << "'"; } //============================================================================== // DBManager::retrieveNewFiles() //============================================================================== DBManager::FileRowsetSP DBManager::retrieveNewFiles(boost::posix_time::ptime ptime) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveNewFiles()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_mainSession_sp->get_backend() == NULL) m_mainSession_sp->reconnect(); FileRowsetSP newFileRowset_sp(new FileRowset(m_mainSession_sp->prepare << "select storage_path, file_path, file_version, file_name, update_time " << "from " << m_configuration_sp->getDatabaseSchema() << "." << m_configuration_sp->getDatabaseTable() << " where update_time>'" << boost::posix_time::to_iso_string(ptime) << "' order by update_time asc")); return newFileRowset_sp; } //============================================================================== // DBManager::updateNewFilePath() //============================================================================== void DBManager::updateNewFilePath(std::string storagePath, std::string filePath, int fileVersion, std::string fileName) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::updateNewFilePath()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_mainSession_sp->get_backend() == NULL) m_mainSession_sp->reconnect(); *m_mainSession_sp << "update " << m_configuration_sp->getDatabaseSchema() << "." << m_configuration_sp->getDatabaseTable() << " set storage_path = :storagePath, file_path = :filePath " << " where file_version = :fileVersion and file_name like :fileName", soci::use(storagePath, "storagePath"), soci::use(filePath, "filePath"), soci::use(fileVersion, "fileVersion"), soci::use(fileName, "fileName"); } //============================================================================== // DBManager::addFailedFile() //============================================================================== Loading Loading @@ -276,7 +276,7 @@ void DBManager::removeFailedFile(int fileVersion, std::string fileName) //============================================================================== // DBManager::retrieveFailedFiles() //============================================================================== DBManager::FailedFileRowsetSP DBManager::retrieveFailedFiles() DBManager::FileRowsetSP DBManager::retrieveFailedFiles() throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveFailedFiles()" << endl; Loading @@ -286,10 +286,14 @@ DBManager::FailedFileRowsetSP DBManager::retrieveFailedFiles() if(m_auxSession_sp->get_backend() == NULL) m_auxSession_sp->reconnect(); FailedFileRowsetSP failedFileRowset_sp(new FailedFileRowset( m_auxSession_sp->prepare << "select file_version, file_name from " FileRowsetSP failedFileRowset_sp(new FileRowset( m_auxSession_sp->prepare << "select m.storage_path, m.file_path, " << " m.file_version, m.file_name, m.update_time from " << m_configuration_sp->getDatabaseSchema() << "." << m_configuration_sp->getDatabaseTable() << " as m join " << m_configuration_sp->getAuxDatabaseSchema() << "." << m_configuration_sp->getAuxDatabaseFailedTable() << m_configuration_sp->getAuxDatabaseFailedTable() << " as f " << "on f.file_version = m.file_version and f.file_name = m.file_name " << "where device_name like '" << m_deviceName << "'")); return failedFileRowset_sp; Loading
src/DBManager.h +19 −22 Original line number Diff line number Diff line Loading @@ -77,28 +77,32 @@ public: TransactionSP getAuxTransaction(); //------------------------------------------------------------------------------ // [Public] New file method // [Public] Timestamp methods //------------------------------------------------------------------------------ typedef boost::tuple< boost::optional<int>, boost::optional<std::string>, boost::optional<std::tm> > NewFileRow; virtual boost::posix_time::ptime retrieveLastTimestamp() throw(soci::soci_error); typedef soci::rowset< NewFileRow > NewFileRowset; virtual void persistLastTimestamp(boost::posix_time::ptime) throw(soci::soci_error); typedef boost::shared_ptr< NewFileRowset > NewFileRowsetSP; //------------------------------------------------------------------------------ // [Public] File row set definition //------------------------------------------------------------------------------ typedef boost::tuple< boost::optional<std::string>, boost::optional<std::string>, boost::optional<int>, boost::optional<std::string>, boost::optional<std::tm> > FileRow; virtual NewFileRowsetSP retrieveNewFiles(boost::posix_time::ptime) throw(soci::soci_error); typedef soci::rowset< FileRow > FileRowset; virtual void updateNewFilePath(std::string, std::string, int, std::string) throw(soci::soci_error); typedef boost::shared_ptr< FileRowset > FileRowsetSP; //------------------------------------------------------------------------------ // [Public] Timestamp methods // [Public] New file method //------------------------------------------------------------------------------ virtual boost::posix_time::ptime retrieveLastTimestamp() virtual FileRowsetSP retrieveNewFiles(boost::posix_time::ptime) throw(soci::soci_error); virtual void persistLastTimestamp(boost::posix_time::ptime) virtual void updateNewFilePath(std::string, std::string, int, std::string) throw(soci::soci_error); //------------------------------------------------------------------------------ Loading @@ -110,14 +114,7 @@ public: virtual void removeFailedFile(int, std::string) throw(soci::soci_error); typedef boost::tuple< boost::optional<int>, boost::optional<std::string> > FailedFileRow; typedef soci::rowset< FailedFileRow > FailedFileRowset; typedef boost::shared_ptr< FailedFileRowset > FailedFileRowsetSP; virtual FailedFileRowsetSP retrieveFailedFiles() virtual FileRowsetSP retrieveFailedFiles() throw(soci::soci_error); protected: Loading
src/PlainClient.cpp +1 −12 Original line number Diff line number Diff line Loading @@ -122,12 +122,7 @@ void PlainClient::startWriteRequest() try { RequestSP request_sp; if(!m_protocolManager_sp->getRecoveryMode()) request_sp = m_protocolManager_sp->createNewListRequest(); else request_sp = m_protocolManager_sp->createFailedListRequest(); RequestSP request_sp = m_protocolManager_sp->createtRequest(); boost::uint32_t bodySize = request_sp->ByteSize(); Loading @@ -150,12 +145,6 @@ void PlainClient::startWriteRequest() boost::bind(&PlainClient::handleWriteRequest, this, boost::asio::placeholders::error)); } catch(std::logic_error& ec) { WARN_STREAM << "PlainClient::startWriteRequest() " << ec.what() << endl; onTransferFailed(); } catch(std::runtime_error& ec) { ERROR_STREAM << "PlainClient::startWriteRequest() " << ec.what() << endl; Loading