Commit 06dcf948 authored by Marco De Marco's avatar Marco De Marco
Browse files

Code refactored, metadata transfer without wait

parent c494d4b3
Loading
Loading
Loading
Loading
+17 −10
Original line number Diff line number Diff line
@@ -37,7 +37,7 @@ Client::~Client()

    if(m_thread_sp)
    {
        //m_thread_sp->interrupt();
        m_thread_sp->interrupt();

        m_thread_sp->join();
    }
@@ -77,7 +77,7 @@ void Client::stop()

    if(m_thread_sp)
    {
        //m_thread_sp->interrupt();
        m_thread_sp->interrupt();

        m_thread_sp->join();
    }
@@ -108,7 +108,7 @@ std::string Client::readStatus()
{
    DEBUG_STREAM << "Client::readStatus()" << endl;

    boost::mutex::scoped_lock statusLock(m_stateMutex);
    boost::mutex::scoped_lock statusLock(m_statusMutex);

    return m_status;
}
@@ -132,7 +132,7 @@ void Client::writeStatus(std::string status)
{
    DEBUG_STREAM << "Client::writeStatus()" << endl;

    boost::mutex::scoped_lock statusLock(m_stateMutex);
    boost::mutex::scoped_lock statusLock(m_statusMutex);

    m_status = status;
}
@@ -190,7 +190,8 @@ void Client::startResolve()
    boost::asio::ip::tcp::resolver::query query(m_configuration_sp->getRemoteHost(),
        boost::lexical_cast<std::string>(m_configuration_sp->getRemotePort()));

    m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30));
    m_resetConnectionTimer.expires_from_now(
        boost::posix_time::seconds(m_configuration_sp->getTimeout()));

    m_resolver.async_resolve(query, boost::bind(&Client::handleResolve, this,
        boost::asio::placeholders::error, boost::asio::placeholders::iterator));
