Skip to content
DBManager.cpp 6.1 KiB
Newer Older
#include <DBManager.h>

#include <ctime>

#include <boost/date_time.hpp>

#include <soci/mysql/soci-mysql.h>

namespace DataImporter_ns
{

//==============================================================================
//      DBManager::DBManager()
//==============================================================================
// Builds a manager for the given Tango device.
//   deviceImpl_p     - device forwarded to Tango::LogAdapter and queried for
//                      its lower-case name
//   configuration_sp - shared configuration, stored for later use by connect()
// The soci session is only allocated here; connect() is what opens it.
DBManager::DBManager(Tango::DeviceImpl* deviceImpl_p,
                     Configuration::SP configuration_sp)
    : Tango::LogAdapter(deviceImpl_p)
    , m_configuration_sp(configuration_sp)
{
    DEBUG_STREAM << "DBManager::DBManager()" << endl;

    // Cached name, used as the key for device property reads and writes.
    m_deviceName = deviceImpl_p->get_name_lower();

    // Session used for the SQL queries issued by this class.
    m_session_sp.reset(new soci::session);

    // Handle used to get/put the device properties.
    m_tangoDB_sp.reset(new Tango::Database);
}

//==============================================================================
//      DBManager::~DBManager()
//==============================================================================
// Logs the destruction and closes the soci session.
// NOTE(review): unlike connect() and retrieveNewTuples(), m_sessionMutex is
// not taken here - presumably no other thread uses the object at this point;
// confirm against the owner's shutdown sequence.
DBManager::~DBManager()
{
    DEBUG_STREAM << "DBManager::~DBManager()" << endl;

    m_session_sp->close();
}

//==============================================================================
//      DBManager::create()
//==============================================================================
// Factory: allocates a DBManager for the given device and configuration and
// returns it wrapped in a DBManager::SP constructed with a DBManager::Deleter
// instance.
DBManager::SP DBManager::create(Tango::DeviceImpl* deviceImpl_p,
    Configuration::SP configuration_sp)
{
    return DBManager::SP(new DBManager(deviceImpl_p, configuration_sp),
        DBManager::Deleter());
}

//==============================================================================
//      DBManager::connect()
//==============================================================================
// Opens the soci session against the MySQL server described by the
// configuration (host, port, user, password). The session mutex is held for
// the whole call.
// Throws soci::soci_error as declared; the error originates from
// soci::session::open().
void DBManager::connect() throw(soci::soci_error)
{
    DEBUG_STREAM << "DBManager::connect()" << endl;

    boost::mutex::scoped_lock lock(m_sessionMutex);

    std::stringstream connection;
    connection << " host=" << m_configuration_sp->getDatabaseHost();
    connection << " port=" << m_configuration_sp->getDatabasePort();
    connection << " user=" << m_configuration_sp->getDatabaseUsername();

    #ifdef VERBOSE_DEBUG
    // Logged before the password is appended so that the credential never
    // reaches the log output.
    INFO_STREAM << "MAIN CONNECTION: " << connection.str()
        << " password=<hidden>" << endl;
    #endif

    connection << " password=" << m_configuration_sp->getDatabasePassword();

    m_session_sp->open(soci::mysql, connection.str());
}

//==============================================================================
//      DBManager::disconnect()
//==============================================================================
void DBManager::disconnect()
{
    DEBUG_STREAM << "DBManager::disconnect()" << endl;

    boost::mutex::scoped_lock lock(m_sessionMutex);
//==============================================================================
//      DBManager::retrieveLastTimestamp()
//==============================================================================
// Reads the "LastTimestamp" device property from the Tango database and
// returns it as a std::tm. The Tango database mutex is held for the whole
// call.
// Throws std::runtime_error when the Tango call fails (message built from the
// DevFailed error stack) or when the stored text cannot be parsed, and
// std::out_of_range when the parsed date/time fields are out of range.
std::tm DBManager::retrieveLastTimestamp()
    throw(std::runtime_error, std::out_of_range)
{
    DEBUG_STREAM << "DBManager::retrieveLastTimestamp()" << endl;

    boost::mutex::scoped_lock lock(m_tangoDBMutex);

    Tango::DbData db_data;
    db_data.push_back(Tango::DbDatum("LastTimestamp"));

    try
    {
        m_tangoDB_sp->get_device_property(m_deviceName, db_data);
    }
    catch(Tango::DevFailed& ex)
    {
        // Flatten the Tango error stack into one message, one field per line.
        std::stringstream error_msg;
        for (unsigned int i=0; i<ex.errors.length(); i++)
        {
            error_msg << ex.errors[i].reason.in() << '\n';
            error_msg << ex.errors[i].desc.in() << '\n';
            error_msg << ex.errors[i].origin.in() << '\n';
        }
        throw std::runtime_error(error_msg.str());
    }

    // Fallback value; presumably left untouched when the property holds no
    // value - confirm against Tango::DbDatum::operator>>.
    std::string timestamp("1970-01-01 00:00:00");
    db_data[0] >> timestamp;

    try
    {
        boost::posix_time::ptime ptime =
            boost::posix_time::time_from_string(timestamp);

        return boost::posix_time::to_tm(ptime);
    }
    catch(std::out_of_range&)
    {
        // Allowed by the exception specification: propagate unchanged.
        throw;
    }
    catch(std::runtime_error&)
    {
        // Allowed by the exception specification: propagate unchanged.
        throw;
    }
    catch(std::exception& ex)
    {
        // Any other type (e.g. a failed numeric conversion while parsing)
        // would violate the exception specification and abort the process,
        // so it is reported as a runtime_error instead.
        throw std::runtime_error(std::string("Invalid LastTimestamp value \"")
            + timestamp + "\": " + ex.what());
    }
}

//==============================================================================
//      DBManager::persistLastTimestamp()
//==============================================================================
void DBManager::persistLastTimestamp(std::tm lastTimestamp)
    throw(std::runtime_error, std::out_of_range)
{
    DEBUG_STREAM << "DBManager::persistLastTimestamp()" << endl;

    boost::mutex::scoped_lock lock(m_tangoDBMutex);

    boost::posix_time::ptime ptime = boost::posix_time::ptime_from_tm(lastTimestamp);

    std::string timestampString = boost::posix_time::to_simple_string(ptime);

    Tango::DbDatum timestamp("LastTimestamp");
    timestamp << timestampString;

    Tango::DbData db_data;
    db_data.push_back(timestamp);

    try
    {
        m_tangoDB_sp->put_device_property(m_deviceName, db_data);
    }
    catch(Tango::DevFailed& ex)
    {
        std::stringstream error_msg;
        for (unsigned int i=0; i<ex.errors.length(); i++)
        {
                error_msg << ex.errors[i].reason.in() << endl;
                error_msg << ex.errors[i].desc.in() << endl;
                error_msg << ex.errors[i].origin.in() << endl;
        }
        throw std::runtime_error(error_msg.str());
    }
}

//==============================================================================
//      DBManager::retrieveNewTuples()
//==============================================================================
// Selects file_version, file_name and update_time from <schema>.<table> for
// the rows whose update_time is greater than lastTimestamp, ordered by
// update_time ascending, and returns the resulting rowset.
// The session mutex is held for the whole call; reconnect() is invoked first
// when the session reports no backend.
// NOTE(review): schema and table are concatenated into the SQL text as-is -
// presumably they come from trusted configuration; verify against callers.
DBManager::FileRowsetSP DBManager::retrieveNewTuples(std::string schema,
    std::string table, std::tm lastTimestamp) throw(soci::soci_error,
    std::out_of_range)
{
    DEBUG_STREAM << "DBManager::retrieveNewTuples()" << endl;

    boost::mutex::scoped_lock lock(m_sessionMutex);

    if(m_session_sp->get_backend() == NULL)
    {
        m_session_sp->reconnect();
    }

    // Lower bound of the query, in ISO string form.
    const std::string isoTimestamp = boost::posix_time::to_iso_string(
        boost::posix_time::ptime_from_tm(lastTimestamp));

    return FileRowsetSP(new FileRowset(m_session_sp->prepare
        << "select"
        << " file_version, file_name, update_time from "
        << schema << "." << table
        << " where update_time>'" << isoTimestamp
        << "' order by update_time asc"));
}

//==============================================================================
//      DBManager::retrieveNewTuples()
//==============================================================================