// ProtocolManager.cpp
#include <ProtocolManager.h>
#include <boost/filesystem/v3/path.hpp>
#include <stdexcept>

namespace DataImporter_ns
{

//==============================================================================
//      ProtocolManager::ProtocolManager()
//==============================================================================
// Builds a protocol manager attached to a Tango device.
//   deviceImpl_p     - device handed to Tango::LogAdapter and stored in
//                      m_deviceImpl_p
//   configuration_sp - shared configuration, stored in m_configuration_sp
//   dBManager_sp     - shared database manager, stored in m_dBManager_sp
// The file rowsets stay unset until updateFileLists() is called.
ProtocolManager::ProtocolManager(Tango::DeviceImpl* deviceImpl_p,
    Configuration::SP configuration_sp, DBManager::SP dBManager_sp) :
    Tango::LogAdapter(deviceImpl_p), m_deviceImpl_p(deviceImpl_p),
    m_configuration_sp(configuration_sp), m_dBManager_sp(dBManager_sp)
{
    DEBUG_STREAM << "ProtocolManager::ProtocolManager()" << endl;

    // markAsCompleted(), nextFile() and createRequest() read this flag, so it
    // must hold a defined value even if hasNextFile() has not run yet.
    // Assigned in the body (not the initializer list) so the member
    // declaration order in the header does not matter.
    m_recoveryMode = false;
}

//==============================================================================
//      ProtocolManager::~ProtocolManager()
//==============================================================================
// Destructor: emits a debug trace only; no explicit cleanup is performed here.
ProtocolManager::~ProtocolManager()
{
    DEBUG_STREAM
        << "ProtocolManager::~ProtocolManager()"
        << endl;
}

//==============================================================================
//      ProtocolManager::create()
//==============================================================================
// Factory: allocates a ProtocolManager from the given device, configuration
// and database manager, and returns it wrapped in a ProtocolManager::SP that
// releases the instance through ProtocolManager::Deleter.
ProtocolManager::SP ProtocolManager::create(Tango::DeviceImpl* deviceImpl_p,
    Configuration::SP configuration_sp, DBManager::SP dBManager_sp)
{
    return ProtocolManager::SP(
        new ProtocolManager(deviceImpl_p, configuration_sp, dBManager_sp),
        ProtocolManager::Deleter());
}

//==============================================================================
//      ProtocolManager::setRemoteEndpoint()
//==============================================================================
void ProtocolManager::setRemoteEndpoint(std::string remoteEndpoint)
{
    DEBUG_STREAM << "ProtocolManager::setRemoteEndpoint()" << endl;

    m_remoteEndpoint = remoteEndpoint;
}

//==============================================================================
//      ProtocolManager::updateFileLists()
//==============================================================================
Marco De Marco's avatar
Marco De Marco committed
void ProtocolManager::updateFileLists() throw(soci::soci_error)
Marco De Marco's avatar
Marco De Marco committed
    DEBUG_STREAM << "ProtocolManager::updateFileLists()" << endl;
    boost::posix_time::ptime m_lastTimestamp =
        m_dBManager_sp->retrieveLastTimestamp();
Marco De Marco's avatar
Marco De Marco committed
    DEBUG_STREAM << "ProtocolManager::updateFileLists() last timestamp "
        << boost::posix_time::to_simple_string(m_lastTimestamp) << endl;
    m_newFileRowset_sp = m_dBManager_sp->retrieveNewFiles(m_lastTimestamp);
    m_newFileRowsetIt = m_newFileRowset_sp->begin();
    m_failedFileRowset_sp = m_dBManager_sp->retrieveFailedFiles();
    m_failedFileRowsetIt = m_failedFileRowset_sp->begin();
}

//==============================================================================
//      ProtocolManager::hasNextFile()
//==============================================================================
bool ProtocolManager::hasNextFile()
    DEBUG_STREAM << "ProtocolManager::hasNextFile()" << endl;
Marco De Marco's avatar
Marco De Marco committed
    if(m_newFileRowset_sp &&
        m_newFileRowsetIt != m_newFileRowset_sp->end())
    {
        DEBUG_STREAM << "ProtocolManager::hasNextFile() from new list" << endl;
Marco De Marco's avatar
Marco De Marco committed

        m_recoveryMode = false;
        return true;
    }
    else if(m_failedFileRowset_sp &&
        m_failedFileRowsetIt != m_failedFileRowset_sp->end())
    {
        DEBUG_STREAM << "ProtocolManager::hasNextFile() from failed list" << endl;
Marco De Marco's avatar
Marco De Marco committed

        m_recoveryMode = true;
        return true;
    }
    else
    {
        DEBUG_STREAM << "ProtocolManager::hasNextFile() lists empty" << endl;
        m_recoveryMode = false;
        return false;
    }