@@ -278,13 +279,19 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode)

            m_protocolManager_sp->processResponse(response_sp);

            //@todo: if no more data wait before starWriteRequest,
            //       else starWriteRequest now
            if(m_protocolManager_sp->waitBeforeRequest())
            {
                m_requestResponseTimer.expires_from_now(
                    boost::posix_time::seconds(m_configuration_sp->getRefreshTime()));

            m_requestResponseTimer.expires_from_now(boost::posix_time::seconds(10));
                m_requestResponseTimer.async_wait(boost::bind(&Client::startWriteRequest, this));
            }
        catch(std::runtime_error& ec)
            else
            {
                startWriteRequest();
            }
        }
        catch(std::exception& ec)
        {
            ERROR_STREAM << "Client::handleResponse() " << ec.what() << endl;

+7 −15
Original line number Diff line number Diff line
@@ -134,7 +134,7 @@ std::tm DBManager::retrieveLastTimestamp(std::string schema, std::string table)
//      DBManager::persistMetadata()
//==============================================================================
void DBManager::persistMetadata(std::string schema, std::string table,
    const Response::Metadata& metadata) throw(soci::soci_error)
    const Response::Metadata& metadata) throw(soci::soci_error, std::out_of_range)
{
    DEBUG_STREAM << "DBManager::persistMetadata()" << endl;

@@ -145,16 +145,8 @@ void DBManager::persistMetadata(std::string schema, std::string table,

    soci::transaction transaction(*m_session_sp);

    const google::protobuf::RepeatedPtrField
        < Response::Metadata::Row >& rows = metadata.rows();

    google::protobuf::RepeatedPtrField
        < Response::Metadata::Row >::const_iterator it;

    for(it=rows.begin(); it!=rows.end(); ++it)
    {
        *m_session_sp << composeInsertQuery(schema, table, *it);
    }
    for(int i=0; i<metadata.rows_size(); ++i)
        *m_session_sp << composeInsertQuery(schema, table, metadata.rows(i));

    transaction.commit();
}
@@ -163,7 +155,7 @@ void DBManager::persistMetadata(std::string schema, std::string table,
//      DBManager::composeInsertQuery()
//==============================================================================
std::string DBManager::composeInsertQuery(std::string schema, std::string table,
    const Response::Metadata::Row& row)
    const Response::Metadata::Row& row) throw(std::out_of_range)
{
    DEBUG_STREAM << "DBManager::composeInsertQuery()" << endl;

@@ -171,8 +163,6 @@ std::string DBManager::composeInsertQuery(std::string schema, std::string table,
    std::stringstream valuesStream;
    std::stringstream keyValuesStream;

    //@fixme: check against malformed timestamp

    //DtDate list
    for(int i = 0; i<row.date_list_size(); ++i)
    {
@@ -249,7 +239,9 @@ std::string DBManager::composeInsertQuery(std::string schema, std::string table,
        << " (" << keys << ") VALUES (" << values << ") "
        << " ON DUPLICATE KEY UPDATE " << keyValues;

    INFO_STREAM << "QUERY: " << query.str() << endl;
    #ifdef VERBOSE_DEBUG
        INFO_STREAM << "DBManager::composeInsertQuery() " << query.str() << endl;
    #endif

    return query.str();
}
+2 −2
Original line number Diff line number Diff line
@@ -83,14 +83,14 @@ public:
        throw(soci::soci_error);

    virtual void persistMetadata(std::string, std::string, const Response::Metadata&)
        throw(soci::soci_error);
        throw(soci::soci_error, std::out_of_range);

protected:
//------------------------------------------------------------------------------
//  [Protected] Utilities method
//------------------------------------------------------------------------------
    virtual std::string composeInsertQuery(std::string, std::string,
        const Response::Metadata::Row&);
        const Response::Metadata::Row&) throw(std::out_of_range);

//------------------------------------------------------------------------------
//  [Protected] Class variables
+1 −1
Original line number Diff line number Diff line
@@ -162,7 +162,7 @@ void PlainClient::startWriteRequest()
            boost::bind(&PlainClient::handleWriteRequest, this,
            boost::asio::placeholders::error));
    }
    catch(std::runtime_error& ec)
    catch(std::exception& ec)
    {
        ERROR_STREAM << "PlainClient::startWriteRequest() " << ec.what() << endl;

+41 −15
Original line number Diff line number Diff line
#include <ProtocolManager.h>

#include <boost/date_time.hpp>

namespace MetadataImporter_ns
{

@@ -15,6 +17,7 @@ ProtocolManager::ProtocolManager(Tango::DeviceImpl* deviceImpl_p,

    m_isAuthorised = false;
    m_isValidated = false;
    m_hasMoreData = true;
}

//==============================================================================
@@ -47,10 +50,21 @@ void ProtocolManager::setRemoteEndpoint(std::string remoteEndpoint)
    m_remoteEndpoint = remoteEndpoint;
}

//==============================================================================
//      ProtocolManager::waitBeforeRequest()
//==============================================================================
bool ProtocolManager::waitBeforeRequest()
{
    DEBUG_STREAM << "ProtocolManager::waitBeforeRequest()" << endl;

    return !m_hasMoreData;
}

//==============================================================================
//      ProtocolManager::createRequest()
//==============================================================================
RequestSP ProtocolManager::createRequest() throw(std::runtime_error)
RequestSP ProtocolManager::createRequest()
    throw(std::runtime_error, std::out_of_range)
{
    DEBUG_STREAM << "ProtocolManager::createRequest()" << endl;

@@ -79,7 +93,7 @@ RequestSP ProtocolManager::createRequest() throw(std::runtime_error)
//      ProtocolManager::processResponse()
//==============================================================================
void ProtocolManager::processResponse(ResponseSP response_sp)
    throw(std::runtime_error)
    throw(std::runtime_error, std::out_of_range)
{
    DEBUG_STREAM << "ProtocolManager::processResponse()" << endl;

@@ -164,28 +178,27 @@ RequestSP ProtocolManager::createValidation() throw(std::runtime_error)
        Request::Validation::Column* column = validation->add_columns();

        if(!it->get<0>())
            throw std::runtime_error("Empty column name");
            throw std::runtime_error("Empty column name in information schema");

        std::string columnName = it->get<0>().get();
        column->set_name(columnName);

        if(!it->get<1>())
            throw std::runtime_error("Empty column type");
            throw std::runtime_error("Empty column type in information schema");

        std::string columnType = it->get<1>().get();
        column->set_type(columnType);

        if(!it->get<2>())
            throw std::runtime_error("Empty is nullable");
            throw std::runtime_error("Empty column nullable in information schema");

        std::string isNullable = it->get<2>().get();
        column->set_nullable(isNullable);

        #ifdef VERBOSE_DEBUG
        INFO_STREAM << "NAME " << columnName << " TYPE " << columnType
            << " ISNULLABLE " << isNullable << endl;
            INFO_STREAM << "ProtocolManager::createValidation() " << columnName
                << " " << columnType << " " << isNullable << endl;
        #endif

    }

    return request_sp;
@@ -194,7 +207,8 @@ RequestSP ProtocolManager::createValidation() throw(std::runtime_error)
//==============================================================================
//      ProtocolManager::createMetadata()
//==============================================================================
RequestSP ProtocolManager::createMetadata() throw(std::runtime_error)
RequestSP ProtocolManager::createMetadata()
        throw(std::runtime_error, std::out_of_range)
{
    DEBUG_STREAM << "ProtocolManager::createMetadata()" << endl;

@@ -205,15 +219,19 @@ RequestSP ProtocolManager::createMetadata() throw(std::runtime_error)
    std::string schema = m_configuration_sp->getDatabaseSchema();
    std::string table = m_configuration_sp->getDatabaseTable();

    std::tm timestamp = m_dBManager_sp->retrieveLastTimestamp(schema, table);
    std::tm tmTimestamp = m_dBManager_sp->retrieveLastTimestamp(schema, table);

    boost::posix_time::ptime ptTimestamp =
        boost::posix_time::ptime_from_tm(tmTimestamp);

    INFO_STREAM << "ProtocolManager::createMetadata() Send schema "
        << schema << " table " << table << " timestamp " << asctime(&timestamp)
        << schema << " table " << table << " timestamp "
        << boost::posix_time::to_simple_string(ptTimestamp)
        << " to " << m_remoteEndpoint << endl;

    Request::Metadata* metadata = request_sp->mutable_metadata();

    metadata->set_timestamp(mktime(&timestamp));
    metadata->set_timestamp(mktime(&tmTimestamp));

    return request_sp;
}
@@ -274,7 +292,7 @@ void ProtocolManager::processValidation(ResponseSP response_sp)
//      ProtocolManager::processMetadata()
//==============================================================================
void ProtocolManager::processMetadata(ResponseSP response_sp)
    throw(std::runtime_error)
    throw(std::runtime_error, std::out_of_range)
{
    DEBUG_STREAM << "ProtocolManager::processMetadata()" << endl;

@@ -290,7 +308,15 @@ void ProtocolManager::processMetadata(ResponseSP response_sp)
            << " table " << table << " from " << m_remoteEndpoint << endl;

        if(metadata.rows_size() != 0)
        {
            m_dBManager_sp->persistMetadata(schema, table, metadata);

            m_hasMoreData = true;
        }
        else
        {
            m_hasMoreData = false;
        }
    }
    else
    {
Loading