Skip to content
DBManager.cpp 11.9 KiB
Newer Older
#include <DBManager.h>

#include <ctime>

#include <boost/date_time.hpp>

#include <soci/mysql/soci-mysql.h>

namespace DataImporter_ns
{

//==============================================================================
//      DBManager::DBManager()
//==============================================================================
DBManager::DBManager(Tango::DeviceImpl* deviceImpl_p,
    Configuration::SP configuration_sp) : Tango::LogAdapter(deviceImpl_p),
    m_configuration_sp(configuration_sp)
{
    DEBUG_STREAM << "DBManager::DBManager()" << endl;

    m_deviceName = deviceImpl_p->get_name_lower();

Marco De Marco's avatar
Marco De Marco committed
    m_mainSession_sp.reset(new soci::session);
Marco De Marco's avatar
Marco De Marco committed
    m_auxSession_sp.reset(new soci::session);
}

//==============================================================================
//      DBManager::~DBManager()
//==============================================================================
DBManager::~DBManager()
{
    DEBUG_STREAM << "DBManager::~DBManager()" << endl;

Marco De Marco's avatar
Marco De Marco committed
    m_mainSession_sp->close();

    m_auxSession_sp->close();
}

//==============================================================================
//      DBManager::create()
//==============================================================================
DBManager::SP DBManager::create(Tango::DeviceImpl* deviceImpl_p,
    Configuration::SP configuration_sp)
{
    DBManager::SP d_sp(new DBManager(deviceImpl_p, configuration_sp),
        DBManager::Deleter());

    return d_sp;
}

//==============================================================================
//      DBManager::connect()
//==============================================================================
void DBManager::connectAll() throw(soci::soci_error)
{
    DEBUG_STREAM << "DBManager::connect()" << endl;

    boost::mutex::scoped_lock lock(m_sessionMutex);
    bool reconnect = true;

    std::stringstream connection;
    connection << " host=" << m_configuration_sp->getDatabaseHost();
    connection << " port=" << m_configuration_sp->getDatabasePort();
    connection << " user=" << m_configuration_sp->getDatabaseUsername();
    connection << " password=" << m_configuration_sp->getDatabasePassword();

    #ifdef VERBOSE_DEBUG
        INFO_STREAM << "MAIN CONNECTION: " << connection.str() << endl;
Marco De Marco's avatar
Marco De Marco committed
    m_mainSession_sp->open(soci::mysql, connection.str());

    soci::mysql_session_backend* mainBackend =
            static_cast<soci::mysql_session_backend*>(
            m_mainSession_sp->get_backend());

    MYSQL* mainMysql = mainBackend->conn_;

    mysql_options(mainMysql, MYSQL_OPT_RECONNECT, &reconnect);

Marco De Marco's avatar
Marco De Marco committed
    connection.str("");

    connection << " host=" << m_configuration_sp->getAuxDatabaseHost();
    connection << " port=" << m_configuration_sp->getAuxDatabasePort();
    connection << " user=" << m_configuration_sp->getAuxDatabaseUsername();
    connection << " password=" << m_configuration_sp->getAuxDatabasePassword();

    #ifdef VERBOSE_DEBUG
        INFO_STREAM << "AUX CONNECTION: " << connection.str() << endl;
Marco De Marco's avatar
Marco De Marco committed
    #endif

    m_auxSession_sp->open(soci::mysql, connection.str());

    soci::mysql_session_backend* auxBackend =
            static_cast<soci::mysql_session_backend*>(
            m_auxSession_sp->get_backend());

    MYSQL* auxMysql = auxBackend->conn_;

    mysql_options(auxMysql, MYSQL_OPT_RECONNECT, &reconnect);
}

//==============================================================================
//      DBManager::disconnect()
//==============================================================================
void DBManager::disconnectAll()
{
    DEBUG_STREAM << "DBManager::disconnect()" << endl;

    boost::mutex::scoped_lock lock(m_sessionMutex);
Marco De Marco's avatar
Marco De Marco committed
    m_mainSession_sp->close();

    m_auxSession_sp->close();
}

//==============================================================================
//      DBManager::getMainTransaction()
//==============================================================================
DBManager::TransactionSP DBManager::getMainTransaction()
{
    DEBUG_STREAM << "DBManager::getMainTransaction()" << endl;

    boost::mutex::scoped_lock lock(m_sessionMutex);

    TransactionSP transaction_sp(new soci::transaction(*m_mainSession_sp));

    return transaction_sp;
}

//==============================================================================
//      DBManager::getAuxTransaction()
//==============================================================================
DBManager::TransactionSP DBManager::getAuxTransaction()
{
    DEBUG_STREAM << "DBManager::getAuxTransaction()" << endl;

    boost::mutex::scoped_lock lock(m_sessionMutex);

    TransactionSP transaction_sp(new soci::transaction(*m_auxSession_sp));

    return transaction_sp;
}

//==============================================================================
//      DBManager::retrieveLastTimestamp()
//==============================================================================
boost::posix_time::ptime DBManager::retrieveLastTimestamp()
Marco De Marco's avatar
Marco De Marco committed
    throw(soci::soci_error)
{
    DEBUG_STREAM << "DBManager::retrieveLastTimestamp()" << endl;

Marco De Marco's avatar
Marco De Marco committed
    boost::mutex::scoped_lock lock(m_sessionMutex);

    if(m_auxSession_sp->get_backend() == NULL)
        m_auxSession_sp->reconnect();

   std::tm tm_time;

    *m_auxSession_sp << "select coalesce(max(last_timestamp),'1970-01-01 00:00:00')"
Marco De Marco's avatar
Marco De Marco committed
        << " from "<< m_configuration_sp->getAuxDatabaseSchema()
        << "." << m_configuration_sp->getAuxDatabaseTimestampTable()
        << " where device_name like :deviceName",
        soci::use(m_deviceName, "deviceName"), soci::into(tm_time);

    return boost::posix_time::ptime_from_tm(tm_time);
}

//==============================================================================
//      DBManager::persistLastTimestamp()
//==============================================================================
void DBManager::persistLastTimestamp(boost::posix_time::ptime ptime)
Marco De Marco's avatar
Marco De Marco committed
    throw(soci::soci_error)
{
    DEBUG_STREAM << "DBManager::persistLastTimestamp()" << endl;

Marco De Marco's avatar
Marco De Marco committed
    boost::mutex::scoped_lock lock(m_sessionMutex);

    if(m_auxSession_sp->get_backend() == NULL)
        m_auxSession_sp->reconnect();

    *m_auxSession_sp << "insert into " << m_configuration_sp->getAuxDatabaseSchema()
        << "." << m_configuration_sp->getAuxDatabaseTimestampTable()
        << " (device_name, last_timestamp) values ('" << m_deviceName << "','"
        << boost::posix_time::to_iso_string(ptime) << "') on duplicate key update"
        << " last_timestamp='" << boost::posix_time::to_iso_string(ptime) << "'";
//==============================================================================
//      DBManager::retrieveNewFiles()
//==============================================================================
DBManager::FileRowsetSP DBManager::retrieveNewFiles(boost::posix_time::ptime ptime)
    throw(soci::soci_error)
{
    DEBUG_STREAM << "DBManager::retrieveNewFiles()" << endl;

    boost::mutex::scoped_lock lock(m_sessionMutex);

    if(m_mainSession_sp->get_backend() == NULL)
        m_mainSession_sp->reconnect();

Marco De Marco's avatar
Marco De Marco committed
    std::string selectKey = m_configuration_sp->getSelectKey();
    std::string selectValue = m_configuration_sp->getSelectValue();

    FileRowsetSP newFileRowset_sp;

    if(selectKey.empty())
    {
        newFileRowset_sp.reset(new FileRowset(m_mainSession_sp->prepare
            << "select storage_path, file_path, file_version, file_name, update_time "
            << "from " << m_configuration_sp->getDatabaseSchema() << "."
            << m_configuration_sp->getDatabaseTable() << " where update_time>'"
            << boost::posix_time::to_iso_string(ptime) << "' order by update_time asc"));
    }
    else
    {
        newFileRowset_sp.reset(new FileRowset(m_mainSession_sp->prepare
            << "select storage_path, file_path, file_version, file_name, update_time "
            << "from " << m_configuration_sp->getDatabaseSchema() << "."
            << m_configuration_sp->getDatabaseTable() << " where update_time>'"
            << boost::posix_time::to_iso_string(ptime) << "' and "
            << selectKey << " like '%" << selectValue << "%' "
            << "order by update_time asc"));
    }

    return newFileRowset_sp;
}

//==============================================================================
//      DBManager::updateNewFilePath()
//==============================================================================
void DBManager::updateNewFilePath(std::string storagePath, std::string filePath,
    int fileVersion, std::string fileName) throw(soci::soci_error)
{
    DEBUG_STREAM << "DBManager::updateNewFilePath()" << endl;

    boost::mutex::scoped_lock lock(m_sessionMutex);

    if(m_mainSession_sp->get_backend() == NULL)
        m_mainSession_sp->reconnect();

    *m_mainSession_sp << "update " << m_configuration_sp->getDatabaseSchema()
        << "." << m_configuration_sp->getDatabaseTable()
        << " set storage_path = :storagePath, file_path = :filePath "
        << "  where file_version = :fileVersion and file_name like :fileName",
        soci::use(storagePath, "storagePath"), soci::use(filePath, "filePath"),
        soci::use(fileVersion, "fileVersion"), soci::use(fileName, "fileName");
}

//==============================================================================
Marco De Marco's avatar
Marco De Marco committed
//      DBManager::addFailedFile()
//==============================================================================
Marco De Marco's avatar
Marco De Marco committed
void DBManager::addFailedFile(int fileVersion, std::string fileName)
    throw(soci::soci_error)
Marco De Marco's avatar
Marco De Marco committed
    DEBUG_STREAM << "DBManager::addFailedFile()" << endl;
    boost::mutex::scoped_lock lock(m_sessionMutex);
Marco De Marco's avatar
Marco De Marco committed
    if(m_auxSession_sp->get_backend() == NULL)
        m_auxSession_sp->reconnect();

    *m_auxSession_sp << "insert ignore into "
        << m_configuration_sp->getAuxDatabaseSchema() << "."
        << m_configuration_sp->getAuxDatabaseFailedTable()
        << " (device_name, file_version, file_name) values (:deviceName, "
        << ":fileVersion, :fileName)", soci::use(m_deviceName, "deviceName"),
        soci::use(fileVersion, "fileVersion"), soci::use(fileName, "fileName");
}

//==============================================================================
//      DBManager::removeFailedFile()
//==============================================================================
void DBManager::removeFailedFile(int fileVersion, std::string fileName)
    throw(soci::soci_error)
{
    DEBUG_STREAM << "DBManager::removeFailedFile()" << endl;

    boost::mutex::scoped_lock lock(m_sessionMutex);
Marco De Marco's avatar
Marco De Marco committed
    if(m_auxSession_sp->get_backend() == NULL)
        m_auxSession_sp->reconnect();
Marco De Marco's avatar
Marco De Marco committed
    *m_auxSession_sp << "delete from " << m_configuration_sp->getAuxDatabaseSchema()
        << "." << m_configuration_sp->getAuxDatabaseFailedTable()
        << " where device_name like  :deviceName and file_version = :fileVersion"
        << " and  file_name like :fileName", soci::use(m_deviceName, "deviceName"),
        soci::use(fileVersion, "fileVersion"), soci::use(fileName, "fileName");
}

//==============================================================================
Marco De Marco's avatar
Marco De Marco committed
//      DBManager::retrieveFailedFiles()
//==============================================================================
DBManager::FileRowsetSP DBManager::retrieveFailedFiles()
Marco De Marco's avatar
Marco De Marco committed
    throw(soci::soci_error)
{
    DEBUG_STREAM << "DBManager::retrieveFailedFiles()" << endl;

    boost::mutex::scoped_lock lock(m_sessionMutex);

    if(m_auxSession_sp->get_backend() == NULL)
        m_auxSession_sp->reconnect();

    FileRowsetSP failedFileRowset_sp(new FileRowset(
        m_auxSession_sp->prepare << "select m.storage_path, m.file_path, "
        << " m.file_version, m.file_name, m.update_time from "
Marco De Marco's avatar
Marco De Marco committed
        << m_configuration_sp->getDatabaseSchema() << "."
        << m_configuration_sp->getDatabaseTable() << " as m join "
Marco De Marco's avatar
Marco De Marco committed
        << m_configuration_sp->getAuxDatabaseSchema() << "."
        << m_configuration_sp->getAuxDatabaseFailedTable() << " as f "
        << "on f.file_version = m.file_version and f.file_name = m.file_name "
        << "where device_name like '" << m_deviceName << "'"));
Marco De Marco's avatar
Marco De Marco committed

    return failedFileRowset_sp;
}