//==============================================================================
//      ProtocolManager::markAsCompleted()
//==============================================================================
// Records the current file as successfully handled.
// Outside recovery mode: validates the current row of the new-file list and
// persists its update time (column 2) as the last timestamp through the
// database manager. In recovery mode nothing is persisted; only an error
// trace is emitted.
// Throws std::runtime_error when the new list is unset or exhausted, and
// std::invalid_argument when the version, name or update time column is empty.
void ProtocolManager::markAsCompleted()
{
    DEBUG_STREAM << "ProtocolManager::markAsCompleted()" << endl;

    if(!m_recoveryMode)
    {
        if(!m_newFileRowset_sp ||
            m_newFileRowsetIt == m_newFileRowset_sp->end())
                throw std::runtime_error("New list not initialized or empty");

        // Version and name are only used for the log trace below, but an
        // empty column is still rejected.
        if(!m_newFileRowsetIt->get<0>())
            throw std::invalid_argument("Empty file version found on new list");
        int fileVersion = m_newFileRowsetIt->get<0>().get();

        if(!m_newFileRowsetIt->get<1>())
            throw std::invalid_argument("Empty file name found on new list");
        std::string fileName = m_newFileRowsetIt->get<1>().get();

        if(!m_newFileRowsetIt->get<2>())
            throw std::invalid_argument("Empty update time found on new list");
        std::tm update_time = m_newFileRowsetIt->get<2>().get();

        // Log prefix names this method (previously mislabelled createRequest).
        INFO_STREAM << "ProtocolManager::markAsCompleted() mark completed "
            << fileName << " version " << fileVersion << endl;

        boost::posix_time::ptime current_time =
            boost::posix_time::ptime_from_tm(update_time);

        INFO_STREAM << "ProtocolManager::markAsCompleted() "
            << boost::posix_time::to_simple_string(current_time) << endl;

        m_dBManager_sp->persistLastTimestamp(current_time);
    }
    else
    {
        // No persistence is implemented for files taken from the failed list.
        ERROR_STREAM << "ProtocolManager::markAsCompleted() mark failed list" << endl;
    }
}

//==============================================================================
//      ProtocolManager::markAsFailed()
//==============================================================================
// Stub: currently only emits a debug trace; no failure bookkeeping is done.
void ProtocolManager::markAsFailed()
{
    DEBUG_STREAM
        << "ProtocolManager::markAsFailed()"
        << endl;
}

//==============================================================================
Marco De Marco's avatar
Marco De Marco committed
//      ProtocolManager::nextFile()
//==============================================================================
Marco De Marco's avatar
Marco De Marco committed
void ProtocolManager::nextFile()
Marco De Marco's avatar
Marco De Marco committed
    DEBUG_STREAM << "ProtocolManager::nextFile()" << endl;

    if(!m_recoveryMode)
    {
        if(m_newFileRowset_sp &&
            m_newFileRowsetIt != m_newFileRowset_sp->end())
        {
            DEBUG_STREAM << "ProtocolManager::nextFile() new list" << endl;
Marco De Marco's avatar
Marco De Marco committed

            ++m_newFileRowsetIt;
        }
    }
    else
    {
        if(m_failedFileRowset_sp &&
                m_failedFileRowsetIt != m_failedFileRowset_sp->end())
        {
            DEBUG_STREAM << "ProtocolManager::nextFile() from failed list" << endl;
Marco De Marco's avatar
Marco De Marco committed

            ++m_failedFileRowsetIt;
        }
    }
