Loading src/DBManager.cpp +120 −69 Original line number Diff line number Diff line Loading @@ -20,9 +20,9 @@ DBManager::DBManager(Tango::DeviceImpl* deviceImpl_p, m_deviceName = deviceImpl_p->get_name_lower(); m_session_sp.reset(new soci::session); m_mainSession_sp.reset(new soci::session); m_tangoDB_sp.reset(new Tango::Database); m_auxSession_sp.reset(new soci::session); } //============================================================================== Loading @@ -32,7 +32,9 @@ DBManager::~DBManager() { DEBUG_STREAM << "DBManager::~DBManager()" << endl; m_session_sp->close(); m_mainSession_sp->close(); m_auxSession_sp->close(); } //============================================================================== Loading Loading @@ -67,7 +69,20 @@ void DBManager::connect() throw(soci::soci_error) INFO_STREAM << "MAIN CONNECTION: " << connection.str() << endl; #endif m_session_sp->open(soci::mysql, connection.str()); m_mainSession_sp->open(soci::mysql, connection.str()); connection.str(""); connection << " host=" << m_configuration_sp->getAuxDatabaseHost(); connection << " port=" << m_configuration_sp->getAuxDatabasePort(); connection << " user=" << m_configuration_sp->getAuxDatabaseUsername(); connection << " password=" << m_configuration_sp->getAuxDatabasePassword(); #ifdef VERBOSE_DEBUG INFO_STREAM << "AUX CONNECTION: " << connection.str() << endl; #endif m_auxSession_sp->open(soci::mysql, connection.str()); } //============================================================================== Loading @@ -79,102 +94,138 @@ void DBManager::disconnect() boost::mutex::scoped_lock lock(m_sessionMutex); m_session_sp->close(); m_mainSession_sp->close(); m_auxSession_sp->close(); } //============================================================================== // DBManager::retrieveNewFiles() //============================================================================== DBManager::NewFileRowsetSP DBManager::retrieveNewFiles(boost::posix_time::ptime ptime) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveNewFiles()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_mainSession_sp->get_backend() == NULL) m_mainSession_sp->reconnect(); NewFileRowsetSP newFileRowset_sp(new NewFileRowset(m_mainSession_sp->prepare << "select file_version, file_name, update_time from " << m_configuration_sp->getDatabaseSchema() << "." << m_configuration_sp->getDatabaseTable() << " where update_time>'" << boost::posix_time::to_iso_string(ptime) << "' order by update_time asc")); return newFileRowset_sp; } //============================================================================== // DBManager::retrieveLastTimestamp() //============================================================================== boost::posix_time::ptime DBManager::retrieveLastTimestamp() throw(std::runtime_error, std::out_of_range) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveLastTimestamp()" << endl; boost::mutex::scoped_lock lock(m_tangoDBMutex); boost::mutex::scoped_lock lock(m_sessionMutex); Tango::DbData db_data; db_data.push_back(Tango::DbDatum("LastTimestamp")); if(m_auxSession_sp->get_backend() == NULL) m_auxSession_sp->reconnect(); try { m_tangoDB_sp->get_device_property(m_deviceName, db_data); } catch(Tango::DevFailed& ex) { std::stringstream error_msg; for (unsigned int i=0; i<ex.errors.length(); i++) { error_msg << ex.errors[i].reason.in() << endl; error_msg << ex.errors[i].desc.in() << endl; error_msg << ex.errors[i].origin.in() << endl; } throw std::runtime_error(error_msg.str()); } std::tm tm_time; std::string timestamp("1970-01-01 00:00:00"); db_data[0] >> timestamp; *m_auxSession_sp << "select coalesce(last_timestamp,'1970-01-01 00:00:00')" << " from "<< m_configuration_sp->getAuxDatabaseSchema() << "." << m_configuration_sp->getAuxDatabaseTimestampTable() << " where device_name like :deviceName", soci::use(m_deviceName, "deviceName"), soci::into(tm_time); return boost::posix_time::time_from_string(timestamp); return boost::posix_time::ptime_from_tm(tm_time); } //============================================================================== // DBManager::persistLastTimestamp() //============================================================================== void DBManager::persistLastTimestamp(boost::posix_time::ptime ptime) throw(std::runtime_error) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::persistLastTimestamp()" << endl; boost::mutex::scoped_lock lock(m_tangoDBMutex); std::string timestampString = boost::posix_time::to_simple_string(ptime); Tango::DbDatum timestamp("LastTimestamp"); timestamp << timestampString; boost::mutex::scoped_lock lock(m_sessionMutex); Tango::DbData db_data; db_data.push_back(timestamp); if(m_auxSession_sp->get_backend() == NULL) m_auxSession_sp->reconnect(); try { m_tangoDB_sp->put_device_property(m_deviceName, db_data); *m_auxSession_sp << "insert into " << m_configuration_sp->getAuxDatabaseSchema() << "." << m_configuration_sp->getAuxDatabaseTimestampTable() << " (device_name, last_timestamp) values ('" << m_deviceName << "','" << boost::posix_time::to_iso_string(ptime) << "') on duplicate key update" << " last_timestamp='" << boost::posix_time::to_iso_string(ptime) << "'"; } catch(Tango::DevFailed& ex) { std::stringstream error_msg; for (unsigned int i=0; i<ex.errors.length(); i++) //============================================================================== // DBManager::addFailedFile() //============================================================================== void DBManager::addFailedFile(int fileVersion, std::string fileName) throw(soci::soci_error) { error_msg << ex.errors[i].reason.in() << endl; error_msg << ex.errors[i].desc.in() << endl; error_msg << ex.errors[i].origin.in() << endl; } throw std::runtime_error(error_msg.str()); } DEBUG_STREAM << "DBManager::addFailedFile()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_auxSession_sp->get_backend() == NULL) m_auxSession_sp->reconnect(); *m_auxSession_sp << "insert ignore into " << m_configuration_sp->getAuxDatabaseSchema() << "." << m_configuration_sp->getAuxDatabaseFailedTable() << " (device_name, file_version, file_name) values (:deviceName, " << ":fileVersion, :fileName)", soci::use(m_deviceName, "deviceName"), soci::use(fileVersion, "fileVersion"), soci::use(fileName, "fileName"); } //============================================================================== // DBManager::retrieveNewFile() // DBManager::removeFailedFile() //============================================================================== DBManager::FileRowsetSP DBManager::retrieveNewFile(std::string schema, std::string table, boost::posix_time::ptime ptime) throw(soci::soci_error) void DBManager::removeFailedFile(int fileVersion, std::string fileName) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveNewFile()" << endl; DEBUG_STREAM << "DBManager::removeFailedFile()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_session_sp->get_backend() == NULL) m_session_sp->reconnect(); if(m_auxSession_sp->get_backend() == NULL) m_auxSession_sp->reconnect(); FileRowsetSP fileRowset_sp(new FileRowset(m_session_sp->prepare << "select" << " file_version, file_name, update_time from " << schema << "." << table << " where update_time>'" << boost::posix_time::to_iso_string(ptime) << "' order by update_time asc")); return fileRowset_sp; *m_auxSession_sp << "delete from " << m_configuration_sp->getAuxDatabaseSchema() << "." << m_configuration_sp->getAuxDatabaseFailedTable() << " where device_name like :deviceName and file_version = :fileVersion" << " and file_name like :fileName", soci::use(m_deviceName, "deviceName"), soci::use(fileVersion, "fileVersion"), soci::use(fileName, "fileName"); } //============================================================================== // DBManager::retrieveNewTuples() // DBManager::retrieveFailedFiles() //============================================================================== DBManager::FailedFileRowsetSP DBManager::retrieveFailedFiles(boost::posix_time::ptime ptime) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveFailedFiles()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_auxSession_sp->get_backend() == NULL) m_auxSession_sp->reconnect(); FailedFileRowsetSP failedFileRowset_sp(new FailedFileRowsetSP( m_auxSession_sp->prepare << "select file_version, file_name from " << m_configuration_sp->getAuxDatabaseSchema() << "." << m_configuration_sp->getAuxDatabaseFailedTable() << " where device_name like '" << m_deviceName << "'")); return failedFileRowset_sp; } } //namespace src/DBManager.h +32 −16 Original line number Diff line number Diff line Loading @@ -67,27 +67,46 @@ public: virtual void disconnect(); //------------------------------------------------------------------------------ // [Public] New file method //------------------------------------------------------------------------------ typedef boost::tuple< boost::optional<int>, boost::optional<std::string>, boost::optional<std::tm> > NewFileRow; typedef soci::rowset< NewFileRow > NewFileRowset; typedef boost::shared_ptr< NewFileRowset > NewFileRowsetSP; virtual NewFileRowsetSP retrieveNewFiles(boost::posix_time::ptime) throw(soci::soci_error); //------------------------------------------------------------------------------ // [Public] Timestamp methods //------------------------------------------------------------------------------ virtual boost::posix_time::ptime retrieveLastTimestamp() throw(std::runtime_error, std::out_of_range); throw(soci::soci_error); virtual void persistLastTimestamp(boost::posix_time::ptime) throw(std::runtime_error); throw(soci::soci_error); //------------------------------------------------------------------------------ // [Public] New file method // [Public] Failed file methods //------------------------------------------------------------------------------ typedef boost::tuple< boost::optional<int>, boost::optional<std::string>, boost::optional<std::tm> > FileRow; virtual void addFailedFile(int, std::string) throw(soci::soci_error); typedef soci::rowset< FileRow > FileRowset; virtual void removeFailedFile(int, std::string) throw(soci::soci_error); typedef boost::shared_ptr< FileRowset > FileRowsetSP; typedef boost::tuple< boost::optional<int>, boost::optional<std::string> > FailedFileRow; virtual FileRowsetSP retrieveNewFile(std::string, std::string, boost::posix_time::ptime) throw(soci::soci_error); typedef soci::rowset< FailedFileRow > FailedFileRowset; typedef boost::shared_ptr< FailedFileRowset > FailedFileRowsetSP; virtual FailedFileRowsetSP retrieveFailedFiles(boost::posix_time::ptime) throw(soci::soci_error); protected: //------------------------------------------------------------------------------ Loading @@ -102,14 +121,11 @@ protected: //Metadata database connection mutex boost::mutex m_sessionMutex; //Metadata database connection scoped pointer boost::scoped_ptr<soci::session> m_session_sp; //Tango database connection mutex boost::mutex m_tangoDBMutex; //Main database connection scoped pointer boost::scoped_ptr<soci::session> m_mainSession_sp; //Tango database connection scoped pointer boost::scoped_ptr<Tango::Database> m_tangoDB_sp; //Auxiliary database connection scoped pointer boost::scoped_ptr<soci::session> m_auxSession_sp; }; } //End of namespace Loading Loading
src/DBManager.cpp +120 −69 Original line number Diff line number Diff line Loading @@ -20,9 +20,9 @@ DBManager::DBManager(Tango::DeviceImpl* deviceImpl_p, m_deviceName = deviceImpl_p->get_name_lower(); m_session_sp.reset(new soci::session); m_mainSession_sp.reset(new soci::session); m_tangoDB_sp.reset(new Tango::Database); m_auxSession_sp.reset(new soci::session); } //============================================================================== Loading @@ -32,7 +32,9 @@ DBManager::~DBManager() { DEBUG_STREAM << "DBManager::~DBManager()" << endl; m_session_sp->close(); m_mainSession_sp->close(); m_auxSession_sp->close(); } //============================================================================== Loading Loading @@ -67,7 +69,20 @@ void DBManager::connect() throw(soci::soci_error) INFO_STREAM << "MAIN CONNECTION: " << connection.str() << endl; #endif m_session_sp->open(soci::mysql, connection.str()); m_mainSession_sp->open(soci::mysql, connection.str()); connection.str(""); connection << " host=" << m_configuration_sp->getAuxDatabaseHost(); connection << " port=" << m_configuration_sp->getAuxDatabasePort(); connection << " user=" << m_configuration_sp->getAuxDatabaseUsername(); connection << " password=" << m_configuration_sp->getAuxDatabasePassword(); #ifdef VERBOSE_DEBUG INFO_STREAM << "AUX CONNECTION: " << connection.str() << endl; #endif m_auxSession_sp->open(soci::mysql, connection.str()); } //============================================================================== Loading @@ -79,102 +94,138 @@ void DBManager::disconnect() boost::mutex::scoped_lock lock(m_sessionMutex); m_session_sp->close(); m_mainSession_sp->close(); m_auxSession_sp->close(); } //============================================================================== // DBManager::retrieveNewFiles() //============================================================================== DBManager::NewFileRowsetSP DBManager::retrieveNewFiles(boost::posix_time::ptime ptime) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveNewFiles()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_mainSession_sp->get_backend() == NULL) m_mainSession_sp->reconnect(); NewFileRowsetSP newFileRowset_sp(new NewFileRowset(m_mainSession_sp->prepare << "select file_version, file_name, update_time from " << m_configuration_sp->getDatabaseSchema() << "." << m_configuration_sp->getDatabaseTable() << " where update_time>'" << boost::posix_time::to_iso_string(ptime) << "' order by update_time asc")); return newFileRowset_sp; } //============================================================================== // DBManager::retrieveLastTimestamp() //============================================================================== boost::posix_time::ptime DBManager::retrieveLastTimestamp() throw(std::runtime_error, std::out_of_range) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveLastTimestamp()" << endl; boost::mutex::scoped_lock lock(m_tangoDBMutex); boost::mutex::scoped_lock lock(m_sessionMutex); Tango::DbData db_data; db_data.push_back(Tango::DbDatum("LastTimestamp")); if(m_auxSession_sp->get_backend() == NULL) m_auxSession_sp->reconnect(); try { m_tangoDB_sp->get_device_property(m_deviceName, db_data); } catch(Tango::DevFailed& ex) { std::stringstream error_msg; for (unsigned int i=0; i<ex.errors.length(); i++) { error_msg << ex.errors[i].reason.in() << endl; error_msg << ex.errors[i].desc.in() << endl; error_msg << ex.errors[i].origin.in() << endl; } throw std::runtime_error(error_msg.str()); } std::tm tm_time; std::string timestamp("1970-01-01 00:00:00"); db_data[0] >> timestamp; *m_auxSession_sp << "select coalesce(last_timestamp,'1970-01-01 00:00:00')" << " from "<< m_configuration_sp->getAuxDatabaseSchema() << "." << m_configuration_sp->getAuxDatabaseTimestampTable() << " where device_name like :deviceName", soci::use(m_deviceName, "deviceName"), soci::into(tm_time); return boost::posix_time::time_from_string(timestamp); return boost::posix_time::ptime_from_tm(tm_time); } //============================================================================== // DBManager::persistLastTimestamp() //============================================================================== void DBManager::persistLastTimestamp(boost::posix_time::ptime ptime) throw(std::runtime_error) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::persistLastTimestamp()" << endl; boost::mutex::scoped_lock lock(m_tangoDBMutex); std::string timestampString = boost::posix_time::to_simple_string(ptime); Tango::DbDatum timestamp("LastTimestamp"); timestamp << timestampString; boost::mutex::scoped_lock lock(m_sessionMutex); Tango::DbData db_data; db_data.push_back(timestamp); if(m_auxSession_sp->get_backend() == NULL) m_auxSession_sp->reconnect(); try { m_tangoDB_sp->put_device_property(m_deviceName, db_data); *m_auxSession_sp << "insert into " << m_configuration_sp->getAuxDatabaseSchema() << "." << m_configuration_sp->getAuxDatabaseTimestampTable() << " (device_name, last_timestamp) values ('" << m_deviceName << "','" << boost::posix_time::to_iso_string(ptime) << "') on duplicate key update" << " last_timestamp='" << boost::posix_time::to_iso_string(ptime) << "'"; } catch(Tango::DevFailed& ex) { std::stringstream error_msg; for (unsigned int i=0; i<ex.errors.length(); i++) //============================================================================== // DBManager::addFailedFile() //============================================================================== void DBManager::addFailedFile(int fileVersion, std::string fileName) throw(soci::soci_error) { error_msg << ex.errors[i].reason.in() << endl; error_msg << ex.errors[i].desc.in() << endl; error_msg << ex.errors[i].origin.in() << endl; } throw std::runtime_error(error_msg.str()); } DEBUG_STREAM << "DBManager::addFailedFile()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_auxSession_sp->get_backend() == NULL) m_auxSession_sp->reconnect(); *m_auxSession_sp << "insert ignore into " << m_configuration_sp->getAuxDatabaseSchema() << "." << m_configuration_sp->getAuxDatabaseFailedTable() << " (device_name, file_version, file_name) values (:deviceName, " << ":fileVersion, :fileName)", soci::use(m_deviceName, "deviceName"), soci::use(fileVersion, "fileVersion"), soci::use(fileName, "fileName"); } //============================================================================== // DBManager::retrieveNewFile() // DBManager::removeFailedFile() //============================================================================== DBManager::FileRowsetSP DBManager::retrieveNewFile(std::string schema, std::string table, boost::posix_time::ptime ptime) throw(soci::soci_error) void DBManager::removeFailedFile(int fileVersion, std::string fileName) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveNewFile()" << endl; DEBUG_STREAM << "DBManager::removeFailedFile()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_session_sp->get_backend() == NULL) m_session_sp->reconnect(); if(m_auxSession_sp->get_backend() == NULL) m_auxSession_sp->reconnect(); FileRowsetSP fileRowset_sp(new FileRowset(m_session_sp->prepare << "select" << " file_version, file_name, update_time from " << schema << "." << table << " where update_time>'" << boost::posix_time::to_iso_string(ptime) << "' order by update_time asc")); return fileRowset_sp; *m_auxSession_sp << "delete from " << m_configuration_sp->getAuxDatabaseSchema() << "." << m_configuration_sp->getAuxDatabaseFailedTable() << " where device_name like :deviceName and file_version = :fileVersion" << " and file_name like :fileName", soci::use(m_deviceName, "deviceName"), soci::use(fileVersion, "fileVersion"), soci::use(fileName, "fileName"); } //============================================================================== // DBManager::retrieveNewTuples() // DBManager::retrieveFailedFiles() //============================================================================== DBManager::FailedFileRowsetSP DBManager::retrieveFailedFiles(boost::posix_time::ptime ptime) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveFailedFiles()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_auxSession_sp->get_backend() == NULL) m_auxSession_sp->reconnect(); FailedFileRowsetSP failedFileRowset_sp(new FailedFileRowsetSP( m_auxSession_sp->prepare << "select file_version, file_name from " << m_configuration_sp->getAuxDatabaseSchema() << "." << m_configuration_sp->getAuxDatabaseFailedTable() << " where device_name like '" << m_deviceName << "'")); return failedFileRowset_sp; } } //namespace
src/DBManager.h +32 −16 Original line number Diff line number Diff line Loading @@ -67,27 +67,46 @@ public: virtual void disconnect(); //------------------------------------------------------------------------------ // [Public] New file method //------------------------------------------------------------------------------ typedef boost::tuple< boost::optional<int>, boost::optional<std::string>, boost::optional<std::tm> > NewFileRow; typedef soci::rowset< NewFileRow > NewFileRowset; typedef boost::shared_ptr< NewFileRowset > NewFileRowsetSP; virtual NewFileRowsetSP retrieveNewFiles(boost::posix_time::ptime) throw(soci::soci_error); //------------------------------------------------------------------------------ // [Public] Timestamp methods //------------------------------------------------------------------------------ virtual boost::posix_time::ptime retrieveLastTimestamp() throw(std::runtime_error, std::out_of_range); throw(soci::soci_error); virtual void persistLastTimestamp(boost::posix_time::ptime) throw(std::runtime_error); throw(soci::soci_error); //------------------------------------------------------------------------------ // [Public] New file method // [Public] Failed file methods //------------------------------------------------------------------------------ typedef boost::tuple< boost::optional<int>, boost::optional<std::string>, boost::optional<std::tm> > FileRow; virtual void addFailedFile(int, std::string) throw(soci::soci_error); typedef soci::rowset< FileRow > FileRowset; virtual void removeFailedFile(int, std::string) throw(soci::soci_error); typedef boost::shared_ptr< FileRowset > FileRowsetSP; typedef boost::tuple< boost::optional<int>, boost::optional<std::string> > FailedFileRow; virtual FileRowsetSP retrieveNewFile(std::string, std::string, boost::posix_time::ptime) throw(soci::soci_error); typedef soci::rowset< FailedFileRow > FailedFileRowset; typedef boost::shared_ptr< FailedFileRowset > FailedFileRowsetSP; virtual FailedFileRowsetSP retrieveFailedFiles(boost::posix_time::ptime) throw(soci::soci_error); protected: //------------------------------------------------------------------------------ Loading @@ -102,14 +121,11 @@ protected: //Metadata database connection mutex boost::mutex m_sessionMutex; //Metadata database connection scoped pointer boost::scoped_ptr<soci::session> m_session_sp; //Tango database connection mutex boost::mutex m_tangoDBMutex; //Main database connection scoped pointer boost::scoped_ptr<soci::session> m_mainSession_sp; //Tango database connection scoped pointer boost::scoped_ptr<Tango::Database> m_tangoDB_sp; //Auxiliary database connection scoped pointer boost::scoped_ptr<soci::session> m_auxSession_sp; }; } //End of namespace Loading