// ProtocolManager.cpp
#include <ProtocolManager.h>
#include <DataImporter.h>

#include <boost/filesystem.hpp>

namespace DataImporter_ns
{

//==============================================================================
//      ProtocolManager::ProtocolManager()
//==============================================================================
// Stores the owning device pointer, the configuration and the database
// manager handed in by the caller; the device pointer is also forwarded to
// Tango::LogAdapter so the *_STREAM logging macros can be used in this class.
//
// dataImporter_p   : owning device (kept as a raw pointer)
// configuration_sp : shared configuration object
// dBManager_sp     : shared database manager
ProtocolManager::ProtocolManager(DataImporter* dataImporter_p,
    Configuration::SP configuration_sp, DBManager::SP dBManager_sp) :
    Tango::LogAdapter(dataImporter_p), m_dataImporter_p(dataImporter_p),
    m_configuration_sp(configuration_sp), m_dBManager_sp(dBManager_sp)
{
    DEBUG_STREAM << "ProtocolManager::ProtocolManager()" << endl;

    // getRecoveryMode() returns this flag; give it a defined value so that a
    // read before the first setRecoveryMode() call is not indeterminate.
    // Assigned in the body (not the initializer list) to stay independent of
    // the member declaration order in the header.
    m_recoveryMode = false;
}

//==============================================================================
//      ProtocolManager::~ProtocolManager()
//==============================================================================
// The destructor body only traces its own invocation on the debug stream.
ProtocolManager::~ProtocolManager()
{
    DEBUG_STREAM
        << "ProtocolManager::~ProtocolManager()"
        << endl;
}

//==============================================================================
//      ProtocolManager::create()
//==============================================================================
// Factory: allocates a ProtocolManager and wraps it in a ProtocolManager::SP
// that releases it through ProtocolManager::Deleter.
//
// Arguments are forwarded unchanged to the constructor.
ProtocolManager::SP ProtocolManager::create(DataImporter* dataImporter_p,
    Configuration::SP configuration_sp, DBManager::SP dBManager_sp)
{
    return ProtocolManager::SP(
        new ProtocolManager(dataImporter_p, configuration_sp, dBManager_sp),
        ProtocolManager::Deleter());
}

//==============================================================================
//      ProtocolManager::setRemoteEndpoint()
//==============================================================================
void ProtocolManager::setRemoteEndpoint(std::string remoteEndpoint)
{
    DEBUG_STREAM << "ProtocolManager::setRemoteEndpoint()" << endl;

    m_remoteEndpoint = remoteEndpoint;
}

//==============================================================================
//      ProtocolManager::updateNewList()
//==============================================================================
void ProtocolManager::updateNewList() throw(std::runtime_error)
    DEBUG_STREAM << "ProtocolManager::updateNewList()" << endl;
    boost::posix_time::ptime m_lastTimestamp =
        m_dBManager_sp->retrieveLastTimestamp();
    DEBUG_STREAM << "ProtocolManager::updateNewList() last timestamp "
        << boost::posix_time::to_simple_string(m_lastTimestamp) << endl;
    m_newFileRowset_sp = m_dBManager_sp->retrieveNewFiles(m_lastTimestamp);
Marco De Marco's avatar
Marco De Marco committed

    m_newFileRowsetIt = m_newFileRowset_sp->begin();
}

//==============================================================================
//      ProtocolManager::updateFailedList()
//==============================================================================
void ProtocolManager::updateFailedList() throw(std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::updateFailedList()" << endl;
    m_failedFileRowset_sp = m_dBManager_sp->retrieveFailedFiles();
Marco De Marco's avatar
Marco De Marco committed
    m_failedFileRowsetIt = m_failedFileRowset_sp->begin();
}

//==============================================================================
//      ProtocolManager::hasNextNewList()
//==============================================================================
bool ProtocolManager::hasNextNewList()
    DEBUG_STREAM << "ProtocolManager::hasNextNewList()" << endl;
