Commit 56041802 authored by Marco De Marco's avatar Marco De Marco
Browse files

Request response methods added

parent a540a8da
Loading
Loading
Loading
Loading
+111 −101
Original line number Diff line number Diff line
@@ -16,7 +16,7 @@ Client::Client(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_
{
    DEBUG_STREAM << "Client::Client()" << endl;

    //GOOGLE_PROTOBUF_VERIFY_VERSION;
    GOOGLE_PROTOBUF_VERIFY_VERSION;

    m_dBManager_sp = DBManager::create(deviceImpl_p, configuration_sp);

@@ -42,7 +42,7 @@ Client::~Client()
        m_thread_sp->join();
    }

    //google::protobuf::ShutdownProtobufLibrary();
    google::protobuf::ShutdownProtobufLibrary();
}

//==============================================================================
@@ -54,8 +54,8 @@ void Client::start()

    m_dBManager_sp->connect();

//    m_protocolManager_sp = ProtocolManager::create(m_deviceImpl_p,
//        m_configuration_sp, m_dBManager_sp);
    m_protocolManager_sp = ProtocolManager::create(m_deviceImpl_p,
        m_configuration_sp, m_dBManager_sp);

    m_ioService.reset();

@@ -84,7 +84,7 @@ void Client::stop()

    m_thread_sp.reset();

    //m_protocolManager_sp.reset();
    m_protocolManager_sp.reset();

    m_dBManager_sp->disconnect();
}
@@ -220,102 +220,112 @@ void Client::handleResolve(const boost::system::error_code& errorCode,
    }
}

////==============================================================================
////      Client::handleRequest()
////==============================================================================
//void Client::handleWriteRequest(const boost::system::error_code& errorCode)
//{
//    DEBUG_STREAM << "Client::handleRequest()" << endl;
//
//    if(!errorCode)
//    {
//        startReadResponseHeader();
//    }
//    else
//    {
//        ERROR_STREAM << "Client::handleRequest() " << errorCode.message() << endl;
//
//        writeState(Tango::FAULT);
//        writeStatus(errorCode.message());
//    }
//}
//
////==============================================================================
////      Client::handleReadResponseHeader()
////==============================================================================
//void Client::handleReadResponseHeader(const boost::system::error_code& errorCode)
//{
//    DEBUG_STREAM << "Client::handleReadResponseHeader()" << endl;
//
//    if(!errorCode)
//    {
//        boost::uint32_t bodySize = decodeHeader(m_readBuff);
//
//        startReadResponseBody(bodySize);
//    }
//    else
//    {
//        ERROR_STREAM << "Client::handleReadResponseHeader() " << errorCode.message() << endl;
//
//        writeState(Tango::FAULT);
//        writeStatus(errorCode.message());
//    }
//}
//
////==============================================================================
////      Client::handleReadResponseBody()
////==============================================================================
//void Client::handleReadResponseBody(const boost::system::error_code& errorCode)
//{
//    DEBUG_STREAM << "Client::handleReadResponseBody()" << endl;
//
//    if(!errorCode)
//    {
//        try
//        {
//            ResponseSP response_sp(new Response);
//
//            response_sp->ParseFromArray(&m_readBuff[HEADER_SIZE],
//                m_readBuff.size() - HEADER_SIZE);
//
//==============================================================================
//      Client::handleRequest()
//==============================================================================
void Client::handleWriteRequest(const boost::system::error_code& errorCode)
{
    DEBUG_STREAM << "Client::handleRequest()" << endl;

    if(!errorCode)
    {
        startReadResponseHeader();
    }
    else
    {
        ERROR_STREAM << "Client::handleRequest() " << errorCode.message() << endl;

        writeState(Tango::FAULT);
        writeStatus(errorCode.message());
    }
}

//==============================================================================
//      Client::handleReadResponseHeader()
//==============================================================================
void Client::handleReadResponseHeader(const boost::system::error_code& errorCode)
{
    DEBUG_STREAM << "Client::handleReadResponseHeader()" << endl;

    if(!errorCode)
    {
        boost::uint32_t bodySize = decodeHeader(m_readBuff);

        startReadResponseBody(bodySize);
    }
    else
    {
        ERROR_STREAM << "Client::handleReadResponseHeader() " << errorCode.message() << endl;

        writeState(Tango::FAULT);
        writeStatus(errorCode.message());
    }
}

