Loading proto/Request.proto +16 −4 Original line number Diff line number Diff line Loading @@ -7,7 +7,8 @@ message Request enum Type { AUTHORIZATION = 0; DATA = 1; VALIDATION = 1; DATA = 2; } required Type type = 1; Loading @@ -22,12 +23,23 @@ message Request optional Authorization authorization = 2; //Validation request message Validation { required string schema = 1; required string table = 2; } optional Validation validation = 3; //Data request message Data { required string file_path = 1; required int32 file_version = 2; required string file_name = 3; required int32 file_version = 1; required string file_name = 2; } optional Data data = 4; } proto/Response.proto +24 −2 Original line number Diff line number Diff line Loading @@ -7,7 +7,8 @@ message Response enum Type { AUTHORIZATION = 0; DATA = 1; VALIDATION = 1; DATA = 2; } required Type type = 1; Loading @@ -28,6 +29,22 @@ message Response optional Authorization authorization = 2; //Validation response message Validation { enum State { ACCEPTED = 0; REJECTED = 1; } required State state = 1; required string status = 2; } optional Validation validation = 3; //Data response message Data Loading @@ -41,6 +58,11 @@ message Response required State state = 1; required string status = 2; required uint64 size = 3; required string file_path = 3; required int32 file_version = 4; required string file_name = 5; required uint64 size = 6; } optional Data data = 4; } src/Client.cpp +17 −17 Original line number Diff line number Diff line Loading @@ -279,19 +279,19 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) m_readBuff.size() - HEADER_SIZE); // m_protocolManager_sp->processResponse(response_sp); if(m_protocolManager_sp->waitBeforeRequest()) { m_requestResponseTimer.expires_from_now( boost::posix_time::seconds(m_configuration_sp->getRefreshTime())); m_requestResponseTimer.async_wait( boost::bind(&Client::startWriteRequest, this)); } else { startWriteRequest(); } // // if(m_protocolManager_sp->waitBeforeRequest()) // { // m_requestResponseTimer.expires_from_now( // boost::posix_time::seconds(m_configuration_sp->getRefreshTime())); // // m_requestResponseTimer.async_wait( // boost::bind(&Client::startWriteRequest, this)); // } // else // { // startWriteRequest(); // } } catch(std::exception& ec) { Loading @@ -318,7 +318,7 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) } //============================================================================== // Client::resetConnection() // Client::handleReadData() //============================================================================== void Client::handleReadData(const boost::system::error_code& errorCode) { Loading @@ -342,7 +342,7 @@ void Client::resetConnection() m_resetConnectionTimer.expires_at(boost::posix_time::pos_infin); m_requestResponseTimer.expires_at(boost::posix_time::pos_infin); //m_protocolManager_sp->resetProtocolStatus(); m_protocolManager_sp->resetProtocolStatus(); closeConnection(); Loading src/DBManager.cpp +74 −1 Original line number Diff line number Diff line Loading @@ -18,7 +18,11 @@ DBManager::DBManager(Tango::DeviceImpl* deviceImpl_p, { DEBUG_STREAM << "DBManager::DBManager()" << endl; m_deviceName = deviceImpl_p->get_name_lower(); m_session_sp.reset(new soci::session); m_tangoDB_sp.reset(new Tango::Database); } //============================================================================== Loading Loading @@ -53,13 +57,14 @@ void DBManager::connect() throw(soci::soci_error) boost::mutex::scoped_lock lock(m_connectionMutex); std::stringstream connection; connection << " host=" << m_configuration_sp->getDatabaseHost(); connection << " port=" << m_configuration_sp->getDatabasePort(); connection << " user=" << m_configuration_sp->getDatabaseUsername(); connection << " password=" << m_configuration_sp->getDatabasePassword(); #ifdef VERBOSE_DEBUG INFO_STREAM << "CONNECTION: " << connection.str() << endl; INFO_STREAM << "MAIN CONNECTION: " << connection.str() << endl; #endif m_session_sp->open(soci::mysql, connection.str()); Loading @@ -77,4 +82,72 @@ void DBManager::disconnect() m_session_sp->close(); } //============================================================================== // DBManager::retrieveLastTimestamp() //============================================================================== std::tm DBManager::retrieveLastTimestamp() throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveLastTimestamp()" << endl; boost::mutex::scoped_lock lock(m_connectionMutex); // if(m_auxSession_sp->get_backend() == NULL) // m_auxSession_sp->reconnect(); // // std::tm lastTimestamp; // // *m_auxSession_sp << "select coalesce(max(update_time),'1970-01-01 00:00:00') " // << "from " << m_configuration_sp->getDIDBSchema() << "." // << m_configuration_sp->getDIDBIndexTable(), soci::into(lastTimestamp); // // return lastTimestamp; } //============================================================================== // DBManager::persistLastTimestamp() //============================================================================== void DBManager::persistLastTimestamp(std::tm lastTimestamp) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::persistLastTimestamp()" << endl; boost::mutex::scoped_lock lock(m_connectionMutex); // if(m_auxSession_sp->get_backend() == NULL) // m_auxSession_sp->reconnect(); // // *m_auxSession_sp << "insert into " << m_configuration_sp->getDIDBSchema() // << "." << m_configuration_sp->getDIDBIndexTable() // << " values(:lastTimestamp)", soci::use(lastTimestamp, "lastTimestamp"); } //============================================================================== // DBManager::retrieveNewTuples() //============================================================================== DBManager::RowsetSP DBManager::retrieveNewTuples(std::tm lastTimestamp) throw(soci::soci_error, std::out_of_range) { DEBUG_STREAM << "DBManager::retrieveNewTuples()" << endl; boost::mutex::scoped_lock lock(m_connectionMutex); if(m_session_sp->get_backend() == NULL) m_session_sp->reconnect(); boost::posix_time::ptime timestamp = boost::posix_time::ptime_from_tm(lastTimestamp); // RowsetSP rows(new soci::rowset<soci::row>(m_mainSession_sp->prepare // << "select id, file_path, file_version, file_name, update_time from " // << m_configuration_sp->getDatabaseSchema() << "." // << m_configuration_sp->getDatabaseTable() // << " where update_time>'" // << boost::posix_time::to_iso_string(timestamp) // << "' order by update_time asc")); //return rows; } //============================================================================== // DBManager::retrieveNewTuples() //============================================================================== } //namespace src/DBManager.h +25 −2 Original line number Diff line number Diff line Loading @@ -9,7 +9,6 @@ #define DBMANAGER_H #include <Configuration.h> //#include <Response.pb.h> #include <tango.h> Loading Loading @@ -68,17 +67,41 @@ public: virtual void disconnect(); //------------------------------------------------------------------------------ // [Public] Timestamp methods //------------------------------------------------------------------------------ virtual std::tm retrieveLastTimestamp() throw(soci::soci_error); virtual void persistLastTimestamp(std::tm) throw(soci::soci_error); //------------------------------------------------------------------------------ // [Public] Tuple methods //------------------------------------------------------------------------------ typedef boost::shared_ptr< boost::tuple< boost::optional<int>, boost::optional<std::string>, boost::optional<std::string>, boost::optional<int>, boost::optional<std::string>, boost::optional<std::tm> > > RowsetSP; virtual RowsetSP retrieveNewTuples(std::tm) throw(soci::soci_error, std::out_of_range); protected: //------------------------------------------------------------------------------ // [Protected] Class variables //------------------------------------------------------------------------------ //Device name std::string m_deviceName; //Configuration shared pointer Configuration::SP m_configuration_sp; //Connection mutex boost::mutex m_connectionMutex; //Database connection scoped pointer //Metadata database connection scoped pointer boost::scoped_ptr<soci::session> m_session_sp; boost::scoped_ptr<Tango::Database> m_tangoDB_sp; }; } //End of namespace Loading Loading
proto/Request.proto +16 −4 Original line number Diff line number Diff line Loading @@ -7,7 +7,8 @@ message Request enum Type { AUTHORIZATION = 0; DATA = 1; VALIDATION = 1; DATA = 2; } required Type type = 1; Loading @@ -22,12 +23,23 @@ message Request optional Authorization authorization = 2; //Validation request message Validation { required string schema = 1; required string table = 2; } optional Validation validation = 3; //Data request message Data { required string file_path = 1; required int32 file_version = 2; required string file_name = 3; required int32 file_version = 1; required string file_name = 2; } optional Data data = 4; }
proto/Response.proto +24 −2 Original line number Diff line number Diff line Loading @@ -7,7 +7,8 @@ message Response enum Type { AUTHORIZATION = 0; DATA = 1; VALIDATION = 1; DATA = 2; } required Type type = 1; Loading @@ -28,6 +29,22 @@ message Response optional Authorization authorization = 2; //Validation response message Validation { enum State { ACCEPTED = 0; REJECTED = 1; } required State state = 1; required string status = 2; } optional Validation validation = 3; //Data response message Data Loading @@ -41,6 +58,11 @@ message Response required State state = 1; required string status = 2; required uint64 size = 3; required string file_path = 3; required int32 file_version = 4; required string file_name = 5; required uint64 size = 6; } optional Data data = 4; }
src/Client.cpp +17 −17 Original line number Diff line number Diff line Loading @@ -279,19 +279,19 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) m_readBuff.size() - HEADER_SIZE); // m_protocolManager_sp->processResponse(response_sp); if(m_protocolManager_sp->waitBeforeRequest()) { m_requestResponseTimer.expires_from_now( boost::posix_time::seconds(m_configuration_sp->getRefreshTime())); m_requestResponseTimer.async_wait( boost::bind(&Client::startWriteRequest, this)); } else { startWriteRequest(); } // // if(m_protocolManager_sp->waitBeforeRequest()) // { // m_requestResponseTimer.expires_from_now( // boost::posix_time::seconds(m_configuration_sp->getRefreshTime())); // // m_requestResponseTimer.async_wait( // boost::bind(&Client::startWriteRequest, this)); // } // else // { // startWriteRequest(); // } } catch(std::exception& ec) { Loading @@ -318,7 +318,7 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) } //============================================================================== // Client::resetConnection() // Client::handleReadData() //============================================================================== void Client::handleReadData(const boost::system::error_code& errorCode) { Loading @@ -342,7 +342,7 @@ void Client::resetConnection() m_resetConnectionTimer.expires_at(boost::posix_time::pos_infin); m_requestResponseTimer.expires_at(boost::posix_time::pos_infin); //m_protocolManager_sp->resetProtocolStatus(); m_protocolManager_sp->resetProtocolStatus(); closeConnection(); Loading
src/DBManager.cpp +74 −1 Original line number Diff line number Diff line Loading @@ -18,7 +18,11 @@ DBManager::DBManager(Tango::DeviceImpl* deviceImpl_p, { DEBUG_STREAM << "DBManager::DBManager()" << endl; m_deviceName = deviceImpl_p->get_name_lower(); m_session_sp.reset(new soci::session); m_tangoDB_sp.reset(new Tango::Database); } //============================================================================== Loading Loading @@ -53,13 +57,14 @@ void DBManager::connect() throw(soci::soci_error) boost::mutex::scoped_lock lock(m_connectionMutex); std::stringstream connection; connection << " host=" << m_configuration_sp->getDatabaseHost(); connection << " port=" << m_configuration_sp->getDatabasePort(); connection << " user=" << m_configuration_sp->getDatabaseUsername(); connection << " password=" << m_configuration_sp->getDatabasePassword(); #ifdef VERBOSE_DEBUG INFO_STREAM << "CONNECTION: " << connection.str() << endl; INFO_STREAM << "MAIN CONNECTION: " << connection.str() << endl; #endif m_session_sp->open(soci::mysql, connection.str()); Loading @@ -77,4 +82,72 @@ void DBManager::disconnect() m_session_sp->close(); } //============================================================================== // DBManager::retrieveLastTimestamp() //============================================================================== std::tm DBManager::retrieveLastTimestamp() throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveLastTimestamp()" << endl; boost::mutex::scoped_lock lock(m_connectionMutex); // if(m_auxSession_sp->get_backend() == NULL) // m_auxSession_sp->reconnect(); // // std::tm lastTimestamp; // // *m_auxSession_sp << "select coalesce(max(update_time),'1970-01-01 00:00:00') " // << "from " << m_configuration_sp->getDIDBSchema() << "." // << m_configuration_sp->getDIDBIndexTable(), soci::into(lastTimestamp); // // return lastTimestamp; } //============================================================================== // DBManager::persistLastTimestamp() //============================================================================== void DBManager::persistLastTimestamp(std::tm lastTimestamp) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::persistLastTimestamp()" << endl; boost::mutex::scoped_lock lock(m_connectionMutex); // if(m_auxSession_sp->get_backend() == NULL) // m_auxSession_sp->reconnect(); // // *m_auxSession_sp << "insert into " << m_configuration_sp->getDIDBSchema() // << "." << m_configuration_sp->getDIDBIndexTable() // << " values(:lastTimestamp)", soci::use(lastTimestamp, "lastTimestamp"); } //============================================================================== // DBManager::retrieveNewTuples() //============================================================================== DBManager::RowsetSP DBManager::retrieveNewTuples(std::tm lastTimestamp) throw(soci::soci_error, std::out_of_range) { DEBUG_STREAM << "DBManager::retrieveNewTuples()" << endl; boost::mutex::scoped_lock lock(m_connectionMutex); if(m_session_sp->get_backend() == NULL) m_session_sp->reconnect(); boost::posix_time::ptime timestamp = boost::posix_time::ptime_from_tm(lastTimestamp); // RowsetSP rows(new soci::rowset<soci::row>(m_mainSession_sp->prepare // << "select id, file_path, file_version, file_name, update_time from " // << m_configuration_sp->getDatabaseSchema() << "." // << m_configuration_sp->getDatabaseTable() // << " where update_time>'" // << boost::posix_time::to_iso_string(timestamp) // << "' order by update_time asc")); //return rows; } //============================================================================== // DBManager::retrieveNewTuples() //============================================================================== } //namespace
src/DBManager.h +25 −2 Original line number Diff line number Diff line Loading @@ -9,7 +9,6 @@ #define DBMANAGER_H #include <Configuration.h> //#include <Response.pb.h> #include <tango.h> Loading Loading @@ -68,17 +67,41 @@ public: virtual void disconnect(); //------------------------------------------------------------------------------ // [Public] Timestamp methods //------------------------------------------------------------------------------ virtual std::tm retrieveLastTimestamp() throw(soci::soci_error); virtual void persistLastTimestamp(std::tm) throw(soci::soci_error); //------------------------------------------------------------------------------ // [Public] Tuple methods //------------------------------------------------------------------------------ typedef boost::shared_ptr< boost::tuple< boost::optional<int>, boost::optional<std::string>, boost::optional<std::string>, boost::optional<int>, boost::optional<std::string>, boost::optional<std::tm> > > RowsetSP; virtual RowsetSP retrieveNewTuples(std::tm) throw(soci::soci_error, std::out_of_range); protected: //------------------------------------------------------------------------------ // [Protected] Class variables //------------------------------------------------------------------------------ //Device name std::string m_deviceName; //Configuration shared pointer Configuration::SP m_configuration_sp; //Connection mutex boost::mutex m_connectionMutex; //Database connection scoped pointer //Metadata database connection scoped pointer boost::scoped_ptr<soci::session> m_session_sp; boost::scoped_ptr<Tango::Database> m_tangoDB_sp; }; } //End of namespace Loading