Commit bad4b1c5 authored by Marco De Marco's avatar Marco De Marco
Browse files

Code refactored, download works, loop works, not completed

parent 675a9532
Loading
Loading
Loading
Loading
+36 −35
Original line number Original line Diff line number Diff line
@@ -223,6 +223,8 @@ void Client::handleUpdateLists()
    {
    {
        m_listsUpdateTimer.expires_from_now(
        m_listsUpdateTimer.expires_from_now(
            boost::posix_time::seconds(m_configuration_sp->getRefreshTime()));
            boost::posix_time::seconds(m_configuration_sp->getRefreshTime()));

        m_listsUpdateTimer.async_wait(boost::bind(&Client::handleUpdateLists, this));
    }
    }
}
}


@@ -333,16 +335,14 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode)
            response_sp->ParseFromArray(&m_readBuff[HEADER_SIZE],
            response_sp->ParseFromArray(&m_readBuff[HEADER_SIZE],
                m_readBuff.size() - HEADER_SIZE);
                m_readBuff.size() - HEADER_SIZE);


            FileWrapper::SP fileWrapper_sp =
            startReadData(m_protocolManager_sp->processResponse(response_sp));
                m_protocolManager_sp->processResponse(response_sp);

            startReadData(fileWrapper_sp);
        }
        }
        catch(std::logic_error& ec)
        catch(std::logic_error& ec)
        {
        {
            WARN_STREAM << "Client::handleReadResponseBody() " << ec.what() << endl;
            WARN_STREAM << "Client::handleReadResponseBody() " << ec.what() << endl;


            //TODO: mark file as failed and try with next next
            writeState(Tango::ALARM);
            writeStatus(ec.what());
        }
        }
        catch(std::runtime_error& ec)
        catch(std::runtime_error& ec)
        {
        {
@@ -350,8 +350,6 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode)


            writeState(Tango::ALARM);
            writeState(Tango::ALARM);
            writeStatus(ec.what());
            writeStatus(ec.what());

            //TODO: stop and set ALARM
        }
        }
        catch(...)
        catch(...)
        {
        {
@@ -359,8 +357,6 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode)


            writeState(Tango::ALARM);
            writeState(Tango::ALARM);
            writeStatus("Unknown error");
            writeStatus("Unknown error");

            //TODO: shit storm happens... stop and set ALARM
        }
        }
    }
    }
    else
    else
@@ -380,8 +376,8 @@ void Client::handleReadData(FileWrapper::SP fileWrapper_sp, std::size_t recvByte
{
{
    if(!errorCode)
    if(!errorCode)
    {
    {
        //TODO: if output stream is bad?
        if(!fileWrapper_sp->isBad())

        {
            if(recvBytes>0)
            if(recvBytes>0)
                fileWrapper_sp->write(m_fileBuff, recvBytes);
                fileWrapper_sp->write(m_fileBuff, recvBytes);


@@ -393,27 +389,37 @@ void Client::handleReadData(FileWrapper::SP fileWrapper_sp, std::size_t recvByte
            {
            {
                INFO_STREAM << "Client::handleReadData() transfer complete " << endl;
                INFO_STREAM << "Client::handleReadData() transfer complete " << endl;


            if(m_protocolManager_sp->hasNextFile())
                m_protocolManager_sp->markAsCompleted();
            {

                m_protocolManager_sp->nextFile();
                m_protocolManager_sp->nextFile();


                if(m_protocolManager_sp->hasNextFile())
                {
                    startWriteRequest();
                    startWriteRequest();
                }
                }
                else
                else
                {
                {
                    closeConnection();

                    startUpdateLists();
                    startUpdateLists();
                }
                }
            }
            }
        }
        }
    else if(errorCode == boost::asio::error::eof)
        else
        {
        {
        DEBUG_STREAM << "Client::handleReadData() end of file from "
            ERROR_STREAM << "Client::handleReadData() bad I/O" << endl;
            << m_remoteEndpoint << endl;

            writeState(Tango::ALARM);
            writeStatus("Bad I/O");
        }
    }
    }
    else
    else
    {
    {
        ERROR_STREAM << "Client::handleReadData() "
        ERROR_STREAM << "Client::handleReadData() "
            << errorCode.message() << " from " << m_remoteEndpoint << endl;
            << errorCode.message() << " from " << m_remoteEndpoint << endl;

        writeState(Tango::ALARM);
        writeStatus(errorCode.message());
    }
    }
}
}


@@ -429,15 +435,10 @@ void Client::resetConnection()
    {
    {
        ERROR_STREAM << "Client::resetConnection() Connection timeout" << endl;
        ERROR_STREAM << "Client::resetConnection() Connection timeout" << endl;


        m_resetConnectionTimer.expires_at(boost::posix_time::pos_infin);
        m_listsUpdateTimer.expires_at(boost::posix_time::pos_infin);

        closeConnection();
        closeConnection();


        startUpdateLists();
        startUpdateLists();
    }
    }

    m_resetConnectionTimer.async_wait(boost::bind(&Client::resetConnection, this));
}
}


