Skip to content
ProtocolManager.cpp 14.2 KiB
Newer Older
#include <ProtocolManager.h>
#include <boost/filesystem/v3/path.hpp>

namespace DataImporter_ns
{

//==============================================================================
//      ProtocolManager::ProtocolManager()
//==============================================================================
ProtocolManager::ProtocolManager(Tango::DeviceImpl* deviceImpl_p,
    Configuration::SP configuration_sp, DBManager::SP dBManager_sp) :
    Tango::LogAdapter(deviceImpl_p), m_configuration_sp(configuration_sp),
    m_dBManager_sp(dBManager_sp)
{
    DEBUG_STREAM << "ProtocolManager::ProtocolManager()" << endl;

    m_isAuthorised = false;
    m_isValidated = false;
    m_isTransferRequest = false;
}

//==============================================================================
//      ProtocolManager::ProtocolManager()
//==============================================================================
ProtocolManager::~ProtocolManager()
{
    DEBUG_STREAM << "ProtocolManager::~ProtocolManager()" << endl;
}

//==============================================================================
//      ProtocolManager::ProtocolManager()
//==============================================================================
ProtocolManager::SP ProtocolManager::create(Tango::DeviceImpl* deviceImpl_p,
    Configuration::SP configuration_sp, DBManager::SP dBManager_sp)
{
    ProtocolManager::SP d_sp(new ProtocolManager(deviceImpl_p, configuration_sp,
        dBManager_sp), ProtocolManager::Deleter());

    return d_sp;
}

//==============================================================================
//      ProtocolManager::ProtocolManager()
//==============================================================================
void ProtocolManager::setRemoteEndpoint(std::string remoteEndpoint)
{
    DEBUG_STREAM << "ProtocolManager::setRemoteEndpoint()" << endl;

    m_remoteEndpoint = remoteEndpoint;
}

//==============================================================================
//      ProtocolManager::resetProtocolStatus()
//==============================================================================
void ProtocolManager::resetProtocolStatus()
{
    DEBUG_STREAM << "ProtocolManager::resetProtocolStatus()" << endl;

    m_isAuthorised = false;
    m_isValidated = false;
    m_isTransferRequest = false;
}

//==============================================================================
//      ProtocolManager::createRequest()
//==============================================================================
RequestSP ProtocolManager::createRequest()
    throw(std::runtime_error, std::out_of_range)
{
    DEBUG_STREAM << "ProtocolManager::createRequest()" << endl;

    if(!m_rowSet_sp)
    {
        boost::posix_time::ptime m_lastTimestamp =
            m_dBManager_sp->retrieveLastTimestamp();

        DEBUG_STREAM << "ProtocolManager::createRequest() last timestamp "
            << boost::posix_time::to_simple_string(m_lastTimestamp) << endl;

        m_rowSet_sp = m_dBManager_sp->retrieveNewFile(
            m_configuration_sp->getDatabaseSchema(),
            m_configuration_sp->getDatabaseTable(), m_lastTimestamp);

        m_it = m_rowSet_sp->begin();
    }

    RequestSP request_sp;

    if(!m_isAuthorised)
    {
        request_sp = createAuthroisation();
    else if(!m_isValidated)
    {
        request_sp = createValidation();
    }
    else if(m_rowSet_sp && m_it != m_rowSet_sp->end())
    {
        request_sp = createTransfer();
    }
        request_sp = createKeepAlive();
    }

    if(!request_sp->IsInitialized())
        throw std::runtime_error("Not initialized request!");

    return request_sp;
}

//==============================================================================
//      ProtocolManager::processResponse()
//==============================================================================
void ProtocolManager::processResponse(ResponseSP response_sp)
    throw(std::runtime_error, std::out_of_range)
{
    DEBUG_STREAM << "ProtocolManager::processResponse()" << endl;

    if(!response_sp->IsInitialized())
        throw std::runtime_error("Not initialized response!");

    switch(response_sp->type())
    {
        case Response::AUTHORIZATION:
        {
            processAuthroisation(response_sp);
            break;
        }
        case Response::VALIDATION:
        {
            processValidation(response_sp);
            break;
        }
        case Response::TRANSFER:
        {
            processTransfer(response_sp);
            break;
        }
        case Response::KEEPALIVE:
            processKeepAlive(response_sp);
            break;
        }
        default:
            throw std::runtime_error("Unknown response type");
    }
}