Marco De Marco's avatar
Marco De Marco committed
    if(m_newFileRowset_sp &&
        m_newFileRowsetIt != m_newFileRowset_sp->end())
    {
        DEBUG_STREAM << "ProtocolManager::hasNextNewList() true" << endl;
Marco De Marco's avatar
Marco De Marco committed
        return true;
    }
    else
    {
        DEBUG_STREAM << "ProtocolManager::hasNextNewList() false" << endl;
        return false;
    }
}

//==============================================================================
//      ProtocolManager::hasNextFailedList()
//==============================================================================
bool ProtocolManager::hasNextFailedList()
{
    DEBUG_STREAM << "ProtocolManager::hasNextFailedList()" << endl;

    if(m_failedFileRowset_sp &&
Marco De Marco's avatar
Marco De Marco committed
        m_failedFileRowsetIt != m_failedFileRowset_sp->end())
    {
        DEBUG_STREAM << "ProtocolManager::hasNextFailedList() true" << endl;
        return true;
    }
    else
    {
        DEBUG_STREAM << "ProtocolManager::hasNextFailedList() false" << endl;
        return false;
    }
}

//==============================================================================
//      ProtocolManager::isRecoveryTimeElapsed()
//==============================================================================
bool ProtocolManager::isRecoveryTimeElapsed()
{
    DEBUG_STREAM << "ProtocolManager::isRecoveryTimeElapsed()" << endl;

    boost::posix_time::ptime now(boost::posix_time::second_clock::local_time());

    if(m_recoveryModeTime.is_not_a_date_time())
        m_recoveryModeTime = now;

    boost::posix_time::time_duration diff = now - m_recoveryModeTime;

Marco De Marco's avatar
Marco De Marco committed
    DEBUG_STREAM << "ProtocolManager::isRecoveryTimeElapsed() " << diff.total_seconds()
        << "/" << (int)m_configuration_sp->getRecoveryTime() << endl;

    if(diff.total_seconds() > (int)m_configuration_sp->getRecoveryTime())
    {
        DEBUG_STREAM << "ProtocolManager::isRecoveryTimeElapsed() true" << endl;
Marco De Marco's avatar
Marco De Marco committed
        m_recoveryModeTime = now;
        return true;
    }
    else
    {
        DEBUG_STREAM << "ProtocolManager::isRecoveryTimeElapsed() false" << endl;
        return false;
//==============================================================================
//      ProtocolManager::getRecoveryMode()
//==============================================================================
// Returns the recovery-mode flag last stored with setRecoveryMode().
bool ProtocolManager::getRecoveryMode()
{
    DEBUG_STREAM << "ProtocolManager::getRecoveryMode()" << endl;

    return m_recoveryMode;
}

//==============================================================================
//      ProtocolManager::setRecoveryMode()
//==============================================================================
void ProtocolManager::setRecoveryMode(bool recoveryMode)
{
    DEBUG_STREAM << "ProtocolManager::setRecoveryMode()" << endl;

    m_recoveryMode = recoveryMode;
}

//==============================================================================
//      ProtocolManager::createNewListRequest()
//==============================================================================
// Builds the transfer request for the row the new-list iterator currently
// points to. The iterator is NOT advanced here.
//
// The request carries the database username, password, schema and table taken
// from the configuration, plus the file version (column 0) and file name
// (column 1) of the current row.
//
// Throws std::runtime_error    if the new list is not loaded or exhausted, or
//                              if the assembled request reports it is not
//                              initialized;
//        std::invalid_argument if the current row has no file version or no
//                              file name.
RequestSP ProtocolManager::createNewListRequest()
    throw(std::logic_error, std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::createNewListRequest()" << endl;

    if(!m_newFileRowset_sp ||
        m_newFileRowsetIt == m_newFileRowset_sp->end())
            throw std::runtime_error("New list not initialized or empty");

    RequestSP request_sp(new Request);

    request_sp->set_username(m_configuration_sp->getDatabaseUsername());
    request_sp->set_password(m_configuration_sp->getDatabasePassword());

    request_sp->set_schema(m_configuration_sp->getDatabaseSchema());
    request_sp->set_table(m_configuration_sp->getDatabaseTable());

    // Row columns are optional values: reject rows with missing fields
    if(!m_newFileRowsetIt->get<0>())
        throw std::invalid_argument("Empty file version found on new list");
    int fileVersion = m_newFileRowsetIt->get<0>().get();

    if(!m_newFileRowsetIt->get<1>())
        throw std::invalid_argument("Empty file name found on new list");
    std::string fileName = m_newFileRowsetIt->get<1>().get();

    request_sp->set_file_version(fileVersion);
    request_sp->set_file_name(fileName);

    INFO_STREAM << "ProtocolManager::createNewListRequest() file " << fileName
        << " version " << fileVersion << " to " << m_remoteEndpoint << endl;

    if(!request_sp->IsInitialized())
        throw std::runtime_error("Request not initialized");

    return request_sp;
}

//==============================================================================
//      ProtocolManager::createFailedListRequest()
//==============================================================================
// Builds the transfer request for the row the failed-list iterator currently
// points to. The iterator is NOT advanced here.
//
// The request carries the database username, password, schema and table taken
// from the configuration, plus the file version (column 0) and file name
// (column 1) of the current row.
//
// Throws std::runtime_error    if the failed list is not loaded or exhausted,
//                              or if the assembled request reports it is not
//                              initialized;
//        std::invalid_argument if the current row has no file version or no
//                              file name.
RequestSP ProtocolManager::createFailedListRequest()
    throw(std::logic_error, std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::createFailedListRequest()" << endl;

    if(!m_failedFileRowset_sp ||
        m_failedFileRowsetIt == m_failedFileRowset_sp->end())
            throw std::runtime_error("Failed list not initialized or empty");

    RequestSP request_sp(new Request);

    request_sp->set_username(m_configuration_sp->getDatabaseUsername());
    request_sp->set_password(m_configuration_sp->getDatabasePassword());

    request_sp->set_schema(m_configuration_sp->getDatabaseSchema());
    request_sp->set_table(m_configuration_sp->getDatabaseTable());

    // Row columns are optional values: reject rows with missing fields
    if(!m_failedFileRowsetIt->get<0>())
        throw std::invalid_argument("Empty file version found on failed list");
    int fileVersion = m_failedFileRowsetIt->get<0>().get();

    if(!m_failedFileRowsetIt->get<1>())
        throw std::invalid_argument("Empty file name found on failed list");
    std::string fileName = m_failedFileRowsetIt->get<1>().get();

    request_sp->set_file_version(fileVersion);
    request_sp->set_file_name(fileName);

    INFO_STREAM << "ProtocolManager::createFailedListRequest() file " << fileName
        << " version " << fileVersion << " to " << m_remoteEndpoint << endl;

    if(!request_sp->IsInitialized())
        throw std::runtime_error("Request not initialized");

    return request_sp;
}

//==============================================================================
Marco De Marco's avatar
Marco De Marco committed
//      ProtocolManager::processResponse()
//==============================================================================
Marco De Marco's avatar
Marco De Marco committed
FileWrapper::SP ProtocolManager::processResponse(ResponseSP response_sp)
    throw(std::logic_error, std::runtime_error)
Marco De Marco's avatar
Marco De Marco committed
    DEBUG_STREAM << "ProtocolManager::processResponse()" << endl;
    if(!response_sp->IsInitialized())
        throw std::runtime_error("Response not initialized");

    if(response_sp->state() == Response::REQUEST_ACCEPTED)
    {
        std::string filePath = response_sp->file_path();
        if(filePath.empty())
            throw std::invalid_argument("Empty file path received");
        int fileVersion = response_sp->file_version();
        std::string fileName = response_sp->file_name();
        if(fileName.empty())
            throw std::invalid_argument("Empty file path received");
        boost::uint64_t fileSize = response_sp->file_size();
        INFO_STREAM << "ProtocolManager::processResponse()  transfer file "
            << fileName << " version " << fileVersion << " size " << fileSize
            << " from " << m_remoteEndpoint << endl;
        return FileWrapper::create(m_dataImporter_p,
            m_configuration_sp->getStoragePath(), filePath,
            fileVersion, fileName, fileSize);
    }
    else if(response_sp->state() == Response::METADATA_NOT_FOUND ||
        response_sp->state() == Response::FILE_NOT_DOWNLOADED ||
        response_sp->state() == Response::FILE_NOT_FOUND)
    {
        throw std::logic_error(response_sp->status());
    }
    else
        throw std::runtime_error(response_sp->status());
Marco De Marco's avatar
Marco De Marco committed
}
Marco De Marco's avatar
Marco De Marco committed
//==============================================================================
//      ProtocolManager::setNewFileTransfered()
Marco De Marco's avatar
Marco De Marco committed
//==============================================================================
void ProtocolManager::setNewFileTransfered(FileWrapper::SP fileWrapper_sp)
    throw(std::logic_error, std::runtime_error)
Marco De Marco's avatar
Marco De Marco committed
{
    DEBUG_STREAM << "ProtocolManager::setNewFileTransfered()" << endl;

    if(!m_newFileRowset_sp ||
        m_newFileRowsetIt == m_newFileRowset_sp->end())
            throw std::runtime_error("New list not initialized or empty");

    std::string storagePath = fileWrapper_sp->getStoragePath();
    std::string filePath = fileWrapper_sp->getFilePath();

    if(!m_newFileRowsetIt->get<0>())
        throw std::invalid_argument("Empty file version found on new list");
    int fileVersion = m_newFileRowsetIt->get<0>().get();
    if(!m_newFileRowsetIt->get<1>())
        throw std::invalid_argument("Empty file name found on new list");
    std::string fileName = m_newFileRowsetIt->get<1>().get();
    if(!m_newFileRowsetIt->get<2>())
        throw std::invalid_argument("Empty update time found on new list");
    std::tm currentTm = m_newFileRowsetIt->get<2>().get();
    INFO_STREAM << "ProtocolManager::setNewFileTransfered() file "
        << fileName << " version " << fileVersion << " transfered" << endl;
    boost::posix_time::ptime currentPtime =
        boost::posix_time::ptime_from_tm(currentTm);
    boost::posix_time::ptime nextPtime(boost::posix_time::pos_infin);
    //FIXME: not incremented in case of exception!!!
    ++m_newFileRowsetIt;
    if(m_newFileRowsetIt != m_newFileRowset_sp->end())
    {
        if(!m_newFileRowsetIt->get<2>())
                throw std::invalid_argument("Empty next update time found on new list");
            std::tm nextTm = m_newFileRowsetIt->get<2>().get();
            nextPtime =boost::posix_time::ptime_from_tm(nextTm);
    }
    DBManager::TransactionSP auxTransaction_sp = m_dBManager_sp->getAuxTransaction();
    DBManager::TransactionSP mainTransaction_sp = m_dBManager_sp->getMainTransaction();
    if(nextPtime > currentPtime)
        m_dBManager_sp->persistLastTimestamp(currentPtime);
    m_dBManager_sp->updateNewFilePath(storagePath, filePath, fileVersion, fileName);
    auxTransaction_sp->commit();
    mainTransaction_sp->commit();

    m_dataImporter_p->incrementRegularCounter();
//==============================================================================
//      ProtocolManager::setFailedFileTransfered()
//==============================================================================
// Records the successful transfer of the current failed-list row and advances
// the failed-list iterator.
//
// File version and name come from the current row, storage path and file path
// from fileWrapper_sp. The row is removed from the failed files and the file
// path is updated; both transactions (auxiliary, then main) are committed and
// the device counters are adjusted (failed -1, regular +1).
//
// Throws std::runtime_error    if the failed list is not loaded or exhausted;
//        std::invalid_argument if the current row lacks version or name.
void ProtocolManager::setFailedFileTransfered(FileWrapper::SP fileWrapper_sp)
    throw(std::logic_error, std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::setFailedFileTransfered()" << endl;

    const bool noCurrentRow = !m_failedFileRowset_sp ||
        m_failedFileRowsetIt == m_failedFileRowset_sp->end();
    if(noCurrentRow)
    {
        throw std::runtime_error("Failed list not initialized or empty");
    }

    std::string destinationRoot = fileWrapper_sp->getStoragePath();
    std::string relativePath = fileWrapper_sp->getFilePath();

    if(!m_failedFileRowsetIt->get<0>())
    {
        throw std::invalid_argument("Empty file version found on failed list");
    }
    int version = m_failedFileRowsetIt->get<0>().get();

    if(!m_failedFileRowsetIt->get<1>())
    {
        throw std::invalid_argument("Empty file name found on failed list");
    }
    std::string name = m_failedFileRowsetIt->get<1>().get();

    //FIXME: not incremented in case of exception!!!
    ++m_failedFileRowsetIt;

    DBManager::TransactionSP auxTransaction_sp =
        m_dBManager_sp->getAuxTransaction();
    DBManager::TransactionSP mainTransaction_sp =
        m_dBManager_sp->getMainTransaction();

    m_dBManager_sp->removeFailedFile(version, name);
    m_dBManager_sp->updateNewFilePath(destinationRoot, relativePath, version, name);

    auxTransaction_sp->commit();
    mainTransaction_sp->commit();

    m_dataImporter_p->decrementFailedCounter();
    m_dataImporter_p->incrementRegularCounter();
}

//==============================================================================
//      ProtocolManager::setNewFileFailed()
//==============================================================================
void ProtocolManager::setNewFileFailed()
    throw(std::logic_error, std::runtime_error)
    DEBUG_STREAM << "ProtocolManager::setNewFileFailed()" << endl;
    if(!m_newFileRowset_sp ||
        m_newFileRowsetIt == m_newFileRowset_sp->end())
            throw std::runtime_error("New list not initialized or empty");
    if(!m_newFileRowsetIt->get<0>())
        throw std::invalid_argument("Empty file version found on new list");
    int fileVersion = m_newFileRowsetIt->get<0>().get();
    if(!m_newFileRowsetIt->get<1>())
        throw std::invalid_argument("Empty file name found on new list");
    string fileName = m_newFileRowsetIt->get<1>().get();
    if(!m_newFileRowsetIt->get<2>())
        throw std::invalid_argument("Empty update time found on new list");
    std::tm currentTm = m_newFileRowsetIt->get<2>().get();

    INFO_STREAM << "ProtocolManager::setFileFailed() file "
        << fileName << " version " << fileVersion << " not transfered" << endl;
    boost::posix_time::ptime currentPtime =
        boost::posix_time::ptime_from_tm(currentTm);
    boost::posix_time::ptime nextPtime(boost::posix_time::pos_infin);
    //FIXME: not incremented in case of exception!!!
    ++m_newFileRowsetIt;
    if(m_newFileRowsetIt != m_newFileRowset_sp->end())
    {
        if(!m_newFileRowsetIt->get<2>())
                throw std::invalid_argument("Empty next update time found on new list");
            std::tm nextTm = m_newFileRowsetIt->get<2>().get();
            nextPtime =boost::posix_time::ptime_from_tm(nextTm);
    }
    DBManager::TransactionSP auxTransaction_sp = m_dBManager_sp->getAuxTransaction();
    DBManager::TransactionSP mainTransaction_sp = m_dBManager_sp->getMainTransaction();
    if(nextPtime > currentPtime)
        m_dBManager_sp->persistLastTimestamp(currentPtime);
    m_dBManager_sp->addFailedFile(fileVersion, fileName);
    auxTransaction_sp->commit();
    mainTransaction_sp->commit();

    m_dataImporter_p->incrementFailedCounter();
//==============================================================================
//      ProtocolManager::setFailedFileFailed()
//==============================================================================
void ProtocolManager::setFailedFileFailed()
    throw(std::logic_error, std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::setFailedFileFailed()" << endl;

    ++m_failedFileRowsetIt;
}   //namespace