//==============================================================================
//      Client::handleReadResponseBody()
//==============================================================================
void Client::handleReadResponseBody(const boost::system::error_code& errorCode)
{
    DEBUG_STREAM << "Client::handleReadResponseBody()" << endl;

    if(!errorCode)
    {
        try
        {
            ResponseSP response_sp(new Response);

            response_sp->ParseFromArray(&m_readBuff[HEADER_SIZE],
                m_readBuff.size() - HEADER_SIZE);

            //m_protocolManager_sp->processResponse(response_sp);
//
//            if(m_protocolManager_sp->waitBeforeRequest())
//            {
//                m_requestResponseTimer.expires_from_now(
//                    boost::posix_time::seconds(m_configuration_sp->getRefreshTime()));
//
//                m_requestResponseTimer.async_wait(
//                    boost::bind(&Client::startWriteRequest, this));
//            }
//            else
//            {
//                startWriteRequest();
//            }
//        }
//        catch(std::exception& ec)
//        {
//            ERROR_STREAM << "Client::handleResponse() " << ec.what() << endl;
//
//            writeState(Tango::FAULT);
//            writeStatus(ec.what());
//        }
//        catch(...)
//        {
//            ERROR_STREAM << "Client::handleResponse() Unknown error" << endl;
//
//            writeState(Tango::FAULT);
//            writeStatus("Unknown error");
//        }
//    }
//    else
//    {
//        ERROR_STREAM << "Client::handleResponse() " << errorCode.message() << endl;
//
//        writeState(Tango::FAULT);
//        writeStatus(errorCode.message());
//    }
//}

            if(m_protocolManager_sp->waitBeforeRequest())
            {
                m_requestResponseTimer.expires_from_now(
                    boost::posix_time::seconds(m_configuration_sp->getRefreshTime()));

                m_requestResponseTimer.async_wait(
                    boost::bind(&Client::startWriteRequest, this));
            }
            else
            {
                startWriteRequest();
            }
        }
        catch(std::exception& ec)
        {
            ERROR_STREAM << "Client::handleResponse() " << ec.what() << endl;

            writeState(Tango::FAULT);
            writeStatus(ec.what());
        }
        catch(...)
        {
            ERROR_STREAM << "Client::handleResponse() Unknown error" << endl;

            writeState(Tango::FAULT);
            writeStatus("Unknown error");
        }
    }
    else
    {
        ERROR_STREAM << "Client::handleResponse() " << errorCode.message() << endl;

        writeState(Tango::FAULT);
        writeStatus(errorCode.message());
    }
}

//==============================================================================
//      Client::resetConnection()
//==============================================================================
void Client::handleReadData(const boost::system::error_code& errorCode)
{
    DEBUG_STREAM << "Client::handleReadData()" << endl;

    
}

//==============================================================================
//      Client::resetConnection()
+31 −27
Original line number Diff line number Diff line
@@ -3,9 +3,9 @@

#include <Configuration.h>
#include <DBManager.h>
//#include <ProtocolManager.h>
//#include <Request.pb.h>
//#include <Response.pb.h>
#include <ProtocolManager.h>
#include <Request.pb.h>
#include <Response.pb.h>

#include <tango.h>

@@ -15,9 +15,6 @@
#include <boost/scoped_ptr.hpp>
#include <boost/cstdint.hpp>

//#include <Request.pb.h>
//#include <Response.pb.h>

namespace DataImporter_ns
{

@@ -81,26 +78,33 @@ protected:
    virtual void handleConnect(const boost::system::error_code&,
        boost::asio::ip::tcp::resolver::iterator) = 0;

////------------------------------------------------------------------------------
////  [Protected] Write request methods
////------------------------------------------------------------------------------
//    virtual void startWriteRequest() = 0;
//
//    virtual void handleWriteRequest(const boost::system::error_code&);
//
////------------------------------------------------------------------------------
////  [Protected] Read response header methods
////------------------------------------------------------------------------------
//    virtual void startReadResponseHeader() = 0;
//
//    virtual void handleReadResponseHeader(const boost::system::error_code&);
//
////------------------------------------------------------------------------------
////  [Protected] Read response body methods
////------------------------------------------------------------------------------
//    virtual void startReadResponseBody(boost::uint32_t) = 0;
//
//    virtual void handleReadResponseBody(const boost::system::error_code&);
//------------------------------------------------------------------------------
//  [Protected] Write request methods
//------------------------------------------------------------------------------
    virtual void startWriteRequest() = 0;

    virtual void handleWriteRequest(const boost::system::error_code&);

//------------------------------------------------------------------------------
//  [Protected] Read response header methods
//------------------------------------------------------------------------------
    virtual void startReadResponseHeader() = 0;

    virtual void handleReadResponseHeader(const boost::system::error_code&);

//------------------------------------------------------------------------------
//  [Protected] Read response body methods
//------------------------------------------------------------------------------
    virtual void startReadResponseBody(boost::uint32_t) = 0;

