Newer
Older
#include <DBManager.h>
#include <ctime>
#include <boost/date_time.hpp>
#include <soci/mysql/soci-mysql.h>
namespace DataImporter_ns
{
//==============================================================================
// DBManager::DBManager()
//==============================================================================
DBManager::DBManager(Tango::DeviceImpl* deviceImpl_p,
Configuration::SP configuration_sp) : Tango::LogAdapter(deviceImpl_p),
m_configuration_sp(configuration_sp)
{
DEBUG_STREAM << "DBManager::DBManager()" << endl;
m_deviceName = deviceImpl_p->get_name_lower();
}
//==============================================================================
//==============================================================================
DBManager::~DBManager()
{
DEBUG_STREAM << "DBManager::~DBManager()" << endl;
m_mainSession_sp->close();
m_auxSession_sp->close();
}
//==============================================================================
//==============================================================================
DBManager::SP DBManager::create(Tango::DeviceImpl* deviceImpl_p,
Configuration::SP configuration_sp)
{
DBManager::SP d_sp(new DBManager(deviceImpl_p, configuration_sp),
DBManager::Deleter());
return d_sp;
}
//==============================================================================
// DBManager::connect()
//==============================================================================
void DBManager::connectAll() throw(soci::soci_error)
{
DEBUG_STREAM << "DBManager::connect()" << endl;
boost::mutex::scoped_lock lock(m_sessionMutex);
std::stringstream connection;
connection << " host=" << m_configuration_sp->getDatabaseHost();
connection << " port=" << m_configuration_sp->getDatabasePort();
connection << " user=" << m_configuration_sp->getDatabaseUsername();
connection << " password=" << m_configuration_sp->getDatabasePassword();
#ifdef VERBOSE_DEBUG
INFO_STREAM << "MAIN CONNECTION: " << connection.str() << endl;
m_mainSession_sp->open(soci::mysql, connection.str());
connection.str("");
connection << " host=" << m_configuration_sp->getAuxDatabaseHost();
connection << " port=" << m_configuration_sp->getAuxDatabasePort();
connection << " user=" << m_configuration_sp->getAuxDatabaseUsername();
connection << " password=" << m_configuration_sp->getAuxDatabasePassword();
#ifdef VERBOSE_DEBUG
INFO_STREAM << "AUX CONNECTION: " << connection.str() << endl;
#endif
m_auxSession_sp->open(soci::mysql, connection.str());
}
//==============================================================================
// DBManager::disconnect()
//==============================================================================
{
DEBUG_STREAM << "DBManager::disconnect()" << endl;
boost::mutex::scoped_lock lock(m_sessionMutex);
m_mainSession_sp->close();
m_auxSession_sp->close();
}
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
//==============================================================================
// DBManager::getMainTransaction()
//==============================================================================
DBManager::TransactionSP DBManager::getMainTransaction()
{
DEBUG_STREAM << "DBManager::getMainTransaction()" << endl;
boost::mutex::scoped_lock lock(m_sessionMutex);
TransactionSP transaction_sp(new soci::transaction(*m_mainSession_sp));
return transaction_sp;
}
//==============================================================================
// DBManager::getAuxTransaction()
//==============================================================================
DBManager::TransactionSP DBManager::getAuxTransaction()
{
DEBUG_STREAM << "DBManager::getAuxTransaction()" << endl;
boost::mutex::scoped_lock lock(m_sessionMutex);
TransactionSP transaction_sp(new soci::transaction(*m_auxSession_sp));
return transaction_sp;
}
//==============================================================================
// DBManager::retrieveNewFiles()
//==============================================================================
DBManager::NewFileRowsetSP DBManager::retrieveNewFiles(boost::posix_time::ptime ptime)
throw(soci::soci_error)
{
DEBUG_STREAM << "DBManager::retrieveNewFiles()" << endl;
boost::mutex::scoped_lock lock(m_sessionMutex);
if(m_mainSession_sp->get_backend() == NULL)
m_mainSession_sp->reconnect();
NewFileRowsetSP newFileRowset_sp(new NewFileRowset(m_mainSession_sp->prepare
<< "select file_version, file_name, update_time from "
<< m_configuration_sp->getDatabaseSchema() << "."
<< m_configuration_sp->getDatabaseTable() << " where update_time>'"
<< boost::posix_time::to_iso_string(ptime) << "' order by update_time asc"));
return newFileRowset_sp;
//==============================================================================
// DBManager::updateNewFilePath()
//==============================================================================
void DBManager::updateNewFilePath(std::string storagePath, std::string filePath,
int fileVersion, std::string fileName) throw(soci::soci_error)
{
DEBUG_STREAM << "DBManager::updateNewFilePath()" << endl;
boost::mutex::scoped_lock lock(m_sessionMutex);
if(m_mainSession_sp->get_backend() == NULL)
m_mainSession_sp->reconnect();
*m_mainSession_sp << "update " << m_configuration_sp->getDatabaseSchema()
<< "." << m_configuration_sp->getDatabaseTable()
<< " set storage_path = :storagePath, file_path = :filePath "
<< " where file_version = :fileVersion and file_name like :fileName",
soci::use(storagePath, "storagePath"), soci::use(filePath, "filePath"),
soci::use(fileVersion, "fileVersion"), soci::use(fileName, "fileName");
}
//==============================================================================
// DBManager::retrieveLastTimestamp()
//==============================================================================
boost::posix_time::ptime DBManager::retrieveLastTimestamp()
{
DEBUG_STREAM << "DBManager::retrieveLastTimestamp()" << endl;
boost::mutex::scoped_lock lock(m_sessionMutex);
if(m_auxSession_sp->get_backend() == NULL)
m_auxSession_sp->reconnect();
std::tm tm_time;
*m_auxSession_sp << "select coalesce(max(last_timestamp),'1970-01-01 00:00:00')"
<< " from "<< m_configuration_sp->getAuxDatabaseSchema()
<< "." << m_configuration_sp->getAuxDatabaseTimestampTable()
<< " where device_name like :deviceName",
soci::use(m_deviceName, "deviceName"), soci::into(tm_time);
return boost::posix_time::ptime_from_tm(tm_time);
}
//==============================================================================
// DBManager::persistLastTimestamp()
//==============================================================================
void DBManager::persistLastTimestamp(boost::posix_time::ptime ptime)
{
DEBUG_STREAM << "DBManager::persistLastTimestamp()" << endl;
boost::mutex::scoped_lock lock(m_sessionMutex);
if(m_auxSession_sp->get_backend() == NULL)
m_auxSession_sp->reconnect();
*m_auxSession_sp << "insert into " << m_configuration_sp->getAuxDatabaseSchema()
<< "." << m_configuration_sp->getAuxDatabaseTimestampTable()
<< " (device_name, last_timestamp) values ('" << m_deviceName << "','"
<< boost::posix_time::to_iso_string(ptime) << "') on duplicate key update"
<< " last_timestamp='" << boost::posix_time::to_iso_string(ptime) << "'";
}
//==============================================================================
//==============================================================================
void DBManager::addFailedFile(int fileVersion, std::string fileName)
throw(soci::soci_error)
DEBUG_STREAM << "DBManager::addFailedFile()" << endl;
boost::mutex::scoped_lock lock(m_sessionMutex);
if(m_auxSession_sp->get_backend() == NULL)
m_auxSession_sp->reconnect();
*m_auxSession_sp << "insert ignore into "
<< m_configuration_sp->getAuxDatabaseSchema() << "."
<< m_configuration_sp->getAuxDatabaseFailedTable()
<< " (device_name, file_version, file_name) values (:deviceName, "
<< ":fileVersion, :fileName)", soci::use(m_deviceName, "deviceName"),
soci::use(fileVersion, "fileVersion"), soci::use(fileName, "fileName");
}
//==============================================================================
// DBManager::removeFailedFile()
//==============================================================================
void DBManager::removeFailedFile(int fileVersion, std::string fileName)
throw(soci::soci_error)
{
DEBUG_STREAM << "DBManager::removeFailedFile()" << endl;
boost::mutex::scoped_lock lock(m_sessionMutex);
if(m_auxSession_sp->get_backend() == NULL)
m_auxSession_sp->reconnect();
*m_auxSession_sp << "delete from " << m_configuration_sp->getAuxDatabaseSchema()
<< "." << m_configuration_sp->getAuxDatabaseFailedTable()
<< " where device_name like :deviceName and file_version = :fileVersion"
<< " and file_name like :fileName", soci::use(m_deviceName, "deviceName"),
soci::use(fileVersion, "fileVersion"), soci::use(fileName, "fileName");
}
//==============================================================================
//==============================================================================
DBManager::FailedFileRowsetSP DBManager::retrieveFailedFiles()
throw(soci::soci_error)
{
DEBUG_STREAM << "DBManager::retrieveFailedFiles()" << endl;
boost::mutex::scoped_lock lock(m_sessionMutex);
if(m_auxSession_sp->get_backend() == NULL)
m_auxSession_sp->reconnect();
FailedFileRowsetSP failedFileRowset_sp(new FailedFileRowset(
m_auxSession_sp->prepare << "select file_version, file_name from "
<< m_configuration_sp->getAuxDatabaseSchema() << "."
<< m_configuration_sp->getAuxDatabaseFailedTable()
<< " where device_name like '" << m_deviceName << "'"));
return failedFileRowset_sp;
}