Commit ca098181 authored by Marco De Marco's avatar Marco De Marco
Browse files

Protocol buffer communication works

parent f765dfa9
Loading
Loading
Loading
Loading
+82 −48
Original line number Diff line number Diff line
#include <PlainSession.h>

#include <boost/bind.hpp>

#include <netinet/in.h>

namespace MetadataExporter_ns
{

@@ -60,80 +63,111 @@ void PlainSession::start()
    INFO_STREAM << "PlainSession::start() connection from "
        << m_plainSocket.remote_endpoint() << endl;

    boost::asio::async_read_until(m_plainSocket, m_request, "\r\n",
    startReadHeader();
}

//==============================================================================
//      PlainSession::startReadHeader()
//==============================================================================
void PlainSession::startReadHeader()
{
        DEBUG_STREAM << "PlainSession::startReadHeader()" << endl;

        m_readBuff.resize(HEADER_SIZE);

        boost::asio::async_read(m_plainSocket, boost::asio::buffer(m_readBuff),
            m_strand.wrap(
                boost::bind(
                &PlainSession::handleRequest, shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred)));
                    &PlainSession::handleReadHeader, shared_from_this(),
                    boost::asio::placeholders::error)));
}

//==============================================================================
//      PlainSession::handleRequest()
//      PlainSession::handleReadHeader()
//==============================================================================
void PlainSession::handleRequest(const boost::system::error_code& ec, std::size_t size)
void PlainSession::handleReadHeader(const boost::system::error_code& error)
{
    DEBUG_STREAM << "PlainSession::handleRequest()" << endl;
    DEBUG_STREAM << "PlainSession::handleReadHeader()" << endl;

    if(!ec)
    if(!error)
    {
        m_request.commit(size);
        boost::uint32_t bodySize = decodeHeader(m_readBuff);

        std::istream iStream(&m_request);
        std::string request;
        iStream >> request;
        DEBUG_STREAM << "PlainSession::handleReadHeader() SIZE: " << bodySize << endl;

        INFO_STREAM << "PlainSession::handleRequest() request \"" << request << "\"" << endl;

        std::ostream oStream(&m_response);
        oStream << request << "\r\n";

        boost::asio::async_write(m_plainSocket, m_response,
            m_strand.wrap(
                boost::bind(&PlainSession::handleResponse, shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred)));
    }
    else if(ec == boost::asio::error::eof)
    {
        INFO_STREAM << "PlainSession::handleRequest() disconnection from "
            << m_plainSocket.remote_endpoint() << endl;
        startReadBody(bodySize);
    }
    else
    {
        WARN_STREAM << "PlainSession::handleRequest() error " << ec.message()
            << " from " << m_plainSocket.remote_endpoint() << endl;
        WARN_STREAM << "PlainSession::handleReadHeader() " << error.message() << endl;
    }
}

//==============================================================================
//      PlainSession::handleResponse()
//      PlainSession::startReadBody()
//==============================================================================
void PlainSession::handleResponse(const boost::system::error_code& ec, std::size_t size)
void PlainSession::startReadBody(boost::uint32_t bodySize)
{
    DEBUG_STREAM << "PlainSession::handleResponse()" << endl;
    DEBUG_STREAM << "PlainSession::startReadBody()" << endl;

    if(!ec)
    {
        m_response.consume(size);
    m_readBuff.resize(HEADER_SIZE + bodySize);

    boost::asio::mutable_buffers_1 mutableBuffer =
        boost::asio::buffer(&m_readBuff[HEADER_SIZE], bodySize);

        boost::asio::async_read_until(m_plainSocket, m_request, "\r\n",
    boost::asio::async_read(m_plainSocket, mutableBuffer,
        m_strand.wrap(
            boost::bind(
                    &PlainSession::handleRequest, shared_from_this(),
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred)));
                &PlainSession::handleReadBody, shared_from_this(),
                boost::asio::placeholders::error)));
}
    else if(ec == boost::asio::error::eof)