    virtual void handleReadResponseBody(const boost::system::error_code&);

//------------------------------------------------------------------------------
//  [Protected] Read response body methods
//------------------------------------------------------------------------------
    virtual void startReadData() = 0;

    virtual void handleReadData(const boost::system::error_code&);

//------------------------------------------------------------------------------
//  [Protected] Connection reset and timeout handler methods
@@ -131,7 +135,7 @@ protected:
    DBManager::SP m_dBManager_sp;

    //Protocol manager shared pointer
    //ProtocolManager::SP m_protocolManager_sp;
    ProtocolManager::SP m_protocolManager_sp;

    //IO service instance
    boost::asio::io_service m_ioService;
+50 −13
Original line number Diff line number Diff line
@@ -19,6 +19,9 @@ private:
//	[Private] Constructor destructor deleter
//------------------------------------------------------------------------------
	Configuration(std::string certificateFile, std::string storagePath,
        std::string dIDBHost, unsigned int dIDBPort, std::string dIDBUser,
        std::string dIDBPassword, std::string dIDBSchema,
        std::string dIDBIndexTable, std::string dIDBRejectedTable,
        std::string remoteHost, unsigned int remotePort,
        std::string remoteUsername, std::string remotePassword,
        std::string databaseHost, unsigned int databasePort,
@@ -26,6 +29,9 @@ private:
        std::string databaseSchema, std::string databaseTable,
        unsigned int refreshTime, unsigned int timeout) :
        m_certificateFile (certificateFile), m_storagePath(storagePath),
        m_dIDBHost(dIDBHost), m_dIDBPort(dIDBPort), m_dIDBUser(dIDBUser),
        m_dIDBPassword(dIDBPassword), m_dIDBSchema(dIDBSchema),
        m_dIDBIndexTable(dIDBIndexTable), m_dIDBRejectedTable(dIDBRejectedTable),
        m_remoteHost(remoteHost), m_remotePort(remotePort),
        m_remoteUsername(remoteUsername), m_remotePassword(remotePassword),
        m_databaseHost(databaseHost), m_databasePort(databasePort),
@@ -48,19 +54,22 @@ public:
//	[Public] Create class method
//------------------------------------------------------------------------------
	static Configuration::SP create(std::string certificateFile,
        std::string storagePath, std::string remoteHost,
        unsigned int remotePort, std::string remoteUsername,
        std::string remotePassword, std::string databaseHost
,        unsigned int databasePort, std::string databaseUsername,
        std::string databasePassword, std::string databaseSchema,
        std::string databaseTable, unsigned int refreshTime,
        unsigned int timeout)
        std::string storagePath, std::string dIDBHost, unsigned int dIDBPort,
        std::string dIDBUser, std::string dIDBPassword, std::string dIDBSchema,
        std::string dIDBIndexTable, std::string dIDBRejectedTable,
        std::string remoteHost, unsigned int remotePort,
        std::string remoteUsername, std::string remotePassword,
        std::string databaseHost, unsigned int databasePort,
        std::string databaseUsername, std::string databasePassword,
        std::string databaseSchema, std::string databaseTable,
        unsigned int refreshTime, unsigned int timeout)
	{
		Configuration::SP c_sp(new Configuration(certificateFile, storagePath,
            remoteHost, remotePort, remoteUsername, remotePassword,
            databaseHost, databasePort, databaseUsername, databasePassword,
            databaseSchema, databaseTable, refreshTime, timeout),
            Configuration::Deleter());
            dIDBHost, dIDBPort, dIDBUser, dIDBPassword, dIDBSchema,
            dIDBIndexTable, dIDBRejectedTable, remoteHost, remotePort,
            remoteUsername, remotePassword, databaseHost, databasePort,
            databaseUsername, databasePassword, databaseSchema, databaseTable,
            refreshTime, timeout), Configuration::Deleter());

		return c_sp;
	}
@@ -70,6 +79,13 @@ public:
//------------------------------------------------------------------------------
	std::string	getCertificateFile() const { return m_certificateFile; }
    std::string getStoragePath() const { return m_storagePath; }
	std::string getDIDBHost() const { return m_dIDBHost; }
	unsigned int getDIDBPort() const { return m_dIDBPort; }
	std::string getDIDBUser() const { return m_dIDBUser; }
	std::string getDIDBPassword() const { return m_dIDBPassword; }
	std::string getDIDBSchema() const { return m_dIDBSchema; }
	std::string getDIDBIndexTable() const { return m_dIDBIndexTable; }
	std::string getDIDBRejectedTable() const { return m_dIDBRejectedTable; }
	std::string	getRemoteHost() const { return m_remoteHost; }
	unsigned int getRemotePort() const { return m_remotePort; }
	std::string	getRemoteUsername() const { return m_remoteUsername; }