//==============================================================================
//      ProtocolManager::isTransferRequest()
//==============================================================================
bool ProtocolManager::isTransferRequest()
{
    DEBUG_STREAM << "ProtocolManager::isTransferRequest()" << endl;

    return m_isTransferRequest;
}

//==============================================================================
//      ProtocolManager::getFileDir()
//==============================================================================
std::string ProtocolManager::getFileDir()
{
    DEBUG_STREAM << "ProtocolManager::getFileDir()" << endl;

    return m_fileDir;
}

//==============================================================================
//      ProtocolManager::getFileName()
//==============================================================================
std::string ProtocolManager::getFileName()
{
    DEBUG_STREAM << "ProtocolManager::getFileName()" << endl;

    return m_fileName;
}

//==============================================================================
//      ProtocolManager::getFileSize()
//==============================================================================
int ProtocolManager::getFileSize()
{
    DEBUG_STREAM << "ProtocolManager::getFileSize()" << endl;

    return m_fileSize;
}

//==============================================================================
//      ProtocolManager::createAuthroisation()
//==============================================================================
RequestSP ProtocolManager::createAuthroisation() throw(std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::createAuthroisation()" << endl;

    RequestSP request_sp(new Request);

    request_sp->set_type(Request::AUTHORIZATION);

    std::string user = m_configuration_sp->getRemoteUsername();
    std::string password = m_configuration_sp->getRemotePassword();

    #ifdef VERBOSE_DEBUG
        INFO_STREAM << "ProtocolManager::createAuthroisation() Send username "
            << user << " password " << password << " to " << m_remoteEndpoint << endl;
    #else
        INFO_STREAM << "ProtocolManager::createAuthroisation() Send to "
            << m_remoteEndpoint << endl;
    #endif

    Request::Authorization* authorization = request_sp->mutable_authorization();
    authorization->set_username(user);
    authorization->set_password(password);

    return request_sp;
}

//==============================================================================
//      ProtocolManager::createValidation()
//==============================================================================
RequestSP ProtocolManager::createValidation() throw(std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::createValidation()" << endl;

    RequestSP request_sp(new Request);

    request_sp->set_type(Request::VALIDATION);

    std::string schema = m_configuration_sp->getDatabaseSchema();
    std::string table = m_configuration_sp->getDatabaseTable();

    INFO_STREAM << "ProtocolManager::createValidation() Send schema "
        << schema << " table " << table << " to " << m_remoteEndpoint << endl;

    Request::Validation* validation = request_sp->mutable_validation();
    validation->set_schema(schema);
    validation->set_table(table);

    return request_sp;
}

//==============================================================================
//      ProtocolManager::createTransfer()
//==============================================================================
RequestSP ProtocolManager::createTransfer() throw(std::runtime_error,
    std::out_of_range)
    DEBUG_STREAM << "ProtocolManager::createTransfer()" << endl;

    RequestSP request_sp(new Request);

    request_sp->set_type(Request::TRANSFER);
    if(!m_it->get<0>())
        throw std::runtime_error("Empty file version found");
    int fileVersion = m_it->get<0>().get();

    if(!m_it->get<1>())
        throw std::runtime_error("Empty file name found");
    std::string fileName = m_it->get<1>().get();

    if(!m_it->get<2>())
        throw std::runtime_error("Empty update time found");
    std::tm tmNew = m_it->get<2>().get();

    boost::posix_time::ptime ptNew = boost::posix_time::ptime_from_tm(tmNew);

    INFO_STREAM << "ProtocolManager::createTransfer() request file "
        << fileName << " version " << fileVersion << " "
        << boost::posix_time::to_simple_string(ptNew) << endl;

    Request::Transfer* transfer = request_sp->mutable_transfer();
        transfer->set_file_version(fileVersion);
        transfer->set_file_name(fileName);

    if(m_currentTimestamp.is_not_a_date_time())
        m_currentTimestamp = ptNew;

    if(ptNew > m_currentTimestamp)
    {
        INFO_STREAM << "ProtocolManager::createTransfer() save ["
            << boost::posix_time::to_simple_string(ptNew) << " > "
            << boost::posix_time::to_simple_string(m_currentTimestamp) << "]" << endl;

        m_dBManager_sp->persistLastTimestamp(ptNew);
    }
    if(m_it == m_rowSet_sp->end())
    {
        INFO_STREAM << "ProtocolManager::createTransfer() all data sent" << endl;
        m_rowSet_sp.reset();
    }

    return request_sp;
}

