Loading src/Client.cpp +90 −24 Original line number Diff line number Diff line Loading @@ -53,7 +53,7 @@ void Client::start() { DEBUG_STREAM << "Client::start()" << endl; m_dBManager_sp->connect(); m_dBManager_sp->connectAll(); m_protocolManager_sp = ProtocolManager::create(m_deviceImpl_p, m_configuration_sp, m_dBManager_sp); Loading Loading @@ -91,7 +91,10 @@ void Client::stop() m_protocolManager_sp.reset(); m_dBManager_sp->disconnect(); m_dBManager_sp->disconnectAll(); writeState(Tango::OFF); writeStatus("Database loop paused"); } //============================================================================== Loading Loading @@ -188,7 +191,7 @@ void Client::startUpdateLists() m_protocolManager_sp->updateFileLists(); writeState(Tango::ON); writeStatus("Looking for new files"); writeStatus("Database loop active"); } catch(std::exception& ec) { Loading Loading @@ -341,8 +344,7 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) { WARN_STREAM << "Client::handleReadResponseBody() " << ec.what() << endl; writeState(Tango::ALARM); writeStatus(ec.what()); onTransferFailed(); } catch(std::runtime_error& ec) { Loading Loading @@ -387,9 +389,36 @@ void Client::handleReadData(FileWrapper::SP fileWrapper_sp, std::size_t recvByte } else { INFO_STREAM << "Client::handleReadData() transfer complete " << endl; onTransferCompleted(fileWrapper_sp); } } else { WARN_STREAM << "Client::handleReadData() bad I/O" << endl; onTransferFailed(); } } else { ERROR_STREAM << "Client::handleReadData() " << errorCode.message() << " from " << m_remoteEndpoint << endl; writeState(Tango::ALARM); writeStatus(errorCode.message()); } } //============================================================================== // Client::onTransferCompleted() //============================================================================== void Client::onTransferCompleted(FileWrapper::SP fileWrapper_sp) { DEBUG_STREAM << "Client::onTransferCompleted()" << endl; m_protocolManager_sp->markAsCompleted(); try { m_protocolManager_sp->setFileTransfered(fileWrapper_sp); m_protocolManager_sp->nextFile(); Loading @@ -404,22 +433,59 @@ void Client::handleReadData(FileWrapper::SP fileWrapper_sp, std::size_t recvByte startUpdateLists(); } } catch(std::exception& ec) { ERROR_STREAM << "Client::onTransferCompleted() " << ec.what() << endl; writeState(Tango::ALARM); writeStatus(ec.what()); } else catch(...) { ERROR_STREAM << "Client::handleReadData() bad I/O" << endl; ERROR_STREAM << "Client::onTransferCompleted() Unknown error" << endl; writeState(Tango::ALARM); writeStatus("Bad I/O"); writeStatus("Unknown error"); } } //============================================================================== // Client::onTransferFailed() //============================================================================== void Client::onTransferFailed() { DEBUG_STREAM << "Client::onTransferFailed()" << endl; try { m_protocolManager_sp->setFileFailed(); m_protocolManager_sp->nextFile(); if(m_protocolManager_sp->hasNextFile()) { startWriteRequest(); } else { ERROR_STREAM << "Client::handleReadData() " << errorCode.message() << " from " << m_remoteEndpoint << endl; closeConnection(); startUpdateLists(); } } catch(std::exception& ec) { ERROR_STREAM << "Client::onTransferFailed() " << ec.what() << endl; writeState(Tango::ALARM); writeStatus(errorCode.message()); writeStatus(ec.what()); } catch(...) { ERROR_STREAM << "Client::onTransferFailed() Unknown error" << endl; writeState(Tango::ALARM); writeStatus("Unknown error"); } } Loading src/Client.h +7 −0 Original line number Diff line number Diff line Loading @@ -116,6 +116,13 @@ protected: virtual void handleReadData(FileWrapper::SP, std::size_t, const boost::system::error_code&); //------------------------------------------------------------------------------ // [Protected] Transfer result methods //------------------------------------------------------------------------------ void onTransferCompleted(FileWrapper::SP); void onTransferFailed(); //------------------------------------------------------------------------------ // [Protected] Connection reset and timeout handler methods //------------------------------------------------------------------------------ Loading src/DBManager.cpp +34 −6 Original line number Diff line number Diff line Loading @@ -52,7 +52,7 @@ DBManager::SP DBManager::create(Tango::DeviceImpl* deviceImpl_p, //============================================================================== // DBManager::connect() //============================================================================== void DBManager::connect() throw(soci::soci_error) void DBManager::connectAll() throw(soci::soci_error) { DEBUG_STREAM << "DBManager::connect()" << endl; Loading Loading @@ -88,7 +88,7 @@ void DBManager::connect() throw(soci::soci_error) //============================================================================== // DBManager::disconnect() //============================================================================== void DBManager::disconnect() void DBManager::disconnectAll() { DEBUG_STREAM << "DBManager::disconnect()" << endl; Loading @@ -99,6 +99,34 @@ void DBManager::disconnect() m_auxSession_sp->close(); } //============================================================================== // DBManager::getMainTransaction() //============================================================================== DBManager::TransactionSP DBManager::getMainTransaction() { DEBUG_STREAM << "DBManager::getMainTransaction()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); TransactionSP transaction_sp(new soci::transaction(*m_mainSession_sp)); return transaction_sp; } //============================================================================== // DBManager::getAuxTransaction() //============================================================================== DBManager::TransactionSP DBManager::getAuxTransaction() { DEBUG_STREAM << "DBManager::getAuxTransaction()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); TransactionSP transaction_sp(new soci::transaction(*m_auxSession_sp)); return transaction_sp; } //============================================================================== // DBManager::retrieveNewFiles() //============================================================================== Loading Loading @@ -134,10 +162,10 @@ void DBManager::updateNewFilePath(std::string storagePath, std::string filePath, if(m_mainSession_sp->get_backend() == NULL) m_mainSession_sp->reconnect(); *m_mainSession_sp << "insert into " << m_configuration_sp->getDatabaseSchema() << "." << m_configuration_sp->getDatabaseTable() << " (storage_path, " << "file_path) values (:storagePath, :filePath) where file_version = " << ":fileVersion and file_name like :FileName", *m_mainSession_sp << "update " << m_configuration_sp->getDatabaseSchema() << "." << m_configuration_sp->getDatabaseTable() << " set storage_path = :storagePath, file_path = :filePath " << " where file_version = :fileVersion and file_name like :fileName", soci::use(storagePath, "storagePath"), soci::use(filePath, "filePath"), soci::use(fileVersion, "fileVersion"), soci::use(fileName, "fileName"); } Loading src/DBManager.h +12 −3 Original line number Diff line number Diff line Loading @@ -61,11 +61,20 @@ public: static DBManager::SP create(Tango::DeviceImpl*, Configuration::SP); //------------------------------------------------------------------------------ // [Public] Connection handling methods // [Public] Connections handling methods //------------------------------------------------------------------------------ virtual void connect() throw(soci::soci_error); virtual void connectAll() throw(soci::soci_error); virtual void disconnect(); virtual void disconnectAll(); //------------------------------------------------------------------------------ // [Public] Transaction retriever methods //------------------------------------------------------------------------------ typedef boost::shared_ptr<soci::transaction> TransactionSP; TransactionSP getMainTransaction(); TransactionSP getAuxTransaction(); //------------------------------------------------------------------------------ // [Public] New file method Loading src/FileWrapper.cpp +69 −6 Original line number Diff line number Diff line Loading @@ -7,13 +7,34 @@ namespace DataImporter_ns // FileWrapper::FileWrapper() //============================================================================== FileWrapper::FileWrapper(Tango::DeviceImpl* deviceImpl_p, boost::filesystem::path filePath, boost::uint64_t totalFileSize) : Tango::LogAdapter(deviceImpl_p), m_totalFileSize(totalFileSize) std::string storagePath, std::string filePath, int fileVersion, std::string fileName, boost::uint64_t totalFileSize) throw(std::logic_error) : Tango::LogAdapter(deviceImpl_p), m_storagePath(storagePath), m_filePath(filePath), m_fileVersion(fileVersion), m_fileName(fileName), m_totalFileSize(totalFileSize) { DEBUG_STREAM << "FileWrapper::FileWrapper()" << endl; m_outputFileStream.open(filePath.string(), std::ios::binary); boost::filesystem::path destPath(storagePath); destPath /= filePath; std::stringstream fileStream; fileStream << "/" << fileVersion; destPath /= fileStream.str(); if(!boost::filesystem::exists(destPath)) boost::filesystem::create_directories(destPath); if(!boost::filesystem::is_directory(destPath)) throw std::logic_error("Destination path \'" + destPath.string() + "\' is not a directory" ); destPath /= fileName; m_outputFileStream.open(destPath.string(), std::ios::binary); } //============================================================================== Loading @@ -30,14 +51,56 @@ FileWrapper::~FileWrapper() // FileWrapper::create() //============================================================================== FileWrapper::SP FileWrapper::create(Tango::DeviceImpl* deviceImpl_p, boost::filesystem::path filePath, boost::uint64_t totalFileSize) std::string storagePath, std::string filePath, int fileVersion, std::string fileName, boost::uint64_t fileSize) throw(std::logic_error) { FileWrapper::SP d_sp(new FileWrapper(deviceImpl_p, filePath, totalFileSize), FileWrapper::Deleter()); FileWrapper::SP d_sp(new FileWrapper(deviceImpl_p, storagePath, filePath, fileVersion, fileName, fileSize), FileWrapper::Deleter()); return d_sp; } //============================================================================== // FileWrapper::getStoragePath() //============================================================================== std::string FileWrapper::getStoragePath() { DEBUG_STREAM << "FileWrapper::getStoragePath()" << endl; return m_storagePath; } //============================================================================== // FileWrapper::getFilePath() //============================================================================== std::string FileWrapper::getFilePath() { DEBUG_STREAM << "FileWrapper::getFilePath()" << endl; return m_filePath; } //============================================================================== // FileWrapper::getFileVersion() //============================================================================== int FileWrapper::getFileVersion() { DEBUG_STREAM << "FileWrapper::getFileVersion()" << endl; return m_fileVersion; } //============================================================================== // FileWrapper::getFileName() //============================================================================== std::string FileWrapper::getFileName() { DEBUG_STREAM << "FileWrapper::getFileName()" << endl; return m_fileName; } //============================================================================== // FileWrapper::isOpen() //============================================================================== Loading Loading
src/Client.cpp +90 −24 Original line number Diff line number Diff line Loading @@ -53,7 +53,7 @@ void Client::start() { DEBUG_STREAM << "Client::start()" << endl; m_dBManager_sp->connect(); m_dBManager_sp->connectAll(); m_protocolManager_sp = ProtocolManager::create(m_deviceImpl_p, m_configuration_sp, m_dBManager_sp); Loading Loading @@ -91,7 +91,10 @@ void Client::stop() m_protocolManager_sp.reset(); m_dBManager_sp->disconnect(); m_dBManager_sp->disconnectAll(); writeState(Tango::OFF); writeStatus("Database loop paused"); } //============================================================================== Loading Loading @@ -188,7 +191,7 @@ void Client::startUpdateLists() m_protocolManager_sp->updateFileLists(); writeState(Tango::ON); writeStatus("Looking for new files"); writeStatus("Database loop active"); } catch(std::exception& ec) { Loading Loading @@ -341,8 +344,7 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) { WARN_STREAM << "Client::handleReadResponseBody() " << ec.what() << endl; writeState(Tango::ALARM); writeStatus(ec.what()); onTransferFailed(); } catch(std::runtime_error& ec) { Loading Loading @@ -387,9 +389,36 @@ void Client::handleReadData(FileWrapper::SP fileWrapper_sp, std::size_t recvByte } else { INFO_STREAM << "Client::handleReadData() transfer complete " << endl; onTransferCompleted(fileWrapper_sp); } } else { WARN_STREAM << "Client::handleReadData() bad I/O" << endl; onTransferFailed(); } } else { ERROR_STREAM << "Client::handleReadData() " << errorCode.message() << " from " << m_remoteEndpoint << endl; writeState(Tango::ALARM); writeStatus(errorCode.message()); } } //============================================================================== // Client::onTransferCompleted() //============================================================================== void Client::onTransferCompleted(FileWrapper::SP fileWrapper_sp) { DEBUG_STREAM << "Client::onTransferCompleted()" << endl; m_protocolManager_sp->markAsCompleted(); try { m_protocolManager_sp->setFileTransfered(fileWrapper_sp); m_protocolManager_sp->nextFile(); Loading @@ -404,22 +433,59 @@ void Client::handleReadData(FileWrapper::SP fileWrapper_sp, std::size_t recvByte startUpdateLists(); } } catch(std::exception& ec) { ERROR_STREAM << "Client::onTransferCompleted() " << ec.what() << endl; writeState(Tango::ALARM); writeStatus(ec.what()); } else catch(...) { ERROR_STREAM << "Client::handleReadData() bad I/O" << endl; ERROR_STREAM << "Client::onTransferCompleted() Unknown error" << endl; writeState(Tango::ALARM); writeStatus("Bad I/O"); writeStatus("Unknown error"); } } //============================================================================== // Client::onTransferFailed() //============================================================================== void Client::onTransferFailed() { DEBUG_STREAM << "Client::onTransferFailed()" << endl; try { m_protocolManager_sp->setFileFailed(); m_protocolManager_sp->nextFile(); if(m_protocolManager_sp->hasNextFile()) { startWriteRequest(); } else { ERROR_STREAM << "Client::handleReadData() " << errorCode.message() << " from " << m_remoteEndpoint << endl; closeConnection(); startUpdateLists(); } } catch(std::exception& ec) { ERROR_STREAM << "Client::onTransferFailed() " << ec.what() << endl; writeState(Tango::ALARM); writeStatus(errorCode.message()); writeStatus(ec.what()); } catch(...) { ERROR_STREAM << "Client::onTransferFailed() Unknown error" << endl; writeState(Tango::ALARM); writeStatus("Unknown error"); } } Loading
src/Client.h +7 −0 Original line number Diff line number Diff line Loading @@ -116,6 +116,13 @@ protected: virtual void handleReadData(FileWrapper::SP, std::size_t, const boost::system::error_code&); //------------------------------------------------------------------------------ // [Protected] Transfer result methods //------------------------------------------------------------------------------ void onTransferCompleted(FileWrapper::SP); void onTransferFailed(); //------------------------------------------------------------------------------ // [Protected] Connection reset and timeout handler methods //------------------------------------------------------------------------------ Loading
src/DBManager.cpp +34 −6 Original line number Diff line number Diff line Loading @@ -52,7 +52,7 @@ DBManager::SP DBManager::create(Tango::DeviceImpl* deviceImpl_p, //============================================================================== // DBManager::connect() //============================================================================== void DBManager::connect() throw(soci::soci_error) void DBManager::connectAll() throw(soci::soci_error) { DEBUG_STREAM << "DBManager::connect()" << endl; Loading Loading @@ -88,7 +88,7 @@ void DBManager::connect() throw(soci::soci_error) //============================================================================== // DBManager::disconnect() //============================================================================== void DBManager::disconnect() void DBManager::disconnectAll() { DEBUG_STREAM << "DBManager::disconnect()" << endl; Loading @@ -99,6 +99,34 @@ void DBManager::disconnect() m_auxSession_sp->close(); } //============================================================================== // DBManager::getMainTransaction() //============================================================================== DBManager::TransactionSP DBManager::getMainTransaction() { DEBUG_STREAM << "DBManager::getMainTransaction()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); TransactionSP transaction_sp(new soci::transaction(*m_mainSession_sp)); return transaction_sp; } //============================================================================== // DBManager::getAuxTransaction() //============================================================================== DBManager::TransactionSP DBManager::getAuxTransaction() { DEBUG_STREAM << "DBManager::getAuxTransaction()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); TransactionSP transaction_sp(new soci::transaction(*m_auxSession_sp)); return transaction_sp; } //============================================================================== // DBManager::retrieveNewFiles() //============================================================================== Loading Loading @@ -134,10 +162,10 @@ void DBManager::updateNewFilePath(std::string storagePath, std::string filePath, if(m_mainSession_sp->get_backend() == NULL) m_mainSession_sp->reconnect(); *m_mainSession_sp << "insert into " << m_configuration_sp->getDatabaseSchema() << "." << m_configuration_sp->getDatabaseTable() << " (storage_path, " << "file_path) values (:storagePath, :filePath) where file_version = " << ":fileVersion and file_name like :FileName", *m_mainSession_sp << "update " << m_configuration_sp->getDatabaseSchema() << "." << m_configuration_sp->getDatabaseTable() << " set storage_path = :storagePath, file_path = :filePath " << " where file_version = :fileVersion and file_name like :fileName", soci::use(storagePath, "storagePath"), soci::use(filePath, "filePath"), soci::use(fileVersion, "fileVersion"), soci::use(fileName, "fileName"); } Loading
src/DBManager.h +12 −3 Original line number Diff line number Diff line Loading @@ -61,11 +61,20 @@ public: static DBManager::SP create(Tango::DeviceImpl*, Configuration::SP); //------------------------------------------------------------------------------ // [Public] Connection handling methods // [Public] Connections handling methods //------------------------------------------------------------------------------ virtual void connect() throw(soci::soci_error); virtual void connectAll() throw(soci::soci_error); virtual void disconnect(); virtual void disconnectAll(); //------------------------------------------------------------------------------ // [Public] Transaction retriever methods //------------------------------------------------------------------------------ typedef boost::shared_ptr<soci::transaction> TransactionSP; TransactionSP getMainTransaction(); TransactionSP getAuxTransaction(); //------------------------------------------------------------------------------ // [Public] New file method Loading
src/FileWrapper.cpp +69 −6 Original line number Diff line number Diff line Loading @@ -7,13 +7,34 @@ namespace DataImporter_ns // FileWrapper::FileWrapper() //============================================================================== FileWrapper::FileWrapper(Tango::DeviceImpl* deviceImpl_p, boost::filesystem::path filePath, boost::uint64_t totalFileSize) : Tango::LogAdapter(deviceImpl_p), m_totalFileSize(totalFileSize) std::string storagePath, std::string filePath, int fileVersion, std::string fileName, boost::uint64_t totalFileSize) throw(std::logic_error) : Tango::LogAdapter(deviceImpl_p), m_storagePath(storagePath), m_filePath(filePath), m_fileVersion(fileVersion), m_fileName(fileName), m_totalFileSize(totalFileSize) { DEBUG_STREAM << "FileWrapper::FileWrapper()" << endl; m_outputFileStream.open(filePath.string(), std::ios::binary); boost::filesystem::path destPath(storagePath); destPath /= filePath; std::stringstream fileStream; fileStream << "/" << fileVersion; destPath /= fileStream.str(); if(!boost::filesystem::exists(destPath)) boost::filesystem::create_directories(destPath); if(!boost::filesystem::is_directory(destPath)) throw std::logic_error("Destination path \'" + destPath.string() + "\' is not a directory" ); destPath /= fileName; m_outputFileStream.open(destPath.string(), std::ios::binary); } //============================================================================== Loading @@ -30,14 +51,56 @@ FileWrapper::~FileWrapper() // FileWrapper::create() //============================================================================== FileWrapper::SP FileWrapper::create(Tango::DeviceImpl* deviceImpl_p, boost::filesystem::path filePath, boost::uint64_t totalFileSize) std::string storagePath, std::string filePath, int fileVersion, std::string fileName, boost::uint64_t fileSize) throw(std::logic_error) { FileWrapper::SP d_sp(new FileWrapper(deviceImpl_p, filePath, totalFileSize), FileWrapper::Deleter()); FileWrapper::SP d_sp(new FileWrapper(deviceImpl_p, storagePath, filePath, fileVersion, fileName, fileSize), FileWrapper::Deleter()); return d_sp; } //============================================================================== // FileWrapper::getStoragePath() //============================================================================== std::string FileWrapper::getStoragePath() { DEBUG_STREAM << "FileWrapper::getStoragePath()" << endl; return m_storagePath; } //============================================================================== // FileWrapper::getFilePath() //============================================================================== std::string FileWrapper::getFilePath() { DEBUG_STREAM << "FileWrapper::getFilePath()" << endl; return m_filePath; } //============================================================================== // FileWrapper::getFileVersion() //============================================================================== int FileWrapper::getFileVersion() { DEBUG_STREAM << "FileWrapper::getFileVersion()" << endl; return m_fileVersion; } //============================================================================== // FileWrapper::getFileName() //============================================================================== std::string FileWrapper::getFileName() { DEBUG_STREAM << "FileWrapper::getFileName()" << endl; return m_fileName; } //============================================================================== // FileWrapper::isOpen() //============================================================================== Loading