//==============================================================================
//      ProtocolManager::createRequest()
//==============================================================================
// Builds the request for the current file.
// The request carries the database username, password, schema and table read
// from the configuration, plus the version (column 0) and name (column 1) of
// the current row of the new-file list, or of the failed-file list when
// m_recoveryMode is set.
// Returns the populated request.
// Throws std::runtime_error when the selected list is unset or exhausted, or
// when the finished request reports IsInitialized() == false; throws
// std::invalid_argument when the version or name column is empty.
RequestSP ProtocolManager::createRequest()
    throw(std::logic_error, std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::createRequest()" << endl;

    RequestSP request_sp(new Request);

    request_sp->set_username(m_configuration_sp->getDatabaseUsername());
    request_sp->set_password(m_configuration_sp->getDatabasePassword());
    request_sp->set_schema(m_configuration_sp->getDatabaseSchema());
    request_sp->set_table(m_configuration_sp->getDatabaseTable());

    // Both branches below either assign these or throw.
    int fileVersion = 0;
    std::string fileName;

    if(!m_recoveryMode)
    {
        if(!m_newFileRowset_sp ||
            m_newFileRowsetIt == m_newFileRowset_sp->end())
                throw std::runtime_error("New list not initialized or empty");

        if(!m_newFileRowsetIt->get<0>())
            throw std::invalid_argument("Empty file version found on new list");
        fileVersion = m_newFileRowsetIt->get<0>().get();

        if(!m_newFileRowsetIt->get<1>())
            throw std::invalid_argument("Empty file name found on new list");
        fileName = m_newFileRowsetIt->get<1>().get();

        INFO_STREAM << "ProtocolManager::createRequest() request new file "
            << fileName << " version " << fileVersion << endl;
    }
    else
    {
        if(!m_failedFileRowset_sp ||
            m_failedFileRowsetIt == m_failedFileRowset_sp->end())
                throw std::runtime_error("Failed list not initialized or empty");

        if(!m_failedFileRowsetIt->get<0>())
            throw std::invalid_argument("Empty file version found on failed list");
        fileVersion = m_failedFileRowsetIt->get<0>().get();

        if(!m_failedFileRowsetIt->get<1>())
            throw std::invalid_argument("Empty file name found on failed list");
        fileName = m_failedFileRowsetIt->get<1>().get();

        INFO_STREAM << "ProtocolManager::createRequest() request failed file "
            << fileName << " version " << fileVersion << endl;
    }

    request_sp->set_file_version(fileVersion);
    request_sp->set_file_name(fileName);

    if(!request_sp->IsInitialized())
        throw std::runtime_error("Request not initialized");

    return request_sp;
}

//==============================================================================
//      ProtocolManager::processResponse()
//==============================================================================
// Validates a response and prepares the local destination for the file.
// Steps: check the response is initialized and has a non-empty file path and
// file name; compose the destination directory from the configured storage
// path, the file path and the file version; create it when missing; then act
// on the response state.
// Returns a FileWrapper for <destination directory>/<file name> with the
// announced size when the state is REQUEST_ACCEPTED.
// Throws std::runtime_error for an uninitialized response or any state not
// listed below; std::invalid_argument for an empty path or name, or when the
// destination exists but is not a directory; std::logic_error carrying the
// response status for METADATA_NOT_FOUND, FILE_NOT_DOWNLOADED and
// FILE_NOT_FOUND.
FileWrapper::SP ProtocolManager::processResponse(ResponseSP response_sp)
    throw(std::logic_error, std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::processResponse()" << endl;

    if(!response_sp->IsInitialized())
        throw std::runtime_error("Response not initialized");

    std::string filePath = response_sp->file_path();

    if(filePath.empty())
        throw std::invalid_argument("Empty file path received");

    int fileVersion = response_sp->file_version();

    std::string fileName =  response_sp->file_name();

    // Message now names the field that is actually empty (was "file path").
    if(fileName.empty())
        throw std::invalid_argument("Empty file name received");

    boost::uint64_t fileSize = response_sp->file_size();

    // NOTE(review): filePath and fileName are taken from the response and
    // joined onto the storage path without checking for ".." or rooted
    // components - confirm the peer is trusted or add validation.
    boost::filesystem::path destPath = composePath(
        m_configuration_sp->getStoragePath(), filePath, fileVersion);

    // The directory is created before the response state is examined.
    if(!boost::filesystem::exists(destPath))
            boost::filesystem::create_directories(destPath);

    if(!boost::filesystem::is_directory(destPath))
        throw std::invalid_argument("Destination path \'"
            + destPath.string() + "\' is not a directory" );

    FileWrapper::SP fileWrapper_sp;

    if(response_sp->state() == Response::REQUEST_ACCEPTED)
    {
        INFO_STREAM << "ProtocolManager::processResponse()  transfer file "
            << fileName << " version " << fileVersion << " size " << fileSize << endl;

        destPath /= fileName;

        fileWrapper_sp = FileWrapper::create(m_deviceImpl_p, destPath, fileSize);
    }
    else if(response_sp->state() == Response::METADATA_NOT_FOUND ||
        response_sp->state() == Response::FILE_NOT_DOWNLOADED ||
        response_sp->state() == Response::FILE_NOT_FOUND)
    {
        throw std::logic_error(response_sp->status());
    }
    else
    {
        throw std::runtime_error(response_sp->status());
    }

    return fileWrapper_sp;
}

//==============================================================================
//      ProtocolManager::composePath()
//==============================================================================
boost::filesystem::path ProtocolManager::composePath(std::string storagePath,
     std::string filePath, int fileVersion)
Marco De Marco's avatar
Marco De Marco committed
{
    DEBUG_STREAM << "ProtocolManager::composePath()" << endl;
    boost::filesystem::path path(storagePath);
    path /= filePath;
Marco De Marco's avatar
Marco De Marco committed
    std::stringstream fileStream;
    fileStream << "/" << fileVersion;
    path /= fileStream.str();
    DEBUG_STREAM << "ProtocolManager::composePath() \'" << path << "\'" << endl;
    return path;
}   //namespace