//==============================================================================
//      PlainSession::handleReadBody()
//==============================================================================
void PlainSession::handleReadBody(const boost::system::error_code& error)
{
        INFO_STREAM << "PlainSession::handleResponse() disconnection from "
            << m_plainSocket.remote_endpoint() << endl;
    DEBUG_STREAM << "PlainSession::handleReadBody()" << endl;

    if(!error)
    {
        handleRequest();

        startReadHeader();
    }
    else
    {
        WARN_STREAM << "PlainSession::handleResponse() error " << ec.message()
            << " from " << m_plainSocket.remote_endpoint() << endl;
        WARN_STREAM << "PlainSession::handleReadHeader() " << error.message() << endl;
    }
}

//==============================================================================
//      PlainSession::handleRequest()
//==============================================================================
void PlainSession::handleRequest()
{
    DEBUG_STREAM << "PlainSession::handleRequest()" << endl;

    RequestSP request_sp(new Request);

    request_sp->ParseFromArray(&m_readBuff[HEADER_SIZE], m_readBuff.size() - HEADER_SIZE);

    ResponseSP response_sp = prepareResponse(request_sp);

    //@warning: byteSize return int not unsigned
    boost::uint32_t bodySize = response_sp->ByteSize();

    DEBUG_STREAM << "PlainSession::handleRequest() SIZE: " << bodySize << endl;

    std::vector<boost::uint8_t> writeBuff;
    writeBuff.resize(HEADER_SIZE + bodySize);

    encodeHeader(writeBuff, bodySize);

    response_sp->SerializeToArray(&writeBuff[HEADER_SIZE], bodySize);

    //TODO: srand!!!! FIXME
    boost::asio::write(m_plainSocket, boost::asio::buffer(writeBuff));
}

}   //namespace
 No newline at end of file
+8 −6
Original line number Diff line number Diff line
@@ -44,19 +44,21 @@ protected:
//------------------------------------------------------------------------------
//  [Protected] Utilities methods
//------------------------------------------------------------------------------
    virtual void handleRequest(const boost::system::error_code&, std::size_t);
    virtual void startReadHeader();

    virtual void handleResponse(const boost::system::error_code&, std::size_t);
    virtual void handleReadHeader(const boost::system::error_code&);

    virtual void startReadBody(boost::uint32_t);

    virtual void handleReadBody(const boost::system::error_code&);

    virtual void handleRequest();

//------------------------------------------------------------------------------
//	[Protected] Class variables
//------------------------------------------------------------------------------
    //TCP socket object
    boost::asio::ip::tcp::socket m_plainSocket;

    boost::asio::streambuf m_request;

    boost::asio::streambuf m_response;
};

}   //End of namespace
+30 −36
Original line number Diff line number Diff line
@@ -81,49 +81,43 @@ void SSLSession::handleHandShake(const boost::system::error_code& ec)
}

//==============================================================================
//      SSLSession::handleRequest()
//      SSLSession::startReadHeader()
//==============================================================================
void SSLSession::handleRequest(const boost::system::error_code& ec, std::size_t size)
{
    DEBUG_STREAM << "SSLSession::handleRequest()" << endl;

    if(!ec)
    {

    }
    else if(ec == boost::asio::error::eof)
void SSLSession::startReadHeader()
{
        INFO_STREAM << "SSLSession::handleRequest() disconnection from "
            << m_sslSocket.lowest_layer().remote_endpoint() << endl;
    }
    else
    {
        WARN_STREAM << "SSLSession::handleRequest() error " << ec.message()
            << " from " << m_sslSocket.lowest_layer().remote_endpoint() << endl;
    }
        DEBUG_STREAM << "SSLSession::startReadHeader()" << endl;
}

//==============================================================================
//      SSLSession::handleResponse()
//      SSLSession::handleReadHeader()
//==============================================================================
void SSLSession::handleResponse(const boost::system::error_code& ec, std::size_t size)
void SSLSession::handleReadHeader(const boost::system::error_code&)
{
    DEBUG_STREAM << "SSLSession::handleResponse()" << endl;
    DEBUG_STREAM << "SSLSession::handleReadHeader()" << endl;
}

    if(!ec)
//==============================================================================
//      SSLSession::startReadBody()
//==============================================================================
void SSLSession::startReadBody(boost::uint32_t bodySize)
{

    DEBUG_STREAM << "SSLSession::startReadBody()" << endl;
}
    else if(ec == boost::asio::error::eof)

