Loading proto/Request.proto +5 −4 Original line number Diff line number Diff line Loading @@ -8,7 +8,8 @@ message Request { AUTHORIZATION = 0; VALIDATION = 1; DATA = 2; TRANSFER = 2; KEEPALIVE = 3; } required Type type = 1; Loading @@ -33,13 +34,13 @@ message Request optional Validation validation = 3; //Data request //Transfer request message Data message Transfer { required int32 file_version = 1; required string file_name = 2; } optional Data data = 4; optional Transfer transfer = 4; } proto/Response.proto +9 −8 Original line number Diff line number Diff line Loading @@ -8,7 +8,8 @@ message Response { AUTHORIZATION = 0; VALIDATION = 1; DATA = 2; TRANSFER = 2; KEEPALIVE = 3; } required Type type = 1; Loading Loading @@ -45,9 +46,9 @@ message Response optional Validation validation = 3; //Data response //Transfer response message Data message Transfer { enum State { Loading @@ -58,11 +59,11 @@ message Response required State state = 1; required string status = 2; required string file_path = 3; required int32 file_version = 4; required string file_name = 5; required uint64 size = 6; optional string file_path = 3; optional int32 file_version = 4; optional string file_name = 5; optional uint64 size = 6; } optional Data data = 4; optional Transfer transfer = 4; } src/Client.cpp +74 −16 Original line number Diff line number Diff line Loading @@ -2,6 +2,7 @@ #include <boost/lexical_cast.hpp> #include <boost/bind.hpp> #include <fstream> namespace DataImporter_ns { Loading Loading @@ -278,20 +279,42 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) response_sp->ParseFromArray(&m_readBuff[HEADER_SIZE], m_readBuff.size() - HEADER_SIZE); // m_protocolManager_sp->processResponse(response_sp); // // if(m_protocolManager_sp->waitBeforeRequest()) // { // m_requestResponseTimer.expires_from_now( // boost::posix_time::seconds(m_configuration_sp->getRefreshTime())); // // m_requestResponseTimer.async_wait( // boost::bind(&Client::startWriteRequest, this)); // } // else // { // startWriteRequest(); // } m_protocolManager_sp->processResponse(response_sp); if(m_protocolManager_sp->isTransferRequest()) { std::string fileName = m_protocolManager_sp->getFileName(); int fileSize = m_protocolManager_sp->getFileSize(); INFO_STREAM << "Session::handleWriteResponse() transfer file " << fileName << " size " << fileSize << " from " << m_remoteEndpoint << endl; m_outputStreamSize = fileSize; if(m_outputStream.is_open()) m_outputStream.close(); m_outputStream.open(fileName.c_str(), std::ios::binary); if(m_outputStream) { startReadData(); } else { ERROR_STREAM << "Session::handleWriteResponse() Cannot open " << fileName << endl; } } else { m_requestResponseTimer.expires_from_now( boost::posix_time::seconds(m_configuration_sp->getRefreshTime())); m_requestResponseTimer.async_wait( boost::bind(&Client::startWriteRequest, this)); } } catch(std::exception& ec) { Loading Loading @@ -320,11 +343,46 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) //============================================================================== // Client::handleReadData() //============================================================================== void Client::handleReadData(const boost::system::error_code& errorCode) void Client::handleReadData(const boost::system::error_code& errorCode, std::size_t bytes_transferred) { DEBUG_STREAM << "Client::handleReadData()" << endl; //DEBUG_STREAM << "Client::handleReadData()" << endl; if(!errorCode) { if(bytes_transferred>0) { m_outputStream.write(&m_fileBuff[0], (std::streamsize)bytes_transferred); /*/ INFO_STREAM << "Client::handleReadData() write " << m_outputStream.tellp() << "/" <<m_outputStreamSize << endl; */ } if(m_outputStream.tellp()<m_outputStreamSize) { startReadData(); } else { INFO_STREAM << "Client::handleReadData() transfer complete " << endl; m_outputStream.close(); startWriteRequest(); } } else if(errorCode == boost::asio::error::eof) { DEBUG_STREAM << "Client::handleReadData() end of file from " << m_remoteEndpoint << endl; } else { ERROR_STREAM << "Client::handleReadData() " << errorCode.message() << " from " << m_remoteEndpoint << endl; } } //============================================================================== Loading src/Client.h +11 −2 Original line number Diff line number Diff line Loading @@ -14,6 +14,7 @@ #include <boost/thread.hpp> #include <boost/scoped_ptr.hpp> #include <boost/cstdint.hpp> #include <boost/array.hpp> namespace DataImporter_ns { Loading Loading @@ -100,11 +101,11 @@ protected: virtual void handleReadResponseBody(const boost::system::error_code&); //------------------------------------------------------------------------------ // [Protected] Read response body methods // [Protected] Read data methods //------------------------------------------------------------------------------ virtual void startReadData() = 0; virtual void handleReadData(const boost::system::error_code&); virtual void handleReadData(const boost::system::error_code&, std::size_t); //------------------------------------------------------------------------------ // [Protected] Connection reset and timeout handler methods Loading Loading @@ -175,6 +176,14 @@ protected: //Address and port of remote endpoint std::string m_remoteEndpoint; const int BUFFER_SIZE = 40960; std::vector<char> m_fileBuff; std::ofstream m_outputStream; int m_outputStreamSize; }; } //End of namespace Loading src/DBManager.cpp +9 −16 Original line number Diff line number Diff line Loading @@ -85,7 +85,7 @@ void DBManager::disconnect() //============================================================================== // DBManager::retrieveLastTimestamp() //============================================================================== std::tm DBManager::retrieveLastTimestamp() boost::posix_time::ptime DBManager::retrieveLastTimestamp() throw(std::runtime_error, std::out_of_range) { DEBUG_STREAM << "DBManager::retrieveLastTimestamp()" << endl; Loading Loading @@ -114,23 +114,19 @@ std::tm DBManager::retrieveLastTimestamp() std::string timestamp("1970-01-01 00:00:00"); db_data[0] >> timestamp; boost::posix_time::ptime ptime = boost::posix_time::time_from_string(timestamp); return boost::posix_time::to_tm(ptime); return boost::posix_time::time_from_string(timestamp); } //============================================================================== // DBManager::persistLastTimestamp() //============================================================================== void DBManager::persistLastTimestamp(std::tm lastTimestamp) throw(std::runtime_error, std::out_of_range) void DBManager::persistLastTimestamp(boost::posix_time::ptime ptime) throw(std::runtime_error) { DEBUG_STREAM << "DBManager::persistLastTimestamp()" << endl; boost::mutex::scoped_lock lock(m_tangoDBMutex); boost::posix_time::ptime ptime = boost::posix_time::ptime_from_tm(lastTimestamp); std::string timestampString = boost::posix_time::to_simple_string(ptime); Tango::DbDatum timestamp("LastTimestamp"); Loading @@ -157,24 +153,21 @@ void DBManager::persistLastTimestamp(std::tm lastTimestamp) } //============================================================================== // DBManager::retrieveNewTuples() // DBManager::retrieveNewFile() //============================================================================== DBManager::FileRowsetSP DBManager::retrieveNewTuples(std::string schema, std::string table, std::tm lastTimestamp) throw(soci::soci_error, std::out_of_range) DBManager::FileRowsetSP DBManager::retrieveNewFile(std::string schema, std::string table, boost::posix_time::ptime ptime) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveNewTuples()" << endl; DEBUG_STREAM << "DBManager::retrieveNewFile()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_session_sp->get_backend() == NULL) m_session_sp->reconnect(); boost::posix_time::ptime timestamp = boost::posix_time::ptime_from_tm(lastTimestamp); FileRowsetSP fileRowset_sp(new FileRowset(m_session_sp->prepare << "select" << " file_version, file_name, update_time from " << schema << "." << table << " where update_time>'" << boost::posix_time::to_iso_string(timestamp) << " where update_time>'" << boost::posix_time::to_iso_string(ptime) << "' order by update_time asc")); return fileRowset_sp; Loading Loading
proto/Request.proto +5 −4 Original line number Diff line number Diff line Loading @@ -8,7 +8,8 @@ message Request { AUTHORIZATION = 0; VALIDATION = 1; DATA = 2; TRANSFER = 2; KEEPALIVE = 3; } required Type type = 1; Loading @@ -33,13 +34,13 @@ message Request optional Validation validation = 3; //Data request //Transfer request message Data message Transfer { required int32 file_version = 1; required string file_name = 2; } optional Data data = 4; optional Transfer transfer = 4; }
proto/Response.proto +9 −8 Original line number Diff line number Diff line Loading @@ -8,7 +8,8 @@ message Response { AUTHORIZATION = 0; VALIDATION = 1; DATA = 2; TRANSFER = 2; KEEPALIVE = 3; } required Type type = 1; Loading Loading @@ -45,9 +46,9 @@ message Response optional Validation validation = 3; //Data response //Transfer response message Data message Transfer { enum State { Loading @@ -58,11 +59,11 @@ message Response required State state = 1; required string status = 2; required string file_path = 3; required int32 file_version = 4; required string file_name = 5; required uint64 size = 6; optional string file_path = 3; optional int32 file_version = 4; optional string file_name = 5; optional uint64 size = 6; } optional Data data = 4; optional Transfer transfer = 4; }
src/Client.cpp +74 −16 Original line number Diff line number Diff line Loading @@ -2,6 +2,7 @@ #include <boost/lexical_cast.hpp> #include <boost/bind.hpp> #include <fstream> namespace DataImporter_ns { Loading Loading @@ -278,20 +279,42 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) response_sp->ParseFromArray(&m_readBuff[HEADER_SIZE], m_readBuff.size() - HEADER_SIZE); // m_protocolManager_sp->processResponse(response_sp); // // if(m_protocolManager_sp->waitBeforeRequest()) // { // m_requestResponseTimer.expires_from_now( // boost::posix_time::seconds(m_configuration_sp->getRefreshTime())); // // m_requestResponseTimer.async_wait( // boost::bind(&Client::startWriteRequest, this)); // } // else // { // startWriteRequest(); // } m_protocolManager_sp->processResponse(response_sp); if(m_protocolManager_sp->isTransferRequest()) { std::string fileName = m_protocolManager_sp->getFileName(); int fileSize = m_protocolManager_sp->getFileSize(); INFO_STREAM << "Session::handleWriteResponse() transfer file " << fileName << " size " << fileSize << " from " << m_remoteEndpoint << endl; m_outputStreamSize = fileSize; if(m_outputStream.is_open()) m_outputStream.close(); m_outputStream.open(fileName.c_str(), std::ios::binary); if(m_outputStream) { startReadData(); } else { ERROR_STREAM << "Session::handleWriteResponse() Cannot open " << fileName << endl; } } else { m_requestResponseTimer.expires_from_now( boost::posix_time::seconds(m_configuration_sp->getRefreshTime())); m_requestResponseTimer.async_wait( boost::bind(&Client::startWriteRequest, this)); } } catch(std::exception& ec) { Loading Loading @@ -320,11 +343,46 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) //============================================================================== // Client::handleReadData() //============================================================================== void Client::handleReadData(const boost::system::error_code& errorCode) void Client::handleReadData(const boost::system::error_code& errorCode, std::size_t bytes_transferred) { DEBUG_STREAM << "Client::handleReadData()" << endl; //DEBUG_STREAM << "Client::handleReadData()" << endl; if(!errorCode) { if(bytes_transferred>0) { m_outputStream.write(&m_fileBuff[0], (std::streamsize)bytes_transferred); /*/ INFO_STREAM << "Client::handleReadData() write " << m_outputStream.tellp() << "/" <<m_outputStreamSize << endl; */ } if(m_outputStream.tellp()<m_outputStreamSize) { startReadData(); } else { INFO_STREAM << "Client::handleReadData() transfer complete " << endl; m_outputStream.close(); startWriteRequest(); } } else if(errorCode == boost::asio::error::eof) { DEBUG_STREAM << "Client::handleReadData() end of file from " << m_remoteEndpoint << endl; } else { ERROR_STREAM << "Client::handleReadData() " << errorCode.message() << " from " << m_remoteEndpoint << endl; } } //============================================================================== Loading
src/Client.h +11 −2 Original line number Diff line number Diff line Loading @@ -14,6 +14,7 @@ #include <boost/thread.hpp> #include <boost/scoped_ptr.hpp> #include <boost/cstdint.hpp> #include <boost/array.hpp> namespace DataImporter_ns { Loading Loading @@ -100,11 +101,11 @@ protected: virtual void handleReadResponseBody(const boost::system::error_code&); //------------------------------------------------------------------------------ // [Protected] Read response body methods // [Protected] Read data methods //------------------------------------------------------------------------------ virtual void startReadData() = 0; virtual void handleReadData(const boost::system::error_code&); virtual void handleReadData(const boost::system::error_code&, std::size_t); //------------------------------------------------------------------------------ // [Protected] Connection reset and timeout handler methods Loading Loading @@ -175,6 +176,14 @@ protected: //Address and port of remote endpoint std::string m_remoteEndpoint; const int BUFFER_SIZE = 40960; std::vector<char> m_fileBuff; std::ofstream m_outputStream; int m_outputStreamSize; }; } //End of namespace Loading
src/DBManager.cpp +9 −16 Original line number Diff line number Diff line Loading @@ -85,7 +85,7 @@ void DBManager::disconnect() //============================================================================== // DBManager::retrieveLastTimestamp() //============================================================================== std::tm DBManager::retrieveLastTimestamp() boost::posix_time::ptime DBManager::retrieveLastTimestamp() throw(std::runtime_error, std::out_of_range) { DEBUG_STREAM << "DBManager::retrieveLastTimestamp()" << endl; Loading Loading @@ -114,23 +114,19 @@ std::tm DBManager::retrieveLastTimestamp() std::string timestamp("1970-01-01 00:00:00"); db_data[0] >> timestamp; boost::posix_time::ptime ptime = boost::posix_time::time_from_string(timestamp); return boost::posix_time::to_tm(ptime); return boost::posix_time::time_from_string(timestamp); } //============================================================================== // DBManager::persistLastTimestamp() //============================================================================== void DBManager::persistLastTimestamp(std::tm lastTimestamp) throw(std::runtime_error, std::out_of_range) void DBManager::persistLastTimestamp(boost::posix_time::ptime ptime) throw(std::runtime_error) { DEBUG_STREAM << "DBManager::persistLastTimestamp()" << endl; boost::mutex::scoped_lock lock(m_tangoDBMutex); boost::posix_time::ptime ptime = boost::posix_time::ptime_from_tm(lastTimestamp); std::string timestampString = boost::posix_time::to_simple_string(ptime); Tango::DbDatum timestamp("LastTimestamp"); Loading @@ -157,24 +153,21 @@ void DBManager::persistLastTimestamp(std::tm lastTimestamp) } //============================================================================== // DBManager::retrieveNewTuples() // DBManager::retrieveNewFile() //============================================================================== DBManager::FileRowsetSP DBManager::retrieveNewTuples(std::string schema, std::string table, std::tm lastTimestamp) throw(soci::soci_error, std::out_of_range) DBManager::FileRowsetSP DBManager::retrieveNewFile(std::string schema, std::string table, boost::posix_time::ptime ptime) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveNewTuples()" << endl; DEBUG_STREAM << "DBManager::retrieveNewFile()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_session_sp->get_backend() == NULL) m_session_sp->reconnect(); boost::posix_time::ptime timestamp = boost::posix_time::ptime_from_tm(lastTimestamp); FileRowsetSP fileRowset_sp(new FileRowset(m_session_sp->prepare << "select" << " file_version, file_name, update_time from " << schema << "." << table << " where update_time>'" << boost::posix_time::to_iso_string(timestamp) << " where update_time>'" << boost::posix_time::to_iso_string(ptime) << "' order by update_time asc")); return fileRowset_sp; Loading