@@ -93,6 +109,27 @@ private:
	//Absolute path to storage
	const std::string m_storagePath;

	//Host where data import database is running
	const std::string m_dIDBHost;

	//Port where data import database is listening
	const unsigned int m_dIDBPort;

	//User to login in data import database
	const std::string m_dIDBUser;

	//Password to login in data import database
	const std::string m_dIDBPassword;

	//Schema where data import tables are located
	const std::string m_dIDBSchema;

	//Index table name
	const std::string m_dIDBIndexTable;

	//Rejected table name
	const std::string m_dIDBRejectedTable;

    //Metadata exporter remote host
	const std::string	m_remoteHost;

+27 −6
Original line number Diff line number Diff line
@@ -490,10 +490,31 @@ void DataImporter::get_device_property()

        checkIfDirectoryExists(storagePath);

        if(dIDBHost.empty())
            throw(invalid_argument("DIDBHost property is empty or not defined"));

        if(dIDBPort<1 || dIDBPort>MAX_PORT_NUMBER)
            throw(invalid_argument("DIDBPort property out of range or not defined"));

        if(dIDBUser.empty())
            throw(invalid_argument("DIDBUser property is empty or not defined"));

        if(dIDBPassword.empty())
            throw(invalid_argument("DIDBPassword property is empty or not defined"));

        if(dIDBSchema.empty())
            throw(invalid_argument("DIDBSchema property is empty or not defined"));

        if(dIDBIndexTable.empty())
            throw(invalid_argument("DIDBIndexTable property is empty or not defined"));

        if(dIDBRejectedTable.empty())
            throw(invalid_argument("DIDBRejectedTable property is empty or not defined"));

        if(remoteHost.empty())
            throw(invalid_argument("RemoteHost property is empty or not defined"));

        if(remotePort<1 || remotePort>MAX_REMOTE_PORT)
        if(remotePort<1 || remotePort>MAX_PORT_NUMBER)
            throw(invalid_argument("RemotePort property out of range or not defined"));

        if(remoteUsername.empty())
@@ -505,7 +526,7 @@ void DataImporter::get_device_property()
        if(databaseHost.empty())
                throw(invalid_argument("DatabaseHost property is empty or not defined"));

        if(databasePort<1 || databasePort>MAX_DATABASE_PORT)
        if(databasePort<1 || databasePort>MAX_PORT_NUMBER)
            throw(invalid_argument("DatabasePort property out of range or not defined"));

        if(databaseUsername.empty())
@@ -527,9 +548,10 @@ void DataImporter::get_device_property()
            throw(invalid_argument("Timeout property out of range or not defined"));

        m_configuration_sp = Configuration::create(certificateFile, storagePath,
            remoteHost, remotePort, remoteUsername, remotePassword, databaseHost,
            databasePort, databaseUsername, databasePassword, databaseSchema,
            databaseTable, refreshTime, timeout);
            dIDBHost, dIDBPort, dIDBUser, dIDBPassword, dIDBSchema, dIDBIndexTable,
            dIDBRejectedTable, remoteHost, remotePort, remoteUsername, remotePassword,
            databaseHost, databasePort, databaseUsername, databasePassword,
            databaseSchema, databaseTable, refreshTime, timeout);
    }
    catch(invalid_argument& ex)
    {
@@ -715,6 +737,5 @@ void DataImporter::checkIfDirectoryExists(std::string directoryName)
    INFO_STREAM << "DataImporter::checkIfDirectoryExists() " << directoryName << endl;
}


/*----- PROTECTED REGION END -----*/	//	DataImporter::namespace_ending
} //	namespace
+3 −6
Original line number Diff line number Diff line
@@ -73,11 +73,8 @@ class DataImporter : public TANGO_BASE_CLASS
    //Client class shared pointer
    Client::SP m_client_sp;

    //Max port number allowed value for remote connection
    static const unsigned int MAX_REMOTE_PORT = 65535;

    //Max port number allowed value for local database
    static const unsigned int MAX_DATABASE_PORT = 65535;
    //Max port number allowed value for data import database
    static const unsigned int MAX_PORT_NUMBER = 65535;

    //Max time between remote server requests
    static const unsigned int MAX_REFRESH_TIME = 3600;
@@ -94,7 +91,7 @@ public:
	string	certificateFile;
	//	StoragePath:	Absolute path to storage
	string	storagePath;
	//	DIDBHost:	Hostname where data import database is running
	//	DIDBHost:	Host where data import database is running
	string	dIDBHost;
	//	DIDBPort:	Port where data import database is listening
	Tango::DevULong	dIDBPort;
Loading