Loading src/Configuration.h +4 −43 Original line number Diff line number Diff line Loading @@ -19,9 +19,6 @@ private: // [Private] Constructor destructor deleter //------------------------------------------------------------------------------ Configuration(std::string certificateFile, std::string storagePath, std::string dIDBHost, unsigned int dIDBPort, std::string dIDBUser, std::string dIDBPassword, std::string dIDBSchema, std::string dIDBIndexTable, std::string dIDBRejectedTable, std::string remoteHost, unsigned int remotePort, std::string remoteUsername, std::string remotePassword, std::string databaseHost, unsigned int databasePort, Loading @@ -29,9 +26,6 @@ private: std::string databaseSchema, std::string databaseTable, unsigned int refreshTime, unsigned int timeout) : m_certificateFile (certificateFile), m_storagePath(storagePath), m_dIDBHost(dIDBHost), m_dIDBPort(dIDBPort), m_dIDBUser(dIDBUser), m_dIDBPassword(dIDBPassword), m_dIDBSchema(dIDBSchema), m_dIDBIndexTable(dIDBIndexTable), m_dIDBRejectedTable(dIDBRejectedTable), m_remoteHost(remoteHost), m_remotePort(remotePort), m_remoteUsername(remoteUsername), m_remotePassword(remotePassword), m_databaseHost(databaseHost), m_databasePort(databasePort), Loading @@ -54,10 +48,7 @@ public: // [Public] Create class method //------------------------------------------------------------------------------ static Configuration::SP create(std::string certificateFile, std::string storagePath, std::string dIDBHost, unsigned int dIDBPort, std::string dIDBUser, std::string dIDBPassword, std::string dIDBSchema, std::string dIDBIndexTable, std::string dIDBRejectedTable, std::string remoteHost, unsigned int remotePort, std::string storagePath, std::string remoteHost, unsigned int remotePort, std::string remoteUsername, std::string remotePassword, std::string databaseHost, unsigned int databasePort, std::string databaseUsername, std::string databasePassword, Loading @@ -65,11 +56,9 @@ public: unsigned int refreshTime, unsigned int timeout) { Configuration::SP c_sp(new Configuration(certificateFile, storagePath, dIDBHost, dIDBPort, dIDBUser, dIDBPassword, dIDBSchema, dIDBIndexTable, dIDBRejectedTable, remoteHost, remotePort, remoteUsername, remotePassword, databaseHost, databasePort, databaseUsername, databasePassword, databaseSchema, databaseTable, refreshTime, timeout), Configuration::Deleter()); remoteHost, remotePort, remoteUsername, remotePassword, databaseHost, databasePort, databaseUsername, databasePassword, databaseSchema, databaseTable, refreshTime, timeout), Configuration::Deleter()); return c_sp; } Loading @@ -79,13 +68,6 @@ public: //------------------------------------------------------------------------------ std::string getCertificateFile() const { return m_certificateFile; } std::string getStoragePath() const { return m_storagePath; } std::string getDIDBHost() const { return m_dIDBHost; } unsigned int getDIDBPort() const { return m_dIDBPort; } std::string getDIDBUser() const { return m_dIDBUser; } std::string getDIDBPassword() const { return m_dIDBPassword; } std::string getDIDBSchema() const { return m_dIDBSchema; } std::string getDIDBIndexTable() const { return m_dIDBIndexTable; } std::string getDIDBRejectedTable() const { return m_dIDBRejectedTable; } std::string getRemoteHost() const { return m_remoteHost; } unsigned int getRemotePort() const { return m_remotePort; } std::string getRemoteUsername() const { return m_remoteUsername; } Loading @@ -109,27 +91,6 @@ private: //Absolute path to storage const std::string m_storagePath; //Host where data import database is running const std::string m_dIDBHost; //Port where data import database is listening const unsigned int m_dIDBPort; //User to login in data import database const std::string m_dIDBUser; //Password to login in data import database const std::string m_dIDBPassword; //Schema where data import tables are located const std::string m_dIDBSchema; //Index table name const std::string m_dIDBIndexTable; //Rejected table name const std::string m_dIDBRejectedTable; //Metadata exporter remote host const std::string m_remoteHost; Loading src/DBManager.cpp +69 −35 Original line number Diff line number Diff line Loading @@ -54,7 +54,7 @@ void DBManager::connect() throw(soci::soci_error) { DEBUG_STREAM << "DBManager::connect()" << endl; boost::mutex::scoped_lock lock(m_connectionMutex); boost::mutex::scoped_lock lock(m_sessionMutex); std::stringstream connection; Loading @@ -77,7 +77,7 @@ void DBManager::disconnect() { DEBUG_STREAM << "DBManager::disconnect()" << endl; boost::mutex::scoped_lock lock(m_connectionMutex); boost::mutex::scoped_lock lock(m_sessionMutex); m_session_sp->close(); } Loading @@ -85,65 +85,99 @@ void DBManager::disconnect() //============================================================================== // DBManager::retrieveLastTimestamp() //============================================================================== std::tm DBManager::retrieveLastTimestamp() throw(soci::soci_error) std::tm DBManager::retrieveLastTimestamp() throw(std::runtime_error, std::out_of_range) { DEBUG_STREAM << "DBManager::retrieveLastTimestamp()" << endl; boost::mutex::scoped_lock lock(m_connectionMutex); // if(m_auxSession_sp->get_backend() == NULL) // m_auxSession_sp->reconnect(); // // std::tm lastTimestamp; // // *m_auxSession_sp << "select coalesce(max(update_time),'1970-01-01 00:00:00') " // << "from " << m_configuration_sp->getDIDBSchema() << "." // << m_configuration_sp->getDIDBIndexTable(), soci::into(lastTimestamp); // // return lastTimestamp; boost::mutex::scoped_lock lock(m_tangoDBMutex); Tango::DbData db_data; db_data.push_back(Tango::DbDatum("LastTimestamp")); try { m_tangoDB_sp->get_device_property(m_deviceName, db_data); } catch(Tango::DevFailed& ex) { std::stringstream error_msg; for (unsigned int i=0; i<ex.errors.length(); i++) { error_msg << ex.errors[i].reason.in() << endl; error_msg << ex.errors[i].desc.in() << endl; error_msg << ex.errors[i].origin.in() << endl; } throw std::runtime_error(error_msg.str()); } std::string timestamp("1970-01-01 00:00:00"); db_data[0] >> timestamp; boost::posix_time::ptime ptime = boost::posix_time::time_from_string(timestamp); return boost::posix_time::to_tm(ptime); } //============================================================================== // DBManager::persistLastTimestamp() //============================================================================== void DBManager::persistLastTimestamp(std::tm lastTimestamp) throw(soci::soci_error) void DBManager::persistLastTimestamp(std::tm lastTimestamp) throw(std::runtime_error, std::out_of_range) { DEBUG_STREAM << "DBManager::persistLastTimestamp()" << endl; boost::mutex::scoped_lock lock(m_connectionMutex); boost::mutex::scoped_lock lock(m_tangoDBMutex); // if(m_auxSession_sp->get_backend() == NULL) // m_auxSession_sp->reconnect(); // // *m_auxSession_sp << "insert into " << m_configuration_sp->getDIDBSchema() // << "." << m_configuration_sp->getDIDBIndexTable() // << " values(:lastTimestamp)", soci::use(lastTimestamp, "lastTimestamp"); boost::posix_time::ptime ptime = boost::posix_time::ptime_from_tm(lastTimestamp); std::string timestampString = boost::posix_time::to_simple_string(ptime); Tango::DbDatum timestamp("LastTimestamp"); timestamp << timestampString; Tango::DbData db_data; db_data.push_back(timestamp); try { m_tangoDB_sp->put_device_property(m_deviceName, db_data); } catch(Tango::DevFailed& ex) { std::stringstream error_msg; for (unsigned int i=0; i<ex.errors.length(); i++) { error_msg << ex.errors[i].reason.in() << endl; error_msg << ex.errors[i].desc.in() << endl; error_msg << ex.errors[i].origin.in() << endl; } throw std::runtime_error(error_msg.str()); } } //============================================================================== // DBManager::retrieveNewTuples() //============================================================================== DBManager::RowsetSP DBManager::retrieveNewTuples(std::tm lastTimestamp) throw(soci::soci_error, std::out_of_range) DBManager::FileRowsetSP DBManager::retrieveNewTuples(std::string schema, std::string table, std::tm lastTimestamp) throw(soci::soci_error, std::out_of_range) { DEBUG_STREAM << "DBManager::retrieveNewTuples()" << endl; boost::mutex::scoped_lock lock(m_connectionMutex); boost::mutex::scoped_lock lock(m_sessionMutex); if(m_session_sp->get_backend() == NULL) m_session_sp->reconnect(); boost::posix_time::ptime timestamp = boost::posix_time::ptime_from_tm(lastTimestamp); // RowsetSP rows(new soci::rowset<soci::row>(m_mainSession_sp->prepare // << "select id, file_path, file_version, file_name, update_time from " // << m_configuration_sp->getDatabaseSchema() << "." // << m_configuration_sp->getDatabaseTable() // << " where update_time>'" // << boost::posix_time::to_iso_string(timestamp) // << "' order by update_time asc")); FileRowsetSP fileRowset_sp(new FileRowset(m_session_sp->prepare << "select" << " file_version, file_name, update_time from " << schema << "." << table << " where update_time>'" << boost::posix_time::to_iso_string(timestamp) << "' order by update_time asc")); //return rows; return fileRowset_sp; } //============================================================================== Loading src/DBManager.h +20 −12 Original line number Diff line number Diff line Loading @@ -70,38 +70,46 @@ public: //------------------------------------------------------------------------------ // [Public] Timestamp methods //------------------------------------------------------------------------------ virtual std::tm retrieveLastTimestamp() throw(soci::soci_error); virtual std::tm retrieveLastTimestamp() throw(std::runtime_error, std::out_of_range); virtual void persistLastTimestamp(std::tm) throw(soci::soci_error); virtual void persistLastTimestamp(std::tm) throw(std::runtime_error, std::out_of_range); //------------------------------------------------------------------------------ // [Public] Tuple methods //------------------------------------------------------------------------------ typedef boost::shared_ptr< boost::tuple< boost::optional<int>, boost::optional<std::string>, boost::optional<std::string>, boost::optional<int>, boost::optional<std::string>, boost::optional<std::tm> > > RowsetSP; typedef boost::tuple< boost::optional<int>, boost::optional<std::string>, boost::optional<std::tm> > FileRow; virtual RowsetSP retrieveNewTuples(std::tm) typedef soci::rowset< FileRow > FileRowset; typedef boost::shared_ptr< FileRowset > FileRowsetSP; virtual FileRowsetSP retrieveNewTuples(std::string, std::string, std::tm) throw(soci::soci_error, std::out_of_range); protected: //------------------------------------------------------------------------------ // [Protected] Class variables //------------------------------------------------------------------------------ //Device name std::string m_deviceName; //Configuration shared pointer Configuration::SP m_configuration_sp; //Connection mutex boost::mutex m_connectionMutex; //Metadata database connection mutex boost::mutex m_sessionMutex; //Metadata database connection scoped pointer boost::scoped_ptr<soci::session> m_session_sp; //Tango database connection mutex boost::mutex m_tangoDBMutex; //Tango database connection scoped pointer boost::scoped_ptr<Tango::Database> m_tangoDB_sp; //Device name std::string m_deviceName; }; } //End of namespace Loading src/DataImporter.cpp +3 −109 Original line number Diff line number Diff line Loading @@ -203,13 +203,6 @@ void DataImporter::get_device_property() Tango::DbData dev_prop; dev_prop.push_back(Tango::DbDatum("CertificateFile")); dev_prop.push_back(Tango::DbDatum("StoragePath")); dev_prop.push_back(Tango::DbDatum("DIDBHost")); dev_prop.push_back(Tango::DbDatum("DIDBPort")); dev_prop.push_back(Tango::DbDatum("DIDBUser")); dev_prop.push_back(Tango::DbDatum("DIDBPassword")); dev_prop.push_back(Tango::DbDatum("DIDBSchema")); dev_prop.push_back(Tango::DbDatum("DIDBIndexTable")); dev_prop.push_back(Tango::DbDatum("DIDBRejectedTable")); dev_prop.push_back(Tango::DbDatum("RemoteHost")); dev_prop.push_back(Tango::DbDatum("RemotePort")); dev_prop.push_back(Tango::DbDatum("RemoteUsername")); Loading Loading @@ -260,83 +253,6 @@ void DataImporter::get_device_property() // And try to extract StoragePath value from database if (dev_prop[i].is_empty()==false) dev_prop[i] >> storagePath; // Try to initialize DIDBHost from class property cl_prop = ds_class->get_class_property(dev_prop[++i].name); if (cl_prop.is_empty()==false) cl_prop >> dIDBHost; else { // Try to initialize DIDBHost from default device value def_prop = ds_class->get_default_device_property(dev_prop[i].name); if (def_prop.is_empty()==false) def_prop >> dIDBHost; } // And try to extract DIDBHost value from database if (dev_prop[i].is_empty()==false) dev_prop[i] >> dIDBHost; // Try to initialize DIDBPort from class property cl_prop = ds_class->get_class_property(dev_prop[++i].name); if (cl_prop.is_empty()==false) cl_prop >> dIDBPort; else { // Try to initialize DIDBPort from default device value def_prop = ds_class->get_default_device_property(dev_prop[i].name); if (def_prop.is_empty()==false) def_prop >> dIDBPort; } // And try to extract DIDBPort value from database if (dev_prop[i].is_empty()==false) dev_prop[i] >> dIDBPort; // Try to initialize DIDBUser from class property cl_prop = ds_class->get_class_property(dev_prop[++i].name); if (cl_prop.is_empty()==false) cl_prop >> dIDBUser; else { // Try to initialize DIDBUser from default device value def_prop = ds_class->get_default_device_property(dev_prop[i].name); if (def_prop.is_empty()==false) def_prop >> dIDBUser; } // And try to extract DIDBUser value from database if (dev_prop[i].is_empty()==false) dev_prop[i] >> dIDBUser; // Try to initialize DIDBPassword from class property cl_prop = ds_class->get_class_property(dev_prop[++i].name); if (cl_prop.is_empty()==false) cl_prop >> dIDBPassword; else { // Try to initialize DIDBPassword from default device value def_prop = ds_class->get_default_device_property(dev_prop[i].name); if (def_prop.is_empty()==false) def_prop >> dIDBPassword; } // And try to extract DIDBPassword value from database if (dev_prop[i].is_empty()==false) dev_prop[i] >> dIDBPassword; // Try to initialize DIDBSchema from class property cl_prop = ds_class->get_class_property(dev_prop[++i].name); if (cl_prop.is_empty()==false) cl_prop >> dIDBSchema; else { // Try to initialize DIDBSchema from default device value def_prop = ds_class->get_default_device_property(dev_prop[i].name); if (def_prop.is_empty()==false) def_prop >> dIDBSchema; } // And try to extract DIDBSchema value from database if (dev_prop[i].is_empty()==false) dev_prop[i] >> dIDBSchema; // Try to initialize DIDBIndexTable from class property cl_prop = ds_class->get_class_property(dev_prop[++i].name); if (cl_prop.is_empty()==false) cl_prop >> dIDBIndexTable; else { // Try to initialize DIDBIndexTable from default device value def_prop = ds_class->get_default_device_property(dev_prop[i].name); if (def_prop.is_empty()==false) def_prop >> dIDBIndexTable; } // And try to extract DIDBIndexTable value from database if (dev_prop[i].is_empty()==false) dev_prop[i] >> dIDBIndexTable; // Try to initialize DIDBRejectedTable from class property cl_prop = ds_class->get_class_property(dev_prop[++i].name); if (cl_prop.is_empty()==false) cl_prop >> dIDBRejectedTable; else { // Try to initialize DIDBRejectedTable from default device value def_prop = ds_class->get_default_device_property(dev_prop[i].name); if (def_prop.is_empty()==false) def_prop >> dIDBRejectedTable; } // And try to extract DIDBRejectedTable value from database if (dev_prop[i].is_empty()==false) dev_prop[i] >> dIDBRejectedTable; // Try to initialize RemoteHost from class property cl_prop = ds_class->get_class_property(dev_prop[++i].name); if (cl_prop.is_empty()==false) cl_prop >> remoteHost; Loading Loading @@ -510,27 +426,6 @@ void DataImporter::get_device_property() checkIfDirectoryExists(storagePath); if(dIDBHost.empty()) throw(invalid_argument("DIDBHost property is empty or not defined")); if(dIDBPort<1 || dIDBPort>MAX_PORT_NUMBER) throw(invalid_argument("DIDBPort property out of range or not defined")); if(dIDBUser.empty()) throw(invalid_argument("DIDBUser property is empty or not defined")); if(dIDBPassword.empty()) throw(invalid_argument("DIDBPassword property is empty or not defined")); if(dIDBSchema.empty()) throw(invalid_argument("DIDBSchema property is empty or not defined")); if(dIDBIndexTable.empty()) throw(invalid_argument("DIDBIndexTable property is empty or not defined")); if(dIDBRejectedTable.empty()) throw(invalid_argument("DIDBRejectedTable property is empty or not defined")); if(remoteHost.empty()) throw(invalid_argument("RemoteHost property is empty or not defined")); Loading Loading @@ -568,10 +463,9 @@ void DataImporter::get_device_property() throw(invalid_argument("Timeout property out of range or not defined")); m_configuration_sp = Configuration::create(certificateFile, storagePath, dIDBHost, dIDBPort, dIDBUser, dIDBPassword, dIDBSchema, dIDBIndexTable, dIDBRejectedTable, remoteHost, remotePort, remoteUsername, remotePassword, databaseHost, databasePort, databaseUsername, databasePassword, databaseSchema, databaseTable, refreshTime, timeout); remoteHost, remotePort, remoteUsername, remotePassword, databaseHost, databasePort, databaseUsername, databasePassword, databaseSchema, databaseTable, refreshTime, timeout); } catch(invalid_argument& ex) { Loading src/DataImporter.h +0 −14 Original line number Diff line number Diff line Loading @@ -91,20 +91,6 @@ public: string certificateFile; // StoragePath: Absolute path to storage string storagePath; // DIDBHost: Hostname where data import database is running string dIDBHost; // DIDBPort: Port where data import database is listening Tango::DevULong dIDBPort; // DIDBUser: User to login in data import database string dIDBUser; // DIDBPassword: Password to login in data import database string dIDBPassword; // DIDBSchema: Schema where data import tables are located string dIDBSchema; // DIDBIndexTable: Index table name string dIDBIndexTable; // DIDBRejectedTable: Rejected table name string dIDBRejectedTable; // RemoteHost: Metadata exporter remote host string remoteHost; // RemotePort: Metadata exporter remote port Loading Loading
src/Configuration.h +4 −43 Original line number Diff line number Diff line Loading @@ -19,9 +19,6 @@ private: // [Private] Constructor destructor deleter //------------------------------------------------------------------------------ Configuration(std::string certificateFile, std::string storagePath, std::string dIDBHost, unsigned int dIDBPort, std::string dIDBUser, std::string dIDBPassword, std::string dIDBSchema, std::string dIDBIndexTable, std::string dIDBRejectedTable, std::string remoteHost, unsigned int remotePort, std::string remoteUsername, std::string remotePassword, std::string databaseHost, unsigned int databasePort, Loading @@ -29,9 +26,6 @@ private: std::string databaseSchema, std::string databaseTable, unsigned int refreshTime, unsigned int timeout) : m_certificateFile (certificateFile), m_storagePath(storagePath), m_dIDBHost(dIDBHost), m_dIDBPort(dIDBPort), m_dIDBUser(dIDBUser), m_dIDBPassword(dIDBPassword), m_dIDBSchema(dIDBSchema), m_dIDBIndexTable(dIDBIndexTable), m_dIDBRejectedTable(dIDBRejectedTable), m_remoteHost(remoteHost), m_remotePort(remotePort), m_remoteUsername(remoteUsername), m_remotePassword(remotePassword), m_databaseHost(databaseHost), m_databasePort(databasePort), Loading @@ -54,10 +48,7 @@ public: // [Public] Create class method //------------------------------------------------------------------------------ static Configuration::SP create(std::string certificateFile, std::string storagePath, std::string dIDBHost, unsigned int dIDBPort, std::string dIDBUser, std::string dIDBPassword, std::string dIDBSchema, std::string dIDBIndexTable, std::string dIDBRejectedTable, std::string remoteHost, unsigned int remotePort, std::string storagePath, std::string remoteHost, unsigned int remotePort, std::string remoteUsername, std::string remotePassword, std::string databaseHost, unsigned int databasePort, std::string databaseUsername, std::string databasePassword, Loading @@ -65,11 +56,9 @@ public: unsigned int refreshTime, unsigned int timeout) { Configuration::SP c_sp(new Configuration(certificateFile, storagePath, dIDBHost, dIDBPort, dIDBUser, dIDBPassword, dIDBSchema, dIDBIndexTable, dIDBRejectedTable, remoteHost, remotePort, remoteUsername, remotePassword, databaseHost, databasePort, databaseUsername, databasePassword, databaseSchema, databaseTable, refreshTime, timeout), Configuration::Deleter()); remoteHost, remotePort, remoteUsername, remotePassword, databaseHost, databasePort, databaseUsername, databasePassword, databaseSchema, databaseTable, refreshTime, timeout), Configuration::Deleter()); return c_sp; } Loading @@ -79,13 +68,6 @@ public: //------------------------------------------------------------------------------ std::string getCertificateFile() const { return m_certificateFile; } std::string getStoragePath() const { return m_storagePath; } std::string getDIDBHost() const { return m_dIDBHost; } unsigned int getDIDBPort() const { return m_dIDBPort; } std::string getDIDBUser() const { return m_dIDBUser; } std::string getDIDBPassword() const { return m_dIDBPassword; } std::string getDIDBSchema() const { return m_dIDBSchema; } std::string getDIDBIndexTable() const { return m_dIDBIndexTable; } std::string getDIDBRejectedTable() const { return m_dIDBRejectedTable; } std::string getRemoteHost() const { return m_remoteHost; } unsigned int getRemotePort() const { return m_remotePort; } std::string getRemoteUsername() const { return m_remoteUsername; } Loading @@ -109,27 +91,6 @@ private: //Absolute path to storage const std::string m_storagePath; //Host where data import database is running const std::string m_dIDBHost; //Port where data import database is listening const unsigned int m_dIDBPort; //User to login in data import database const std::string m_dIDBUser; //Password to login in data import database const std::string m_dIDBPassword; //Schema where data import tables are located const std::string m_dIDBSchema; //Index table name const std::string m_dIDBIndexTable; //Rejected table name const std::string m_dIDBRejectedTable; //Metadata exporter remote host const std::string m_remoteHost; Loading
src/DBManager.cpp +69 −35 Original line number Diff line number Diff line Loading @@ -54,7 +54,7 @@ void DBManager::connect() throw(soci::soci_error) { DEBUG_STREAM << "DBManager::connect()" << endl; boost::mutex::scoped_lock lock(m_connectionMutex); boost::mutex::scoped_lock lock(m_sessionMutex); std::stringstream connection; Loading @@ -77,7 +77,7 @@ void DBManager::disconnect() { DEBUG_STREAM << "DBManager::disconnect()" << endl; boost::mutex::scoped_lock lock(m_connectionMutex); boost::mutex::scoped_lock lock(m_sessionMutex); m_session_sp->close(); } Loading @@ -85,65 +85,99 @@ void DBManager::disconnect() //============================================================================== // DBManager::retrieveLastTimestamp() //============================================================================== std::tm DBManager::retrieveLastTimestamp() throw(soci::soci_error) std::tm DBManager::retrieveLastTimestamp() throw(std::runtime_error, std::out_of_range) { DEBUG_STREAM << "DBManager::retrieveLastTimestamp()" << endl; boost::mutex::scoped_lock lock(m_connectionMutex); // if(m_auxSession_sp->get_backend() == NULL) // m_auxSession_sp->reconnect(); // // std::tm lastTimestamp; // // *m_auxSession_sp << "select coalesce(max(update_time),'1970-01-01 00:00:00') " // << "from " << m_configuration_sp->getDIDBSchema() << "." // << m_configuration_sp->getDIDBIndexTable(), soci::into(lastTimestamp); // // return lastTimestamp; boost::mutex::scoped_lock lock(m_tangoDBMutex); Tango::DbData db_data; db_data.push_back(Tango::DbDatum("LastTimestamp")); try { m_tangoDB_sp->get_device_property(m_deviceName, db_data); } catch(Tango::DevFailed& ex) { std::stringstream error_msg; for (unsigned int i=0; i<ex.errors.length(); i++) { error_msg << ex.errors[i].reason.in() << endl; error_msg << ex.errors[i].desc.in() << endl; error_msg << ex.errors[i].origin.in() << endl; } throw std::runtime_error(error_msg.str()); } std::string timestamp("1970-01-01 00:00:00"); db_data[0] >> timestamp; boost::posix_time::ptime ptime = boost::posix_time::time_from_string(timestamp); return boost::posix_time::to_tm(ptime); } //============================================================================== // DBManager::persistLastTimestamp() //============================================================================== void DBManager::persistLastTimestamp(std::tm lastTimestamp) throw(soci::soci_error) void DBManager::persistLastTimestamp(std::tm lastTimestamp) throw(std::runtime_error, std::out_of_range) { DEBUG_STREAM << "DBManager::persistLastTimestamp()" << endl; boost::mutex::scoped_lock lock(m_connectionMutex); boost::mutex::scoped_lock lock(m_tangoDBMutex); // if(m_auxSession_sp->get_backend() == NULL) // m_auxSession_sp->reconnect(); // // *m_auxSession_sp << "insert into " << m_configuration_sp->getDIDBSchema() // << "." << m_configuration_sp->getDIDBIndexTable() // << " values(:lastTimestamp)", soci::use(lastTimestamp, "lastTimestamp"); boost::posix_time::ptime ptime = boost::posix_time::ptime_from_tm(lastTimestamp); std::string timestampString = boost::posix_time::to_simple_string(ptime); Tango::DbDatum timestamp("LastTimestamp"); timestamp << timestampString; Tango::DbData db_data; db_data.push_back(timestamp); try { m_tangoDB_sp->put_device_property(m_deviceName, db_data); } catch(Tango::DevFailed& ex) { std::stringstream error_msg; for (unsigned int i=0; i<ex.errors.length(); i++) { error_msg << ex.errors[i].reason.in() << endl; error_msg << ex.errors[i].desc.in() << endl; error_msg << ex.errors[i].origin.in() << endl; } throw std::runtime_error(error_msg.str()); } } //============================================================================== // DBManager::retrieveNewTuples() //============================================================================== DBManager::RowsetSP DBManager::retrieveNewTuples(std::tm lastTimestamp) throw(soci::soci_error, std::out_of_range) DBManager::FileRowsetSP DBManager::retrieveNewTuples(std::string schema, std::string table, std::tm lastTimestamp) throw(soci::soci_error, std::out_of_range) { DEBUG_STREAM << "DBManager::retrieveNewTuples()" << endl; boost::mutex::scoped_lock lock(m_connectionMutex); boost::mutex::scoped_lock lock(m_sessionMutex); if(m_session_sp->get_backend() == NULL) m_session_sp->reconnect(); boost::posix_time::ptime timestamp = boost::posix_time::ptime_from_tm(lastTimestamp); // RowsetSP rows(new soci::rowset<soci::row>(m_mainSession_sp->prepare // << "select id, file_path, file_version, file_name, update_time from " // << m_configuration_sp->getDatabaseSchema() << "." // << m_configuration_sp->getDatabaseTable() // << " where update_time>'" // << boost::posix_time::to_iso_string(timestamp) // << "' order by update_time asc")); FileRowsetSP fileRowset_sp(new FileRowset(m_session_sp->prepare << "select" << " file_version, file_name, update_time from " << schema << "." << table << " where update_time>'" << boost::posix_time::to_iso_string(timestamp) << "' order by update_time asc")); //return rows; return fileRowset_sp; } //============================================================================== Loading
src/DBManager.h +20 −12 Original line number Diff line number Diff line Loading @@ -70,38 +70,46 @@ public: //------------------------------------------------------------------------------ // [Public] Timestamp methods //------------------------------------------------------------------------------ virtual std::tm retrieveLastTimestamp() throw(soci::soci_error); virtual std::tm retrieveLastTimestamp() throw(std::runtime_error, std::out_of_range); virtual void persistLastTimestamp(std::tm) throw(soci::soci_error); virtual void persistLastTimestamp(std::tm) throw(std::runtime_error, std::out_of_range); //------------------------------------------------------------------------------ // [Public] Tuple methods //------------------------------------------------------------------------------ typedef boost::shared_ptr< boost::tuple< boost::optional<int>, boost::optional<std::string>, boost::optional<std::string>, boost::optional<int>, boost::optional<std::string>, boost::optional<std::tm> > > RowsetSP; typedef boost::tuple< boost::optional<int>, boost::optional<std::string>, boost::optional<std::tm> > FileRow; virtual RowsetSP retrieveNewTuples(std::tm) typedef soci::rowset< FileRow > FileRowset; typedef boost::shared_ptr< FileRowset > FileRowsetSP; virtual FileRowsetSP retrieveNewTuples(std::string, std::string, std::tm) throw(soci::soci_error, std::out_of_range); protected: //------------------------------------------------------------------------------ // [Protected] Class variables //------------------------------------------------------------------------------ //Device name std::string m_deviceName; //Configuration shared pointer Configuration::SP m_configuration_sp; //Connection mutex boost::mutex m_connectionMutex; //Metadata database connection mutex boost::mutex m_sessionMutex; //Metadata database connection scoped pointer boost::scoped_ptr<soci::session> m_session_sp; //Tango database connection mutex boost::mutex m_tangoDBMutex; //Tango database connection scoped pointer boost::scoped_ptr<Tango::Database> m_tangoDB_sp; //Device name std::string m_deviceName; }; } //End of namespace Loading
src/DataImporter.cpp +3 −109 Original line number Diff line number Diff line Loading @@ -203,13 +203,6 @@ void DataImporter::get_device_property() Tango::DbData dev_prop; dev_prop.push_back(Tango::DbDatum("CertificateFile")); dev_prop.push_back(Tango::DbDatum("StoragePath")); dev_prop.push_back(Tango::DbDatum("DIDBHost")); dev_prop.push_back(Tango::DbDatum("DIDBPort")); dev_prop.push_back(Tango::DbDatum("DIDBUser")); dev_prop.push_back(Tango::DbDatum("DIDBPassword")); dev_prop.push_back(Tango::DbDatum("DIDBSchema")); dev_prop.push_back(Tango::DbDatum("DIDBIndexTable")); dev_prop.push_back(Tango::DbDatum("DIDBRejectedTable")); dev_prop.push_back(Tango::DbDatum("RemoteHost")); dev_prop.push_back(Tango::DbDatum("RemotePort")); dev_prop.push_back(Tango::DbDatum("RemoteUsername")); Loading Loading @@ -260,83 +253,6 @@ void DataImporter::get_device_property() // And try to extract StoragePath value from database if (dev_prop[i].is_empty()==false) dev_prop[i] >> storagePath; // Try to initialize DIDBHost from class property cl_prop = ds_class->get_class_property(dev_prop[++i].name); if (cl_prop.is_empty()==false) cl_prop >> dIDBHost; else { // Try to initialize DIDBHost from default device value def_prop = ds_class->get_default_device_property(dev_prop[i].name); if (def_prop.is_empty()==false) def_prop >> dIDBHost; } // And try to extract DIDBHost value from database if (dev_prop[i].is_empty()==false) dev_prop[i] >> dIDBHost; // Try to initialize DIDBPort from class property cl_prop = ds_class->get_class_property(dev_prop[++i].name); if (cl_prop.is_empty()==false) cl_prop >> dIDBPort; else { // Try to initialize DIDBPort from default device value def_prop = ds_class->get_default_device_property(dev_prop[i].name); if (def_prop.is_empty()==false) def_prop >> dIDBPort; } // And try to extract DIDBPort value from database if (dev_prop[i].is_empty()==false) dev_prop[i] >> dIDBPort; // Try to initialize DIDBUser from class property cl_prop = ds_class->get_class_property(dev_prop[++i].name); if (cl_prop.is_empty()==false) cl_prop >> dIDBUser; else { // Try to initialize DIDBUser from default device value def_prop = ds_class->get_default_device_property(dev_prop[i].name); if (def_prop.is_empty()==false) def_prop >> dIDBUser; } // And try to extract DIDBUser value from database if (dev_prop[i].is_empty()==false) dev_prop[i] >> dIDBUser; // Try to initialize DIDBPassword from class property cl_prop = ds_class->get_class_property(dev_prop[++i].name); if (cl_prop.is_empty()==false) cl_prop >> dIDBPassword; else { // Try to initialize DIDBPassword from default device value def_prop = ds_class->get_default_device_property(dev_prop[i].name); if (def_prop.is_empty()==false) def_prop >> dIDBPassword; } // And try to extract DIDBPassword value from database if (dev_prop[i].is_empty()==false) dev_prop[i] >> dIDBPassword; // Try to initialize DIDBSchema from class property cl_prop = ds_class->get_class_property(dev_prop[++i].name); if (cl_prop.is_empty()==false) cl_prop >> dIDBSchema; else { // Try to initialize DIDBSchema from default device value def_prop = ds_class->get_default_device_property(dev_prop[i].name); if (def_prop.is_empty()==false) def_prop >> dIDBSchema; } // And try to extract DIDBSchema value from database if (dev_prop[i].is_empty()==false) dev_prop[i] >> dIDBSchema; // Try to initialize DIDBIndexTable from class property cl_prop = ds_class->get_class_property(dev_prop[++i].name); if (cl_prop.is_empty()==false) cl_prop >> dIDBIndexTable; else { // Try to initialize DIDBIndexTable from default device value def_prop = ds_class->get_default_device_property(dev_prop[i].name); if (def_prop.is_empty()==false) def_prop >> dIDBIndexTable; } // And try to extract DIDBIndexTable value from database if (dev_prop[i].is_empty()==false) dev_prop[i] >> dIDBIndexTable; // Try to initialize DIDBRejectedTable from class property cl_prop = ds_class->get_class_property(dev_prop[++i].name); if (cl_prop.is_empty()==false) cl_prop >> dIDBRejectedTable; else { // Try to initialize DIDBRejectedTable from default device value def_prop = ds_class->get_default_device_property(dev_prop[i].name); if (def_prop.is_empty()==false) def_prop >> dIDBRejectedTable; } // And try to extract DIDBRejectedTable value from database if (dev_prop[i].is_empty()==false) dev_prop[i] >> dIDBRejectedTable; // Try to initialize RemoteHost from class property cl_prop = ds_class->get_class_property(dev_prop[++i].name); if (cl_prop.is_empty()==false) cl_prop >> remoteHost; Loading Loading @@ -510,27 +426,6 @@ void DataImporter::get_device_property() checkIfDirectoryExists(storagePath); if(dIDBHost.empty()) throw(invalid_argument("DIDBHost property is empty or not defined")); if(dIDBPort<1 || dIDBPort>MAX_PORT_NUMBER) throw(invalid_argument("DIDBPort property out of range or not defined")); if(dIDBUser.empty()) throw(invalid_argument("DIDBUser property is empty or not defined")); if(dIDBPassword.empty()) throw(invalid_argument("DIDBPassword property is empty or not defined")); if(dIDBSchema.empty()) throw(invalid_argument("DIDBSchema property is empty or not defined")); if(dIDBIndexTable.empty()) throw(invalid_argument("DIDBIndexTable property is empty or not defined")); if(dIDBRejectedTable.empty()) throw(invalid_argument("DIDBRejectedTable property is empty or not defined")); if(remoteHost.empty()) throw(invalid_argument("RemoteHost property is empty or not defined")); Loading Loading @@ -568,10 +463,9 @@ void DataImporter::get_device_property() throw(invalid_argument("Timeout property out of range or not defined")); m_configuration_sp = Configuration::create(certificateFile, storagePath, dIDBHost, dIDBPort, dIDBUser, dIDBPassword, dIDBSchema, dIDBIndexTable, dIDBRejectedTable, remoteHost, remotePort, remoteUsername, remotePassword, databaseHost, databasePort, databaseUsername, databasePassword, databaseSchema, databaseTable, refreshTime, timeout); remoteHost, remotePort, remoteUsername, remotePassword, databaseHost, databasePort, databaseUsername, databasePassword, databaseSchema, databaseTable, refreshTime, timeout); } catch(invalid_argument& ex) { Loading
src/DataImporter.h +0 −14 Original line number Diff line number Diff line Loading @@ -91,20 +91,6 @@ public: string certificateFile; // StoragePath: Absolute path to storage string storagePath; // DIDBHost: Hostname where data import database is running string dIDBHost; // DIDBPort: Port where data import database is listening Tango::DevULong dIDBPort; // DIDBUser: User to login in data import database string dIDBUser; // DIDBPassword: Password to login in data import database string dIDBPassword; // DIDBSchema: Schema where data import tables are located string dIDBSchema; // DIDBIndexTable: Index table name string dIDBIndexTable; // DIDBRejectedTable: Rejected table name string dIDBRejectedTable; // RemoteHost: Metadata exporter remote host string remoteHost; // RemotePort: Metadata exporter remote port Loading