//==============================================================================
//      SSLSession::handleReadBody()
//==============================================================================
void SSLSession::handleReadBody(const boost::system::error_code&)
{
        INFO_STREAM << "SSLSession::handleResponse() disconnection from "
            << m_sslSocket.lowest_layer().remote_endpoint() << endl;
    DEBUG_STREAM << "SSLSession::handleReadBody()" << endl;
}
    else

//==============================================================================
//      SSLSession::handleRequest()
//==============================================================================
void SSLSession::handleRequest()
{
        WARN_STREAM << "SSLSession::handleResponse() error " << ec.message()
            << " from " << m_sslSocket.lowest_layer().remote_endpoint() << endl;
    }
    DEBUG_STREAM << "SSLSession::handleRequest()" << endl;
}

}   //namespace
+8 −2
Original line number Diff line number Diff line
@@ -49,9 +49,15 @@ protected:
//------------------------------------------------------------------------------
    virtual void handleHandShake(const boost::system::error_code& ec);

    virtual void handleRequest(const boost::system::error_code&, std::size_t);
    virtual void startReadHeader();

    virtual void handleResponse(const boost::system::error_code&, std::size_t);
    virtual void handleReadHeader(const boost::system::error_code&);

    virtual void startReadBody(boost::uint32_t);

    virtual void handleReadBody(const boost::system::error_code&);

    virtual void handleRequest();

//------------------------------------------------------------------------------
//	[Protected] Class variables
+58 −3
Original line number Diff line number Diff line
@@ -25,25 +25,80 @@ Session::~Session()
//      Session::prepareResponse()
//==============================================================================
Session::ResponseSP Session::prepareResponse(Session::RequestSP request)
    throw(std::runtime_error)
{
    DEBUG_STREAM << "Client::prepareResponse()" << endl;
    DEBUG_STREAM << "Session::prepareResponse()" << endl;

    ResponseSP response_sp(new Response());

    switch(request->type())
    {
        case Request::AUTHORIZATION:
        {
            INFO_STREAM << "Session::prepareResponse() AUTHORIZATION REQUEST" << endl;

            const Request::Authorization& auth_req = request->authorization();

            INFO_STREAM << "USERNAME " << auth_req.username() << endl;
            INFO_STREAM << "PASSWORD " << auth_req.password() << endl;

            response_sp->set_type(Response::AUTHORIZATION);

            Response::Authorization* auth_resp = response_sp->mutable_authorization();
            auth_resp->set_state(Response::Authorization::ACCEPTED);
            auth_resp->set_status("Authorization accepted");

            break;
        }

        case Request::METADATA:
        {
            INFO_STREAM << "Session::prepareResponse() METADATA REQUEST" << endl;
            response_sp->set_type(Response::METADATA);
            break;
        }

        default:
            ERROR_STREAM << "BAD REQUEST " << endl; //TODO: bad request case
            break;
            throw std::runtime_error("Unknown request type!");
    }

    return response_sp;
}

//==============================================================================
//      Session::prepareResponse()
//==============================================================================
void Session::encodeHeader(std::vector<boost::uint8_t>& buf, boost::uint32_t size)
    throw(std::runtime_error)
{
    DEBUG_STREAM << "Session::encodeHeader()" << endl;

    if(buf.size() < HEADER_SIZE)
        throw std::runtime_error("Buffer to small to contain header!");

    buf[0] = static_cast<boost::uint8_t>((size >> 24) & 0xFF);
    buf[1] = static_cast<boost::uint8_t>((size >> 16) & 0xFF);
    buf[2] = static_cast<boost::uint8_t>((size >> 8) & 0xFF);
    buf[3] = static_cast<boost::uint8_t>(size & 0xFF);
}

//==============================================================================
//      Session::prepareResponse()
//==============================================================================
boost::uint32_t Session::decodeHeader(std::vector<boost::uint8_t>& buf)
    throw(std::runtime_error)
{
    DEBUG_STREAM << "Session::decodeHeader()" << endl;

    if(buf.size() < HEADER_SIZE)
        throw std::runtime_error("Buffer to small to contain header!");

    boost::uint32_t size = 0;

    for (unsigned i = 0; i < HEADER_SIZE; ++i)
        size = size * 256 + (static_cast<unsigned>(buf[i]) & 0xFF);

    return size;
}

}   //namespace
Loading