Commit 4238f6e9 authored by Marco De Marco's avatar Marco De Marco
Browse files

Code refactored, metadata transfer without wait

parent b402dca5
Loading
Loading
Loading
Loading
+1 −8
Original line number Diff line number Diff line
@@ -108,13 +108,6 @@ DBManager::InformationList DBManager::retrieveInformation(std::string schema,

    std::copy(rows.begin(), rows.end(), std::back_inserter(informationList));

    if(informationList.empty())
    {
        std::stringstream errorStream;
        errorStream << schema << "." << table << " not exists";
        throw soci::soci_error(errorStream.str());
    }

    return informationList;
}

@@ -122,7 +115,7 @@ DBManager::InformationList DBManager::retrieveInformation(std::string schema,
//      DBManager::retrieveNewTuples()
//==============================================================================
DBManager::RowsetSP DBManager::retrieveNewTuples(std::string schema,
    std::string table, std::tm update_time) throw(soci::soci_error)
    std::string table, std::tm update_time) throw(soci::soci_error, std::out_of_range)
{
    DEBUG_STREAM << "DBManager::retrieveNewTuples()" << endl;

+1 −1
Original line number Diff line number Diff line
@@ -78,7 +78,7 @@ public:
    typedef boost::shared_ptr< soci::rowset<soci::row> > RowsetSP;

    virtual RowsetSP retrieveNewTuples(std::string, std::string, std::tm)
        throw(soci::soci_error);
        throw(soci::soci_error, std::out_of_range);

protected:
//------------------------------------------------------------------------------
+84 −72
Original line number Diff line number Diff line
#include <ProtocolManager.h>

#include <boost/date_time.hpp>

namespace MetadataExporter_ns
{

@@ -55,6 +57,9 @@ ResponseSP ProtocolManager::prepareResponse(RequestSP request_sp)
{
    DEBUG_STREAM << "ProtocolManager::prepareResponse()" << endl;

    if(!request_sp->IsInitialized())
        throw std::runtime_error("Not initialized request!");

    ResponseSP response_sp;

    switch(request_sp->type())
@@ -88,7 +93,6 @@ ResponseSP ProtocolManager::prepareResponse(RequestSP request_sp)
//      ProtocolManager::prepareAuthroisation()
//==============================================================================
ResponseSP ProtocolManager::prepareAuthroisation(RequestSP request_sp)
    throw(std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::prepareAuthroisation()" << endl;

@@ -141,7 +145,6 @@ ResponseSP ProtocolManager::prepareAuthroisation(RequestSP request_sp)
//      ProtocolManager::prepareValidation()
//==============================================================================
ResponseSP ProtocolManager::prepareValidation(RequestSP request_sp)
    throw(std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::prepareValidation()" << endl;

@@ -165,17 +168,24 @@ ResponseSP ProtocolManager::prepareValidation(RequestSP request_sp)
                DBManager::InformationList informationList =
                    m_dBManager_sp->retrieveInformation(m_validatedSchema, m_validatedTable);

                if(validationReq.columns_size() != (int)informationList.size())
                    throw std::runtime_error("Columns number does not match");

                const google::protobuf::RepeatedPtrField
                    < Request::Validation::Column >& columns = validationReq.columns();
                if(informationList.empty())
                {
                    std::stringstream errorStream;
                    errorStream << "Table " << m_validatedSchema << "."
                        << m_validatedTable << " not exists";
                    throw soci::soci_error(errorStream.str());
                }

                google::protobuf::RepeatedPtrField
                    < Request::Validation::Column >::const_iterator it;
                if(validationReq.columns_size() != (int)informationList.size())
                {
                    std::stringstream errorStream;
                    errorStream << "Table " << m_validatedSchema << "."
                        << m_validatedTable << " has different columns size";
                    throw soci::soci_error(errorStream.str());
                }

                for(it=columns.begin(); it!=columns.end(); ++it)
                    validateColumn(*it, informationList);
                for(int i=0; i<validationReq.columns_size(); ++i)
                    validateColumn(validationReq.columns(i), informationList);

                INFO_STREAM << "ProtocolManager::prepareValidation() "
                    << "Validation accepted for " << m_validatedSchema << "."
@@ -194,14 +204,6 @@ ResponseSP ProtocolManager::prepareValidation(RequestSP request_sp)
                validationRes->set_state(Response::Validation::REJECTED);
                validationRes->set_status(ex.what());
            }
            catch(...)
            {
                WARN_STREAM << "ProtocolManager::prepareValidation() "
                    << "Unknown exception from " << m_remoteEndpoint << endl;

                validationRes->set_state(Response::Validation::REJECTED);
                validationRes->set_status("Unknown exception");
            }
        }
        else
        {
@@ -228,7 +230,6 @@ ResponseSP ProtocolManager::prepareValidation(RequestSP request_sp)
//      ProtocolManager::prepareMetadata()
//==============================================================================
ResponseSP ProtocolManager::prepareMetadata(RequestSP request_sp)
    throw(std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::prepareMetadata()" << endl;

@@ -247,12 +248,16 @@ ResponseSP ProtocolManager::prepareMetadata(RequestSP request_sp)
            int64_t rawTimestamp = metadataReq.timestamp();
            std::tm tmTimestamp = *localtime(&rawTimestamp);

            try
            {
                boost::posix_time::ptime ptTimestamp =
                    boost::posix_time::ptime_from_tm(tmTimestamp);

                DEBUG_STREAM << "ProtocolManager::prepareMetadata() Searching in "
                    << m_validatedSchema << "." << m_validatedTable << " timestamp "
                << asctime(&tmTimestamp) << " from " << m_remoteEndpoint << endl;
                    << boost::posix_time::to_simple_string(ptTimestamp)
                    << " from " << m_remoteEndpoint << endl;

            try
            {
                if(!m_rowSet_sp)
                {
                    m_rowSet_sp = m_dBManager_sp->retrieveNewTuples(
@@ -274,7 +279,7 @@ ResponseSP ProtocolManager::prepareMetadata(RequestSP request_sp)
                    metadataRes->set_status("No more data");
                }
            }
            catch(std::runtime_error& ex)
            catch(std::exception& ex)
            {
                WARN_STREAM << "ProtocolManager::prepareMetadata() "
                    << ex.what() << " from " << m_remoteEndpoint << endl;
@@ -282,16 +287,6 @@ ResponseSP ProtocolManager::prepareMetadata(RequestSP request_sp)
                metadataRes->set_state(Response::Metadata::REJECTED);
                metadataRes->set_status(ex.what());

                m_rowSet_sp.reset();
            }
            catch(...)
            {
                WARN_STREAM << "ProtocolManager::prepareMetadata() "
                    << "Unknown exception from " << m_remoteEndpoint << endl;

                metadataRes->set_state(Response::Metadata::REJECTED);
                metadataRes->set_status("Unknown exception");

                m_rowSet_sp.reset();
            }
        }
@@ -330,7 +325,7 @@ void ProtocolManager::validateColumn(const Request::Validation::Column& column,
    for(it=informationList.begin(); it!=informationList.end(); ++it)
    {
        if(!it->get<0>())
            throw std::runtime_error("Empty column name");
            throw std::runtime_error("Empty column name in information schema");
        std::string columnName = it->get<0>().get();

        if(column.name().compare(columnName)==0)
@@ -338,7 +333,7 @@ void ProtocolManager::validateColumn(const Request::Validation::Column& column,
            found = true;

            if(!it->get<1>())
                throw std::runtime_error("Empty column type");
                throw std::runtime_error("Empty column type in information schema");
            std::string columnType = it->get<1>().get();

            if(column.type().compare(columnType)!=0)
@@ -350,7 +345,7 @@ void ProtocolManager::validateColumn(const Request::Validation::Column& column,
            }

            if(!it->get<2>())
                throw std::runtime_error("Empty is nullable");
                throw std::runtime_error("Empty column nullable in information schema");
            std::string isNullable = it->get<2>().get();

            if(column.nullable().compare(isNullable)!=0)
@@ -362,10 +357,10 @@ void ProtocolManager::validateColumn(const Request::Validation::Column& column,
            }

            #ifdef VERBOSE_DEBUG
                DEBUG_STREAM << "CLIENT: " << columnName << " | " << columnType
                    <<  " | " << isNullable << endl;
                DEBUG_STREAM << "SERVER: " << column.name() << " | "
                    << column.type() <<  " | " << column.nullable() << endl;
                INFO_STREAM << "ProtocolManager::validateColumn(): " << columnName
                    << " " << columnType << " " << isNullable << endl;
                INFO_STREAM << "ProtocolManager::validateColumn(): " << column.name()
                    << " " << column.type() << " " << column.nullable() << endl;
            #endif
        }
    }
@@ -373,7 +368,7 @@ void ProtocolManager::validateColumn(const Request::Validation::Column& column,
    if(!found)
    {
        std::stringstream errorStream;
        errorStream << "Column " << column.name() << " not found on server";
        errorStream << "Column " << column.name() << " not found";
        throw std::runtime_error(errorStream.str());
    }
}
@@ -382,38 +377,33 @@ void ProtocolManager::validateColumn(const Request::Validation::Column& column,
//      ProtocolManager::fillMetadata()
//==============================================================================
void ProtocolManager::fillMetadata(Response::Metadata* metadataRes)
    throw(std::runtime_error)
    throw(std::runtime_error, std::out_of_range)
{
    DEBUG_STREAM << "ProtocolManager::fillMetadata()" << endl;

    std::time_t currentTime = 0;
    boost::posix_time::ptime ptCurrent;

    while(m_it != m_rowSet_sp->end())
    {
        int id = m_it->get<int>("id");
        std::string file_path = m_it->get<std::string>("file_path");
        int file_version = m_it->get<int>("file_version");
        std::string file_name = m_it->get<std::string>("file_name");
        std::tm newTm = m_it->get<std::tm>("update_time");
        std::tm tmNew = m_it->get<std::tm>("update_time");

        #ifdef VERBOSE_DEBUG
            INFO_STREAM << "|" << id << "|" << file_path << "|" << file_version
                << "|" << file_name << "|" << asctime(&newTm) << "|" << endl;
        #endif

        std::time_t newTime = mktime(&newTm);
        boost::posix_time::ptime ptNew = boost::posix_time::ptime_from_tm(tmNew);

        if(currentTime == 0)
            currentTime = newTime;
        if(ptCurrent.is_not_a_date_time())
            ptCurrent = ptNew;

        std::tm currentTm = *localtime(&currentTime);
        INFO_STREAM << "ProtocolManager::fillMetadata() sending "
            << id << " " << file_version << " " << file_name << " "
            << boost::posix_time::to_simple_string(ptNew) << endl;

        INFO_STREAM << "NEW TIME: " << asctime(&newTm) << endl;
        INFO_STREAM << "CURRENT TIME: " << asctime(&currentTm) << endl;

        if(difftime(newTime, currentTime)>0)
        if(ptNew > ptCurrent)
        {
            INFO_STREAM << "BREAK" << endl;
            INFO_STREAM << "ProtocolManager::fillMetadata() stop ["
                << boost::posix_time::to_simple_string(ptNew) << " > "
                << boost::posix_time::to_simple_string(ptCurrent) << "]" << endl;
            break;
        }

@@ -425,7 +415,7 @@ void ProtocolManager::fillMetadata(Response::Metadata* metadataRes)

    if(m_it == m_rowSet_sp->end())
    {
        INFO_STREAM << "RESET" << endl;
        INFO_STREAM << "ProtocolManager::fillMetadata() all data sent" << endl;
        m_rowSet_sp.reset();
    }
}
@@ -434,7 +424,7 @@ void ProtocolManager::fillMetadata(Response::Metadata* metadataRes)
//      ProtocolManager::fillRow()
//==============================================================================
void ProtocolManager::fillRow(Response::Metadata::Row* row)
    throw(std::runtime_error)
    throw(std::runtime_error, std::out_of_range)
{
    DEBUG_STREAM << "ProtocolManager::fillRow()" << endl;

@@ -446,7 +436,10 @@ void ProtocolManager::fillRow(Response::Metadata::Row* row)

        if(m_it->get_indicator(i) == soci::i_null)
        {
            DEBUG_STREAM << "NAME: " << name << " NULL" << endl;
            #ifdef VERBOSE_DEBUG
                INFO_STREAM << "ProtocolManager::fillRow() name "
                    << name << " null" << endl;
            #endif
            continue;
        }

@@ -456,7 +449,10 @@ void ProtocolManager::fillRow(Response::Metadata::Row* row)
            {
                std::string value = m_it->get<std::string>(i);

                DEBUG_STREAM << "NAME: " << name << " " << value << endl;
                #ifdef VERBOSE_DEBUG
                    INFO_STREAM << "ProtocolManager::fillRow() name "
                        << name << " " << value << endl;
                #endif

                Response::Metadata::Row::DtString* dtString = row->add_strings_list();
                dtString->set_key(name);
@@ -468,7 +464,10 @@ void ProtocolManager::fillRow(Response::Metadata::Row* row)
            {
                double value = m_it->get<double>(i);

                DEBUG_STREAM << "NAME: " << name << " VALUE: " << value << endl;
                #ifdef VERBOSE_DEBUG
                    INFO_STREAM << "ProtocolManager::fillRow() name "
                        << name << " " << value << endl;
                #endif

                Response::Metadata::Row::DtDouble* dtDouble = row->add_double_list();
                dtDouble->set_key(name);
@@ -480,7 +479,10 @@ void ProtocolManager::fillRow(Response::Metadata::Row* row)
            {
                int value = m_it->get<int>(i);

                DEBUG_STREAM << "NAME: " << name << " VALUE: " << value << endl;
                #ifdef VERBOSE_DEBUG
                    INFO_STREAM << "ProtocolManager::fillRow() name "
                        << name << " " << value << endl;
                #endif

                Response::Metadata::Row::DtInteger* dtInteger = row->add_integer_list();
                dtInteger->set_key(name);
@@ -492,7 +494,10 @@ void ProtocolManager::fillRow(Response::Metadata::Row* row)
            {
                long long value = m_it->get<long long>(i);

                DEBUG_STREAM << "NAME: " << name << " VALUE: " << value << endl;
                #ifdef VERBOSE_DEBUG
                    INFO_STREAM << "ProtocolManager::fillRow() name "
                        << name << " " << value << endl;
                #endif

                Response::Metadata::Row::DtLongLong* dtLongLong = row->add_long_long_list();
                dtLongLong->set_key(name);
@@ -504,7 +509,10 @@ void ProtocolManager::fillRow(Response::Metadata::Row* row)
            {
                unsigned long value = m_it->get<unsigned long>(i);

                DEBUG_STREAM << "NAME: " << name << " VALUE: " << value << endl;
                #ifdef VERBOSE_DEBUG
                    INFO_STREAM << "ProtocolManager::fillRow() name "
                        << name << " " << value << endl;
                #endif

                Response::Metadata::Row::DtUnsignedLong* dtUnsignedLong = row->add_unsinged_long_list();
                dtUnsignedLong->set_key(name);
@@ -514,13 +522,17 @@ void ProtocolManager::fillRow(Response::Metadata::Row* row)
            }
            case soci::dt_date:
            {
                std::tm value = m_it->get<std::tm>(i);
                std::tm tmValue = m_it->get<std::tm>(i);
                boost::posix_time::ptime ptValue = boost::posix_time::ptime_from_tm(tmValue);

                DEBUG_STREAM << "NAME: " << name << " VALUE: " << asctime(&value) << endl;
                #ifdef VERBOSE_DEBUG
                    INFO_STREAM << "ProtocolManager::fillRow() name "
                        << name << " " << boost::posix_time::to_simple_string(ptValue) << endl;
                #endif

                Response::Metadata::Row::DtDate* dtDate = row->add_date_list();
                dtDate->set_key(name);
                dtDate->set_value(mktime(&value));
                dtDate->set_value(mktime(&tmValue));

                break;
            }
+7 −8
Original line number Diff line number Diff line
@@ -63,14 +63,11 @@ protected:
//------------------------------------------------------------------------------
//  [Protected] Request specific methods
//------------------------------------------------------------------------------
    virtual ResponseSP prepareAuthroisation(RequestSP)
        throw(std::runtime_error);
    virtual ResponseSP prepareAuthroisation(RequestSP);

    virtual ResponseSP prepareValidation(RequestSP)
        throw(std::runtime_error);
    virtual ResponseSP prepareValidation(RequestSP);

    virtual ResponseSP prepareMetadata(RequestSP)
        throw(std::runtime_error);
    virtual ResponseSP prepareMetadata(RequestSP);

//------------------------------------------------------------------------------
//  [Protected] Columns validation method
@@ -81,9 +78,11 @@ protected:
//------------------------------------------------------------------------------
//  [Protected] Metadata serialization method
//------------------------------------------------------------------------------
    virtual void fillMetadata(Response::Metadata*) throw(std::runtime_error);
    virtual void fillMetadata(Response::Metadata*)
        throw(std::runtime_error, std::out_of_range);

    virtual void fillRow(Response::Metadata::Row*) throw(std::runtime_error);
    virtual void fillRow(Response::Metadata::Row*)
        throw(std::runtime_error, std::out_of_range);

//------------------------------------------------------------------------------
//  [Protected] Class variables
+2 −2
Original line number Diff line number Diff line
@@ -45,7 +45,7 @@ Server::~Server()

    if(m_threadGroup_sp)
    {
        //m_threadGroup_sp->interrupt_all();
        m_threadGroup_sp->interrupt_all();

        m_threadGroup_sp->join_all();
    }
@@ -119,7 +119,7 @@ void Server::stop() throw(std::runtime_error)

    if(m_threadGroup_sp)
    {
        //m_threadGroup_sp->interrupt_all();
        m_threadGroup_sp->interrupt_all();

        m_threadGroup_sp->join_all();
    }