Newer
Older
#include <DBManager.h>
#include <ctime>
#include <boost/date_time.hpp>
#include <soci/mysql/soci-mysql.h>
namespace DataImporter_ns
{
//==============================================================================
// DBManager::DBManager()
//==============================================================================
// Builds a manager bound to the given Tango device and configuration.
//
// deviceImpl_p     : device used to initialise the log adapter and to obtain
//                    the lower-case device name stored in m_deviceName.
// configuration_sp : shared configuration kept in m_configuration_sp.
//
// A fresh soci::session and a fresh Tango::Database are allocated here; the
// SQL session is only opened later, by connect().
DBManager::DBManager(Tango::DeviceImpl* deviceImpl_p,
                     Configuration::SP configuration_sp)
    : Tango::LogAdapter(deviceImpl_p)
    , m_configuration_sp(configuration_sp)
{
    DEBUG_STREAM << "DBManager::DBManager()" << endl;

    // Name under which device properties are read and written.
    m_deviceName = deviceImpl_p->get_name_lower();

    // SQL session object (not yet opened).
    m_session_sp.reset(new soci::session());

    // Handle on the Tango database.
    m_tangoDB_sp.reset(new Tango::Database());
}
//==============================================================================
// DBManager::~DBManager()
//==============================================================================
// Logs the destruction and closes the SQL session held by this manager.
DBManager::~DBManager()
{
    DEBUG_STREAM << "DBManager::~DBManager()" << endl;

    soci::session& session = *m_session_sp;
    session.close();
}
//==============================================================================
// DBManager::create()
//==============================================================================
// Factory: allocates a DBManager for the given device and configuration and
// returns it wrapped in a DBManager::SP that uses DBManager::Deleter to
// release the instance.
DBManager::SP DBManager::create(Tango::DeviceImpl* deviceImpl_p,
                                Configuration::SP configuration_sp)
{
    return DBManager::SP(new DBManager(deviceImpl_p, configuration_sp),
                         DBManager::Deleter());
}
//==============================================================================
// DBManager::connect()
//==============================================================================
// Opens the SQL session against the MySQL server described by the
// configuration (host, port, user, password).
//
// The session mutex is held for the whole call.
// Throws soci::soci_error as declared; it is not caught here, so any error
// raised by soci::session::open() propagates to the caller.
//
// When VERBOSE_DEBUG is defined the connection parameters are written to the
// INFO log. The password is masked in that log line so that credentials never
// end up in log files; the string handed to SOCI is unaffected.
void DBManager::connect() throw(soci::soci_error)
{
    DEBUG_STREAM << "DBManager::connect()" << endl;

    boost::mutex::scoped_lock lock(m_sessionMutex);

    // Non-secret part of the connection string; safe to log.
    std::stringstream endpoint;
    endpoint << " host=" << m_configuration_sp->getDatabaseHost();
    endpoint << " port=" << m_configuration_sp->getDatabasePort();
    endpoint << " user=" << m_configuration_sp->getDatabaseUsername();

#ifdef VERBOSE_DEBUG
    INFO_STREAM << "MAIN CONNECTION: " << endpoint.str()
        << " password=********" << endl;
#endif

    // Full connection string, identical to the one built before the
    // password was split out for logging purposes.
    std::stringstream connection;
    connection << endpoint.str();
    connection << " password=" << m_configuration_sp->getDatabasePassword();

    m_session_sp->open(soci::mysql, connection.str());
}
//==============================================================================
// DBManager::disconnect()
//==============================================================================
void DBManager::disconnect()
{
DEBUG_STREAM << "DBManager::disconnect()" << endl;
boost::mutex::scoped_lock lock(m_sessionMutex);
m_session_sp->close();
}
//==============================================================================
// DBManager::retrieveLastTimestamp()
//==============================================================================
// Reads the "LastTimestamp" device property from the Tango database and
// returns it as a boost::posix_time::ptime.
//
// The Tango database mutex is held for the whole call.
//
// Returns the parsed property value. The text to parse is pre-set to
// "1970-01-01 00:00:00" before extraction from the datum, so that value is
// what gets parsed if the extraction does not overwrite it.
//
// Throws std::runtime_error when the Tango call fails (message built from the
//        reason/desc/origin of every entry of the DevFailed error stack) or
//        when the stored text cannot be parsed as a timestamp.
// Throws std::out_of_range when the parser reports it (passed through
//        unchanged).
boost::posix_time::ptime DBManager::retrieveLastTimestamp()
    throw(std::runtime_error, std::out_of_range)
{
    DEBUG_STREAM << "DBManager::retrieveLastTimestamp()" << endl;

    boost::mutex::scoped_lock lock(m_tangoDBMutex);

    Tango::DbData db_data;
    db_data.push_back(Tango::DbDatum("LastTimestamp"));

    try
    {
        m_tangoDB_sp->get_device_property(m_deviceName, db_data);
    }
    catch(Tango::DevFailed& ex)
    {
        // Flatten the whole Tango error stack into one message.
        std::stringstream error_msg;
        for (unsigned int i=0; i<ex.errors.length(); i++)
        {
            error_msg << ex.errors[i].reason.in() << endl;
            error_msg << ex.errors[i].desc.in() << endl;
            error_msg << ex.errors[i].origin.in() << endl;
        }
        throw std::runtime_error(error_msg.str());
    }

    std::string timestamp("1970-01-01 00:00:00");
    db_data[0] >> timestamp;

    // time_from_string() may throw types that are not part of this function's
    // exception specification (e.g. boost::bad_lexical_cast on malformed
    // text). Letting one escape would call std::unexpected() and terminate
    // the process, so anything that is not already an allowed type is
    // translated into std::runtime_error.
    try
    {
        return boost::posix_time::time_from_string(timestamp);
    }
    catch(std::out_of_range&)
    {
        throw;
    }
    catch(std::runtime_error&)
    {
        throw;
    }
    catch(std::exception& ex)
    {
        throw std::runtime_error("Invalid LastTimestamp property value \""
            + timestamp + "\": " + ex.what());
    }
}
//==============================================================================
// DBManager::persistLastTimestamp()
//==============================================================================
// Stores ptime, rendered with boost::posix_time::to_simple_string, in the
// "LastTimestamp" device property of the Tango database.
//
// The Tango database mutex is held for the whole call.
//
// Throws std::runtime_error when the Tango call fails; the message lists the
//        reason, description and origin of every entry of the error stack,
//        one per line.
void DBManager::persistLastTimestamp(boost::posix_time::ptime ptime)
    throw(std::runtime_error)
{
    DEBUG_STREAM << "DBManager::persistLastTimestamp()" << endl;

    boost::mutex::scoped_lock tangoGuard(m_tangoDBMutex);

    // Build the property datum carrying the textual timestamp.
    Tango::DbDatum property("LastTimestamp");
    std::string text = boost::posix_time::to_simple_string(ptime);
    property << text;

    Tango::DbData properties;
    properties.push_back(property);

    try
    {
        m_tangoDB_sp->put_device_property(m_deviceName, properties);
    }
    catch(Tango::DevFailed& failure)
    {
        std::stringstream report;
        const unsigned int count = failure.errors.length();
        for (unsigned int idx = 0; idx < count; ++idx)
        {
            Tango::DevError& entry = failure.errors[idx];
            report << entry.reason.in() << endl
                   << entry.desc.in() << endl
                   << entry.origin.in() << endl;
        }
        throw std::runtime_error(report.str());
    }
}
//==============================================================================
// DBManager::retrieveNewFile()
//==============================================================================
// Prepares a SELECT of file_version, file_name and update_time from
// <schema>.<table>, restricted to rows whose update_time is greater than
// ptime, and wraps it in a FileRowsetSP.
// Declared to throw soci::soci_error.
// NOTE(review): the opening brace and the tail of the query statement are not
// present in this view of the file - confirm against the complete source.
// NOTE(review): schema and table are streamed directly into the SQL text;
// presumably they come from trusted configuration - verify against callers.
DBManager::FileRowsetSP DBManager::retrieveNewFile(std::string schema,
std::string table, boost::posix_time::ptime ptime) throw(soci::soci_error)
DEBUG_STREAM << "DBManager::retrieveNewFile()" << endl;
// The session mutex is held while the session is used.
boost::mutex::scoped_lock lock(m_sessionMutex);
// The session currently has no backend: ask SOCI to reconnect before querying.
if(m_session_sp->get_backend() == NULL)
m_session_sp->reconnect();
// ptime is rendered with to_iso_string for the update_time comparison.
FileRowsetSP fileRowset_sp(new FileRowset(m_session_sp->prepare << "select"
<< " file_version, file_name, update_time from " << schema << "." << table
<< " where update_time>'" << boost::posix_time::to_iso_string(ptime)
}
//==============================================================================
// DBManager::retrieveNewTuples()
//==============================================================================