//==============================================================================
//==============================================================================
+11 −3
Original line number Original line Diff line number Diff line
@@ -47,7 +47,8 @@ void PlainClient::startConnect(boost::asio::ip::tcp::resolver::iterator endPoint
{
{
    DEBUG_STREAM << "PlainClient::startConnect()" << endl;
    DEBUG_STREAM << "PlainClient::startConnect()" << endl;


    m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30));
    m_resetConnectionTimer.expires_from_now(
        boost::posix_time::seconds(m_configuration_sp->getTimeout()));


    if(endPointIterator != boost::asio::ip::tcp::resolver::iterator())
    if(endPointIterator != boost::asio::ip::tcp::resolver::iterator())
    {
    {
@@ -135,7 +136,8 @@ void PlainClient::startWriteRequest()


        request_sp->SerializeToArray(&writeBuff[HEADER_SIZE], bodySize);
        request_sp->SerializeToArray(&writeBuff[HEADER_SIZE], bodySize);


        m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30));
        m_resetConnectionTimer.expires_from_now(
            boost::posix_time::seconds(m_configuration_sp->getTimeout()));


        boost::asio::async_write(m_plainSocket, boost::asio::buffer(writeBuff),
        boost::asio::async_write(m_plainSocket, boost::asio::buffer(writeBuff),
            boost::bind(&PlainClient::handleWriteRequest, this,
            boost::bind(&PlainClient::handleWriteRequest, this,
@@ -173,7 +175,8 @@ void PlainClient::startReadResponseHeader()


    m_readBuff.resize(HEADER_SIZE);
    m_readBuff.resize(HEADER_SIZE);


    m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30));
    m_resetConnectionTimer.expires_from_now(
        boost::posix_time::seconds(m_configuration_sp->getTimeout()));


    boost::asio::async_read(m_plainSocket, boost::asio::buffer(m_readBuff),
    boost::asio::async_read(m_plainSocket, boost::asio::buffer(m_readBuff),
        boost::bind(&PlainClient::handleReadResponseHeader, this,
        boost::bind(&PlainClient::handleReadResponseHeader, this,
@@ -218,6 +221,9 @@ void PlainClient::startReadData(FileWrapper::SP fileWrapper_sp)


    m_fileBuff.resize(bufferSize);
    m_fileBuff.resize(bufferSize);


    m_resetConnectionTimer.expires_from_now(
        boost::posix_time::seconds(m_configuration_sp->getTimeout()));

    boost::asio::async_read(m_plainSocket, boost::asio::buffer(m_fileBuff),
    boost::asio::async_read(m_plainSocket, boost::asio::buffer(m_fileBuff),
        boost::bind(&PlainClient::handleReadData, this, fileWrapper_sp,
        boost::bind(&PlainClient::handleReadData, this, fileWrapper_sp,
            boost::asio::placeholders::bytes_transferred,
            boost::asio::placeholders::bytes_transferred,
@@ -236,6 +242,8 @@ void PlainClient::closeConnection()


    INFO_STREAM << "PlainClient::closeConnection() " << infoStream.str() << endl;
    INFO_STREAM << "PlainClient::closeConnection() " << infoStream.str() << endl;


    m_resetConnectionTimer.expires_at(boost::posix_time::pos_infin);

    boost::system::error_code errorCode;
    boost::system::error_code errorCode;


    m_plainSocket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, errorCode);
    m_plainSocket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, errorCode);
+53 −1
Original line number Original line Diff line number Diff line
@@ -100,6 +100,58 @@ bool ProtocolManager::hasNextFile()
    }
    }
}
}


//==============================================================================
//      ProtocolManager::markAsCompleted()
//==============================================================================
void ProtocolManager::markAsCompleted()
{
    DEBUG_STREAM << "ProtocolManager::markAsCompleted()" << endl;

    if(!m_recoveryMode)
    {
        if(!m_newFileRowset_sp ||
            m_newFileRowsetIt == m_newFileRowset_sp->end())
                throw std::runtime_error("New list not initialized or empty");

        if(!m_newFileRowsetIt->get<0>())
            throw std::invalid_argument("Empty file version found on new list");
        int fileVersion = m_newFileRowsetIt->get<0>().get();

        if(!m_newFileRowsetIt->get<1>())
            throw std::invalid_argument("Empty file name found on new list");
        std::string fileName = m_newFileRowsetIt->get<1>().get();

        if(!m_newFileRowsetIt->get<2>())
            throw std::invalid_argument("Empty update time found on new list");
        std::tm update_time = m_newFileRowsetIt->get<2>().get();

        INFO_STREAM << "ProtocolManager::createRequest() mark completed "
            << fileName << " version " << fileVersion << endl;

        boost::posix_time::ptime current_time =
            boost::posix_time::ptime_from_tm(update_time);

        INFO_STREAM << "ProtocolManager::createRequest() "
            << boost::posix_time::to_simple_string(current_time) << endl;

        m_dBManager_sp->persistLastTimestamp(current_time);
    }
    else
    {
        ERROR_STREAM << "ProtocolManager::createRequest() mark failed list" << endl;
    }
}

//==============================================================================
//      ProtocolManager::markAsFailed()
//==============================================================================
void ProtocolManager::markAsFailed()
{
    DEBUG_STREAM << "ProtocolManager::markAsFailed()" << endl;


}

//==============================================================================
//==============================================================================
//      ProtocolManager::nextFile()
//      ProtocolManager::nextFile()
//==============================================================================
//==============================================================================
@@ -112,7 +164,7 @@ void ProtocolManager::nextFile()
        if(m_newFileRowset_sp &&
        if(m_newFileRowset_sp &&
            m_newFileRowsetIt != m_newFileRowset_sp->end())
            m_newFileRowsetIt != m_newFileRowset_sp->end())
        {
        {
            DEBUG_STREAM << "ProtocolManager::nextFile() from new list" << endl;
            DEBUG_STREAM << "ProtocolManager::nextFile() new list" << endl;


            ++m_newFileRowsetIt;
            ++m_newFileRowsetIt;
        }
        }
+4 −2
Original line number Original line Diff line number Diff line
@@ -63,6 +63,10 @@ public:


    virtual bool hasNextFile();
    virtual bool hasNextFile();


    virtual void markAsCompleted();

    virtual void markAsFailed();

    virtual void nextFile();
    virtual void nextFile();


//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
@@ -78,8 +82,6 @@ protected:
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
//  [Protected] File path method
//  [Protected] File path method
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------


    virtual boost::filesystem::path composePath(std::string, std::string, int);
    virtual boost::filesystem::path composePath(std::string, std::string, int);


//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
+13 −4
Original line number Original line Diff line number Diff line
@@ -65,7 +65,8 @@ void SSLClient::startConnect(boost::asio::ip::tcp::resolver::iterator endPointIt
{
{
    DEBUG_STREAM << "SSLClient::startConnect()" << endl;
    DEBUG_STREAM << "SSLClient::startConnect()" << endl;


    m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30));
    m_resetConnectionTimer.expires_from_now(
        boost::posix_time::seconds(m_configuration_sp->getTimeout()));


    if(endPointIterator != boost::asio::ip::tcp::resolver::iterator())
    if(endPointIterator != boost::asio::ip::tcp::resolver::iterator())
    {
    {
@@ -122,7 +123,8 @@ void SSLClient::startHandShake()
{
{
    DEBUG_STREAM << "SSLClient::startHandShake()" << endl;
    DEBUG_STREAM << "SSLClient::startHandShake()" << endl;


    m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30));
    m_resetConnectionTimer.expires_from_now(
        boost::posix_time::seconds(m_configuration_sp->getTimeout()));


    m_sSLSocket.async_handshake(boost::asio::ssl::stream_base::client,
    m_sSLSocket.async_handshake(boost::asio::ssl::stream_base::client,
        boost::bind(&SSLClient::handleHandShake, this,
        boost::bind(&SSLClient::handleHandShake, this,
@@ -187,7 +189,8 @@ void SSLClient::startWriteRequest()


        request_sp->SerializeToArray(&writeBuff[HEADER_SIZE], bodySize);
        request_sp->SerializeToArray(&writeBuff[HEADER_SIZE], bodySize);


        m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30));
        m_resetConnectionTimer.expires_from_now(
            boost::posix_time::seconds(m_configuration_sp->getTimeout()));


        boost::asio::async_write(m_sSLSocket, boost::asio::buffer(writeBuff),
        boost::asio::async_write(m_sSLSocket, boost::asio::buffer(writeBuff),
            boost::bind(&SSLClient::handleWriteRequest, this,
            boost::bind(&SSLClient::handleWriteRequest, this,
@@ -225,7 +228,8 @@ void SSLClient::startReadResponseHeader()


    m_readBuff.resize(HEADER_SIZE);
    m_readBuff.resize(HEADER_SIZE);


    m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30));
    m_resetConnectionTimer.expires_from_now(
        boost::posix_time::seconds(m_configuration_sp->getTimeout()));


    boost::asio::async_read(m_sSLSocket, boost::asio::buffer(m_readBuff),
    boost::asio::async_read(m_sSLSocket, boost::asio::buffer(m_readBuff),
        boost::bind(&SSLClient::handleReadResponseHeader, this,
        boost::bind(&SSLClient::handleReadResponseHeader, this,
@@ -270,6 +274,9 @@ void SSLClient::startReadData(FileWrapper::SP fileWrapper_sp)


    m_fileBuff.resize(bufferSize);
    m_fileBuff.resize(bufferSize);


    m_resetConnectionTimer.expires_from_now(
        boost::posix_time::seconds(m_configuration_sp->getTimeout()));

    boost::asio::async_read(m_sSLSocket, boost::asio::buffer(m_fileBuff),
    boost::asio::async_read(m_sSLSocket, boost::asio::buffer(m_fileBuff),
        boost::bind(&SSLClient::handleReadData, this, fileWrapper_sp,
        boost::bind(&SSLClient::handleReadData, this, fileWrapper_sp,
            boost::asio::placeholders::bytes_transferred,
            boost::asio::placeholders::bytes_transferred,
@@ -288,6 +295,8 @@ void SSLClient::closeConnection()


    INFO_STREAM << "SSLClient::closeConnection() " << infoStream.str() << endl;
    INFO_STREAM << "SSLClient::closeConnection() " << infoStream.str() << endl;


    m_resetConnectionTimer.expires_at(boost::posix_time::pos_infin);

    boost::system::error_code errorCode;
    boost::system::error_code errorCode;


    m_sSLSocket.lowest_layer().shutdown(
    m_sSLSocket.lowest_layer().shutdown(