//==============================================================================
//      ProtocolManager::createKeepAlive()
//==============================================================================
RequestSP ProtocolManager::createKeepAlive()
{
    DEBUG_STREAM << "ProtocolManager::createKeepAlive()" << endl;

    RequestSP request_sp(new Request);

    request_sp->set_type(Request::KEEPALIVE);

    return request_sp;
//==============================================================================
//      ProtocolManager::processAuthroisation()
//==============================================================================
void ProtocolManager::processAuthroisation(ResponseSP response_sp)
    throw(std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::processAuthroisation()" << endl;

    const Response::Authorization& authorization = response_sp->authorization();

    if(authorization.state() == Response::Authorization::ACCEPTED)
    {
        INFO_STREAM << "ProtocolManager::processAuthroisation() State ACCEPTED "
            << "status " << authorization.status() << " from " << m_remoteEndpoint << endl;

        m_isAuthorised = true;
    }
    else
    {
        ERROR_STREAM << "ProtocolManager::processAuthroisation() State REJECTED "
            << "status " << authorization.status() << " from " << m_remoteEndpoint << endl;

        throw std::runtime_error(authorization.status());
    }
}

//==============================================================================
//      ProtocolManager::processValidation()
//==============================================================================
void ProtocolManager::processValidation(ResponseSP response_sp)
    throw(std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::processValidation()" << endl;

    const Response::Validation& validation = response_sp->validation();

    if(validation.state() == Response::Validation::ACCEPTED)
    {
        INFO_STREAM << "ProtocolManager::processValidation() State ACCEPTED "
            << "status " << validation.status() << " from " << m_remoteEndpoint << endl;

        m_isValidated = true;
    }
    else
    {
        ERROR_STREAM << "ProtocolManager::processValidation() State REJECTED "
            << "status " << validation.status() << " from " << m_remoteEndpoint << endl;

        throw std::runtime_error(validation.status());
    }
}

//==============================================================================
//      ProtocolManager::processData()
//==============================================================================
void ProtocolManager::processTransfer(ResponseSP response_sp)
        throw(std::runtime_error, std::out_of_range)
{
    DEBUG_STREAM << "ProtocolManager::processData()" << endl;

    const Response::Transfer& transfer = response_sp->transfer();

    if(transfer.state() == Response::Transfer::ACCEPTED)
    {
        INFO_STREAM << "ProtocolManager::processValidation() State ACCEPTED "
            << "status " << transfer.status() << " from " << m_remoteEndpoint << endl;

        INFO_STREAM << "PATH: " << transfer.file_path() << endl;
        INFO_STREAM << "VERSION: " << transfer.file_version() << endl;
        INFO_STREAM << "FILE: " << transfer.file_name() << endl;
        INFO_STREAM << "SIZE: " << transfer.size() << endl;

        m_isTransferRequest = true;

        m_fileSize = transfer.size();

        std::stringstream pathStream;
        pathStream << "/" << m_configuration_sp->getStoragePath() << "/"
            << transfer.file_version() << "/" << transfer.file_version();

        m_fileDir = pathStream.str();
        m_fileName = transfer.file_name();
    }
    else
    {
        ERROR_STREAM << "ProtocolManager::processValidation() State REJECTED "
            << "status " << transfer.status() << " from " << m_remoteEndpoint << endl;

        m_isTransferRequest = false;

        throw std::runtime_error(transfer.status());
    }
}

//==============================================================================
//      ProtocolManager::processKeepAlive()
//==============================================================================
void ProtocolManager::processKeepAlive(ResponseSP response_sp)
{
    DEBUG_STREAM << "ProtocolManager::processKeepAlive()" << endl;

    INFO_STREAM << "ProtocolManager::processKeepAlive() KEEP ALIVE "
        << " from " << m_remoteEndpoint << endl;

    m_isTransferRequest = false;

    m_fileDir.clear();

    m_fileName.clear();

    m_fileSize = 0;

}   //namespace