#include #include #include #include namespace DataImporter_ns { //============================================================================== // DBManager::DBManager() //============================================================================== DBManager::DBManager(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_sp) : Tango::LogAdapter(deviceImpl_p), m_configuration_sp(configuration_sp) { DEBUG_STREAM << "DBManager::DBManager()" << endl; m_deviceName = deviceImpl_p->get_name_lower(); m_mainSession_sp.reset(new soci::session); m_auxSession_sp.reset(new soci::session); } //============================================================================== // DBManager::~DBManager() //============================================================================== DBManager::~DBManager() { DEBUG_STREAM << "DBManager::~DBManager()" << endl; m_mainSession_sp->close(); m_auxSession_sp->close(); } //============================================================================== // DBManager::create() //============================================================================== DBManager::SP DBManager::create(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_sp) { DBManager::SP d_sp(new DBManager(deviceImpl_p, configuration_sp), DBManager::Deleter()); return d_sp; } //============================================================================== // DBManager::connect() //============================================================================== void DBManager::connectAll() throw(soci::soci_error) { DEBUG_STREAM << "DBManager::connect()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); bool reconnect = true; std::stringstream connection; connection << " host=" << m_configuration_sp->getDatabaseHost(); connection << " port=" << m_configuration_sp->getDatabasePort(); connection << " user=" << m_configuration_sp->getDatabaseUsername(); connection << " password=" << m_configuration_sp->getDatabasePassword(); #ifdef VERBOSE_DEBUG INFO_STREAM << "MAIN CONNECTION: " << connection.str() << endl; #endif m_mainSession_sp->open(soci::mysql, connection.str()); soci::mysql_session_backend* mainBackend = static_cast( m_mainSession_sp->get_backend()); MYSQL* mainMysql = mainBackend->conn_; mysql_options(mainMysql, MYSQL_OPT_RECONNECT, &reconnect); connection.str(""); connection << " host=" << m_configuration_sp->getAuxDatabaseHost(); connection << " port=" << m_configuration_sp->getAuxDatabasePort(); connection << " user=" << m_configuration_sp->getAuxDatabaseUsername(); connection << " password=" << m_configuration_sp->getAuxDatabasePassword(); #ifdef VERBOSE_DEBUG INFO_STREAM << "AUX CONNECTION: " << connection.str() << endl; #endif m_auxSession_sp->open(soci::mysql, connection.str()); soci::mysql_session_backend* auxBackend = static_cast( m_auxSession_sp->get_backend()); MYSQL* auxMysql = auxBackend->conn_; mysql_options(auxMysql, MYSQL_OPT_RECONNECT, &reconnect); } //============================================================================== // DBManager::disconnect() //============================================================================== void DBManager::disconnectAll() { DEBUG_STREAM << "DBManager::disconnect()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); m_mainSession_sp->close(); m_auxSession_sp->close(); } //============================================================================== // DBManager::getMainTransaction() //============================================================================== DBManager::TransactionSP DBManager::getMainTransaction() { DEBUG_STREAM << "DBManager::getMainTransaction()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); TransactionSP transaction_sp(new soci::transaction(*m_mainSession_sp)); return transaction_sp; } //============================================================================== // DBManager::getAuxTransaction() //============================================================================== DBManager::TransactionSP DBManager::getAuxTransaction() { DEBUG_STREAM << "DBManager::getAuxTransaction()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); TransactionSP transaction_sp(new soci::transaction(*m_auxSession_sp)); return transaction_sp; } //============================================================================== // DBManager::retrieveLastTimestamp() //============================================================================== boost::posix_time::ptime DBManager::retrieveLastTimestamp() throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveLastTimestamp()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_auxSession_sp->get_backend() == NULL) m_auxSession_sp->reconnect(); std::tm tm_time; *m_auxSession_sp << "select coalesce(max(last_timestamp),'1970-01-01 00:00:00')" << " from "<< m_configuration_sp->getAuxDatabaseSchema() << "." << m_configuration_sp->getAuxDatabaseTimestampTable() << " where device_name like :deviceName", soci::use(m_deviceName, "deviceName"), soci::into(tm_time); return boost::posix_time::ptime_from_tm(tm_time); } //============================================================================== // DBManager::persistLastTimestamp() //============================================================================== void DBManager::persistLastTimestamp(boost::posix_time::ptime ptime) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::persistLastTimestamp()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_auxSession_sp->get_backend() == NULL) m_auxSession_sp->reconnect(); *m_auxSession_sp << "insert into " << m_configuration_sp->getAuxDatabaseSchema() << "." << m_configuration_sp->getAuxDatabaseTimestampTable() << " (device_name, last_timestamp) values ('" << m_deviceName << "','" << boost::posix_time::to_iso_string(ptime) << "') on duplicate key update" << " last_timestamp='" << boost::posix_time::to_iso_string(ptime) << "'"; } //============================================================================== // DBManager::retrieveNewFiles() //============================================================================== DBManager::FileRowsetSP DBManager::retrieveNewFiles(boost::posix_time::ptime ptime) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveNewFiles()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_mainSession_sp->get_backend() == NULL) m_mainSession_sp->reconnect(); std::string selectKey = m_configuration_sp->getSelectKey(); std::string selectValue = m_configuration_sp->getSelectValue(); FileRowsetSP newFileRowset_sp; if(selectKey.empty()) { newFileRowset_sp.reset(new FileRowset(m_mainSession_sp->prepare << "select storage_path, file_path, file_version, file_name, update_time " << "from " << m_configuration_sp->getDatabaseSchema() << "." << m_configuration_sp->getDatabaseTable() << " where update_time>'" << boost::posix_time::to_iso_string(ptime) << "' order by update_time asc")); } else { newFileRowset_sp.reset(new FileRowset(m_mainSession_sp->prepare << "select storage_path, file_path, file_version, file_name, update_time " << "from " << m_configuration_sp->getDatabaseSchema() << "." << m_configuration_sp->getDatabaseTable() << " where update_time>'" << boost::posix_time::to_iso_string(ptime) << "' and " << selectKey << " like '%" << selectValue << "%' " << "order by update_time asc")); } return newFileRowset_sp; } //============================================================================== // DBManager::updateNewFilePath() //============================================================================== void DBManager::updateNewFilePath(std::string storagePath, std::string filePath, int fileVersion, std::string fileName) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::updateNewFilePath()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_mainSession_sp->get_backend() == NULL) m_mainSession_sp->reconnect(); *m_mainSession_sp << "update " << m_configuration_sp->getDatabaseSchema() << "." << m_configuration_sp->getDatabaseTable() << " set storage_path = :storagePath, file_path = :filePath " << " where file_version = :fileVersion and file_name like :fileName", soci::use(storagePath, "storagePath"), soci::use(filePath, "filePath"), soci::use(fileVersion, "fileVersion"), soci::use(fileName, "fileName"); } //============================================================================== // DBManager::addFailedFile() //============================================================================== void DBManager::addFailedFile(int fileVersion, std::string fileName) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::addFailedFile()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_auxSession_sp->get_backend() == NULL) m_auxSession_sp->reconnect(); *m_auxSession_sp << "insert ignore into " << m_configuration_sp->getAuxDatabaseSchema() << "." << m_configuration_sp->getAuxDatabaseFailedTable() << " (device_name, file_version, file_name) values (:deviceName, " << ":fileVersion, :fileName)", soci::use(m_deviceName, "deviceName"), soci::use(fileVersion, "fileVersion"), soci::use(fileName, "fileName"); } //============================================================================== // DBManager::removeFailedFile() //============================================================================== void DBManager::removeFailedFile(int fileVersion, std::string fileName) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::removeFailedFile()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_auxSession_sp->get_backend() == NULL) m_auxSession_sp->reconnect(); *m_auxSession_sp << "delete from " << m_configuration_sp->getAuxDatabaseSchema() << "." << m_configuration_sp->getAuxDatabaseFailedTable() << " where device_name like :deviceName and file_version = :fileVersion" << " and file_name like :fileName", soci::use(m_deviceName, "deviceName"), soci::use(fileVersion, "fileVersion"), soci::use(fileName, "fileName"); } //============================================================================== // DBManager::retrieveFailedFiles() //============================================================================== DBManager::FileRowsetSP DBManager::retrieveFailedFiles() throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveFailedFiles()" << endl; boost::mutex::scoped_lock lock(m_sessionMutex); if(m_auxSession_sp->get_backend() == NULL) m_auxSession_sp->reconnect(); FileRowsetSP failedFileRowset_sp(new FileRowset( m_auxSession_sp->prepare << "select m.storage_path, m.file_path, " << " m.file_version, m.file_name, m.update_time from " << m_configuration_sp->getDatabaseSchema() << "." << m_configuration_sp->getDatabaseTable() << " as m join " << m_configuration_sp->getAuxDatabaseSchema() << "." << m_configuration_sp->getAuxDatabaseFailedTable() << " as f " << "on f.file_version = m.file_version and f.file_name = m.file_name " << "where device_name like '" << m_deviceName << "'")); return failedFileRowset_sp; } } //namespace