// Implementation of DataImporter_ns::DBManager.
#include <DBManager.h>
#include <ctime>
#include <boost/date_time.hpp>
#include <soci/mysql/soci-mysql.h>
namespace DataImporter_ns
{
//==============================================================================
// DBManager::DBManager()
//==============================================================================
// Builds a manager bound to the given Tango device.
//
// deviceImpl_p     - owning device; forwarded to Tango::LogAdapter and used
//                    to obtain the lower-case device name.
// configuration_sp - shared configuration object, stored for later use by
//                    connect().
//
// Only allocates the SOCI session and the Tango database handle here; the
// session itself is opened by connect().
DBManager::DBManager(Tango::DeviceImpl* deviceImpl_p,
    Configuration::SP configuration_sp)
    : Tango::LogAdapter(deviceImpl_p)
    , m_configuration_sp(configuration_sp)
{
    DEBUG_STREAM << "DBManager::DBManager()" << endl;

    // Lower-case device name, used as the key for device property access.
    m_deviceName = deviceImpl_p->get_name_lower();

    m_session_sp.reset(new soci::session());
    m_tangoDB_sp.reset(new Tango::Database());
}
//==============================================================================
// DBManager::~DBManager()
//==============================================================================
// Closes the SOCI session held by this manager.
// Note: unlike disconnect(), no lock on m_sessionMutex is taken here.
DBManager::~DBManager()
{
    DEBUG_STREAM << "DBManager::~DBManager()" << endl;

    (*m_session_sp).close();
}
//==============================================================================
// DBManager::create()
//==============================================================================
// Factory: allocates a DBManager and wraps it in a DBManager::SP that
// releases it through DBManager::Deleter.
//
// deviceImpl_p     - device forwarded to the DBManager constructor.
// configuration_sp - configuration forwarded to the DBManager constructor.
//
// Returns the newly created shared pointer.
DBManager::SP DBManager::create(Tango::DeviceImpl* deviceImpl_p,
    Configuration::SP configuration_sp)
{
    return DBManager::SP(new DBManager(deviceImpl_p, configuration_sp),
        DBManager::Deleter());
}
//==============================================================================
// DBManager::connect()
//==============================================================================
// Opens the SOCI session against the MySQL server described by the
// configuration (host, port, username, password).
//
// Holds m_sessionMutex for the duration of the call.
// Declared to throw soci::soci_error.
void DBManager::connect() throw(soci::soci_error)
{
    DEBUG_STREAM << "DBManager::connect()" << endl;

    boost::mutex::scoped_lock lock(m_sessionMutex);

    // Assemble the non-secret part of the connection string first so that it
    // can be logged without exposing credentials.
    std::stringstream connection;
    connection << " host=" << m_configuration_sp->getDatabaseHost();
    connection << " port=" << m_configuration_sp->getDatabasePort();
    connection << " user=" << m_configuration_sp->getDatabaseUsername();

#ifdef VERBOSE_DEBUG
    // The password is deliberately redacted: log output must never carry
    // the database credentials.
    INFO_STREAM << "MAIN CONNECTION: " << connection.str()
        << " password=<hidden>" << endl;
#endif

    connection << " password=" << m_configuration_sp->getDatabasePassword();

    m_session_sp->open(soci::mysql, connection.str());
}
//==============================================================================
// DBManager::disconnect()
//==============================================================================
// Closes the SOCI session while holding m_sessionMutex.
void DBManager::disconnect()
{
    DEBUG_STREAM << "DBManager::disconnect()" << endl;

    boost::mutex::scoped_lock sessionGuard(m_sessionMutex);
    (*m_session_sp).close();
}
//==============================================================================
// DBManager::retrieveLastTimestamp()
//==============================================================================
// Reads the "LastTimestamp" device property of this device from the Tango
// database and returns it as a std::tm.
//
// Holds m_tangoDBMutex for the duration of the call.
//
// Returns the parsed timestamp. The local string starts out as
// "1970-01-01 00:00:00"; presumably that value is kept when the property
// has never been written (verify against Tango::DbDatum extraction rules).
//
// Throws std::runtime_error when the Tango call raises Tango::DevFailed;
// the message concatenates reason, description and origin of every entry
// in the error stack.
// NOTE(review): boost::posix_time::time_from_string may raise exception
// types other than those in the exception specification - confirm.
std::tm DBManager::retrieveLastTimestamp()
    throw(std::runtime_error, std::out_of_range)
{
    DEBUG_STREAM << "DBManager::retrieveLastTimestamp()" << endl;

    boost::mutex::scoped_lock lock(m_tangoDBMutex);

    Tango::DbData db_data;
    db_data.push_back(Tango::DbDatum("LastTimestamp"));

    try
    {
        m_tangoDB_sp->get_device_property(m_deviceName, db_data);
    }
    catch(Tango::DevFailed& ex)
    {
        // Flatten the whole Tango error stack into one message.
        std::stringstream error_msg;
        for (unsigned int i=0; i<ex.errors.length(); i++)
        {
            error_msg << ex.errors[i].reason.in() << endl;
            error_msg << ex.errors[i].desc.in() << endl;
            error_msg << ex.errors[i].origin.in() << endl;
        }
        throw std::runtime_error(error_msg.str());
    }

    // Initial value used when the extraction below does not overwrite it.
    std::string timestamp("1970-01-01 00:00:00");
    db_data[0] >> timestamp;

    boost::posix_time::ptime ptime = boost::posix_time::time_from_string(timestamp);

    return boost::posix_time::to_tm(ptime);
}
//==============================================================================
// DBManager::persistLastTimestamp()
//==============================================================================
// Stores the given timestamp in the "LastTimestamp" device property of this
// device in the Tango database.
//
// lastTimestamp - broken-down time to persist; converted to a
//                 boost::posix_time::ptime and written in the format
//                 produced by boost::posix_time::to_simple_string.
//
// Holds m_tangoDBMutex for the duration of the call.
//
// Throws std::runtime_error when the Tango call raises Tango::DevFailed;
// the message concatenates reason, description and origin of every entry
// in the error stack.
void DBManager::persistLastTimestamp(std::tm lastTimestamp)
    throw(std::runtime_error, std::out_of_range)
{
    DEBUG_STREAM << "DBManager::persistLastTimestamp()" << endl;

    boost::mutex::scoped_lock lock(m_tangoDBMutex);

    boost::posix_time::ptime ptime = boost::posix_time::ptime_from_tm(lastTimestamp);
    std::string timestampString = boost::posix_time::to_simple_string(ptime);

    Tango::DbDatum timestamp("LastTimestamp");
    timestamp << timestampString;

    Tango::DbData db_data;
    db_data.push_back(timestamp);

    try
    {
        m_tangoDB_sp->put_device_property(m_deviceName, db_data);
    }
    catch(Tango::DevFailed& ex)
    {
        // Flatten the whole Tango error stack into one message.
        std::stringstream error_msg;
        for (unsigned int i=0; i<ex.errors.length(); i++)
        {
            error_msg << ex.errors[i].reason.in() << endl;
            error_msg << ex.errors[i].desc.in() << endl;
            error_msg << ex.errors[i].origin.in() << endl;
        }
        throw std::runtime_error(error_msg.str());
    }
}
//==============================================================================
// DBManager::retrieveNewTuples()
//==============================================================================
// Queries <schema>.<table> for rows whose update_time is strictly greater
// than lastTimestamp, ordered by update_time ascending.
//
// schema        - database schema name, inserted verbatim into the SQL text.
// table         - table name, inserted verbatim into the SQL text.
// lastTimestamp - lower bound (exclusive) for update_time; rendered with
//                 boost::posix_time::to_iso_string.
//
// Holds m_sessionMutex for the duration of the call and calls reconnect()
// on the session when it currently has no backend.
//
// Returns a shared pointer to the rowset selecting file_version, file_name
// and update_time.
//
// NOTE(review): schema and table are concatenated into the statement;
// presumably they come from trusted configuration - verify against callers.
DBManager::FileRowsetSP DBManager::retrieveNewTuples(std::string schema,
    std::string table, std::tm lastTimestamp) throw(soci::soci_error,
    std::out_of_range)
{
    DEBUG_STREAM << "DBManager::retrieveNewTuples()" << endl;

    boost::mutex::scoped_lock lock(m_sessionMutex);

    // A session without a backend is not connected; re-establish it first.
    if(m_session_sp->get_backend() == NULL)
        m_session_sp->reconnect();

    boost::posix_time::ptime timestamp = boost::posix_time::ptime_from_tm(lastTimestamp);

    FileRowsetSP fileRowset_sp(new FileRowset(m_session_sp->prepare << "select"
        << " file_version, file_name, update_time from " << schema << "." << table
        << " where update_time>'" << boost::posix_time::to_iso_string(timestamp)
        << "' order by update_time asc"));

    // The original fell off the end of a non-void function (undefined
    // behaviour); hand the prepared rowset back to the caller.
    return fileRowset_sp;
}
//==============================================================================
// DBManager::retrieveNewTuples()
//==============================================================================