Commit faee3577 authored by Marco De Marco's avatar Marco De Marco
Browse files

Client and database manager classes added

parent 04b932ff
Loading
Loading
Loading
Loading

src/Client.cpp

0 → 100644
+381 −0
Original line number Original line Diff line number Diff line
#include <Client.h>

#include <boost/lexical_cast.hpp>
#include <boost/bind.hpp>

namespace DataImporter_ns
{

//==============================================================================
//      Client::Client()
//==============================================================================
Client::Client(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_sp) :
    Tango::LogAdapter(deviceImpl_p), m_deviceImpl_p(deviceImpl_p),
    m_configuration_sp(configuration_sp),  m_resolver(m_ioService),
    m_resetConnectionTimer(m_ioService), m_requestResponseTimer(m_ioService)
{
    DEBUG_STREAM << "Client::Client()" << endl;

    //GOOGLE_PROTOBUF_VERIFY_VERSION;

    m_dBManager_sp = DBManager::create(deviceImpl_p, configuration_sp);

    m_state = Tango::OFF;
    m_status="Disconnected";
}

//==============================================================================
//      Client::~Client()
//==============================================================================
Client::~Client()
{
    DEBUG_STREAM << "Client::~Client()" << endl;

    m_ioService.stop();

    m_work_sp.reset();

    if(m_thread_sp)
    {
        m_thread_sp->interrupt();

        m_thread_sp->join();
    }

    //google::protobuf::ShutdownProtobufLibrary();
}

//==============================================================================
//      Client::start()
//==============================================================================
void Client::start()
{
    DEBUG_STREAM << "Client::start()" << endl;

    m_dBManager_sp->connect();

//    m_protocolManager_sp = ProtocolManager::create(m_deviceImpl_p,
//        m_configuration_sp, m_dBManager_sp);

    m_ioService.reset();

    m_work_sp.reset(new boost::asio::io_service::work(m_ioService));

    m_thread_sp.reset(new boost::thread(boost::bind(&Client::run, this)));
}

//==============================================================================
//      Client::stop()
//==============================================================================
void Client::stop()
{
    DEBUG_STREAM << "Client::stop()" << endl;

    m_ioService.stop();

    m_work_sp.reset();

    if(m_thread_sp)
    {
        m_thread_sp->interrupt();

        m_thread_sp->join();
    }

    m_thread_sp.reset();

    //m_protocolManager_sp.reset();

    m_dBManager_sp->disconnect();
}

//==============================================================================
//      Client::readState()
//==============================================================================
Tango::DevState Client::readState()
{
    DEBUG_STREAM << "Client::readState()" << endl;

    boost::mutex::scoped_lock stateLock(m_stateMutex);

    return m_state;
}

//==============================================================================
//      Client::readStatus()
//==============================================================================
std::string Client::readStatus()
{
    DEBUG_STREAM << "Client::readStatus()" << endl;

    boost::mutex::scoped_lock statusLock(m_statusMutex);

    return m_status;
}

//==============================================================================
//      Client::writeState()
//==============================================================================
void Client::writeState(Tango::DevState state)
{
    DEBUG_STREAM << "Client::writeState()" << endl;

    boost::mutex::scoped_lock stateLock(m_stateMutex);

    m_state = state;
}

//==============================================================================
//      Client::writeStatus()
//==============================================================================
void Client::writeStatus(std::string status)
{
    DEBUG_STREAM << "Client::writeStatus()" << endl;

    boost::mutex::scoped_lock statusLock(m_statusMutex);

    m_status = status;
}

//==============================================================================
//      Client::run()
//==============================================================================
void Client::run()
{
    DEBUG_STREAM << "Client::run() Starting" << endl;

    while(true)
    {
        try
        {
            boost::system::error_code ec;
            m_ioService.run(ec);

            if(ec)
            {
                ERROR_STREAM << "Client::run() " << ec.message() << endl;
            }
            break;
        }
        catch(std::exception& ex)
        {
            ERROR_STREAM << "Client::run() " << ex.what() << endl;
        }
        catch(boost::thread_interrupted& ex)
        {
            DEBUG_STREAM << "Client::run() interrupt" << endl;
            break;
        }
    }

    DEBUG_STREAM << "Client::run() Stopping" << endl;
}

//==============================================================================
//      Client::startResolve()
//==============================================================================
void Client::startResolve()
{
    DEBUG_STREAM << "Client::startResolve()" << endl;

    std::stringstream infoStream;
    infoStream << "Resolving host: " << m_configuration_sp->getRemoteHost()
        << " port: " << m_configuration_sp->getRemotePort();

    INFO_STREAM << "Client::startResolve() " << infoStream.str() << endl;

    writeState(Tango::ON);
    writeStatus(infoStream.str());

    boost::asio::ip::tcp::resolver::query query(m_configuration_sp->getRemoteHost(),
        boost::lexical_cast<std::string>(m_configuration_sp->getRemotePort()));

    m_resetConnectionTimer.expires_from_now(
        boost::posix_time::seconds(m_configuration_sp->getTimeout()));

    m_resolver.async_resolve(query, boost::bind(&Client::handleResolve, this,
        boost::asio::placeholders::error, boost::asio::placeholders::iterator));

    m_resetConnectionTimer.async_wait(boost::bind(&Client::resetConnection, this));
}

//==============================================================================
//      Client::handleResolve()
//==============================================================================
void Client::handleResolve(const boost::system::error_code& errorCode,
    boost::asio::ip::tcp::resolver::iterator endPointIterator)
{
    DEBUG_STREAM << "Client::handleResolve()" << endl;

    if(!errorCode)
    {
        startConnect(endPointIterator);
    }
    else
    {
        ERROR_STREAM << "Client::handleResolve() " << errorCode.message() << endl;

        writeState(Tango::FAULT);
        writeStatus(errorCode.message());
    }
}

////==============================================================================
////      Client::handleRequest()
////==============================================================================
//void Client::handleWriteRequest(const boost::system::error_code& errorCode)
//{
//    DEBUG_STREAM << "Client::handleRequest()" << endl;
//
//    if(!errorCode)
//    {
//        startReadResponseHeader();
//    }
//    else
//    {
//        ERROR_STREAM << "Client::handleRequest() " << errorCode.message() << endl;
//
//        writeState(Tango::FAULT);
//        writeStatus(errorCode.message());
//    }
//}
//
////==============================================================================
////      Client::handleReadResponseHeader()
////==============================================================================
//void Client::handleReadResponseHeader(const boost::system::error_code& errorCode)
//{
//    DEBUG_STREAM << "Client::handleReadResponseHeader()" << endl;
//
//    if(!errorCode)
//    {
//        boost::uint32_t bodySize = decodeHeader(m_readBuff);
//
//        startReadResponseBody(bodySize);
//    }
//    else
//    {
//        ERROR_STREAM << "Client::handleReadResponseHeader() " << errorCode.message() << endl;
//
//        writeState(Tango::FAULT);
//        writeStatus(errorCode.message());
//    }
//}
//
////==============================================================================
////      Client::handleReadResponseBody()
////==============================================================================
//void Client::handleReadResponseBody(const boost::system::error_code& errorCode)
//{
//    DEBUG_STREAM << "Client::handleReadResponseBody()" << endl;
//
//    if(!errorCode)
//    {
//        try
//        {
//            ResponseSP response_sp(new Response);
//
//            response_sp->ParseFromArray(&m_readBuff[HEADER_SIZE],
//                m_readBuff.size() - HEADER_SIZE);
//
//            m_protocolManager_sp->processResponse(response_sp);
//
//            if(m_protocolManager_sp->waitBeforeRequest())
//            {
//                m_requestResponseTimer.expires_from_now(
//                    boost::posix_time::seconds(m_configuration_sp->getRefreshTime()));
//
//                m_requestResponseTimer.async_wait(
//                    boost::bind(&Client::startWriteRequest, this));
//            }
//            else
//            {
//                startWriteRequest();
//            }
//        }
//        catch(std::exception& ec)
//        {
//            ERROR_STREAM << "Client::handleResponse() " << ec.what() << endl;
//
//            writeState(Tango::FAULT);
//            writeStatus(ec.what());
//        }
//        catch(...)
//        {
//            ERROR_STREAM << "Client::handleResponse() Unknown error" << endl;
//
//            writeState(Tango::FAULT);
//            writeStatus("Unknown error");
//        }
//    }
//    else
//    {
//        ERROR_STREAM << "Client::handleResponse() " << errorCode.message() << endl;
//
//        writeState(Tango::FAULT);
//        writeStatus(errorCode.message());
//    }
//}

//==============================================================================
//      Client::resetConnection()
//==============================================================================
void Client::resetConnection()
{
    DEBUG_STREAM << "Client::resetConnection()" << endl;

    if(m_resetConnectionTimer.expires_at() <=
            boost::asio::deadline_timer::traits_type::now())
    {
        ERROR_STREAM << "Client::resetConnection() Connection timeout" << endl;

        m_resetConnectionTimer.expires_at(boost::posix_time::pos_infin);
        m_requestResponseTimer.expires_at(boost::posix_time::pos_infin);

        //m_protocolManager_sp->resetProtocolStatus();

        closeConnection();

        startResolve();
    }

    m_resetConnectionTimer.async_wait(boost::bind(&Client::resetConnection, this));
}

//==============================================================================
//      Client::encodeHeader()
//==============================================================================
void Client::encodeHeader(std::vector<boost::uint8_t>& buf, boost::uint32_t size)
    throw(std::runtime_error)
{
    DEBUG_STREAM << "Client::encodeHeader()" << endl;

    if(buf.size() < HEADER_SIZE)
        throw std::runtime_error("Buffer to small to contain header!");

    buf[0] = static_cast<boost::uint8_t>((size >> 24) & 0xFF);
    buf[1] = static_cast<boost::uint8_t>((size >> 16) & 0xFF);
    buf[2] = static_cast<boost::uint8_t>((size >> 8) & 0xFF);
    buf[3] = static_cast<boost::uint8_t>(size & 0xFF);
}

//==============================================================================
//      Client::decodeHeader()
//==============================================================================
boost::uint32_t Client::decodeHeader(std::vector<boost::uint8_t>& buf)
    throw(std::runtime_error)
{
    DEBUG_STREAM << "Client::decodeHeader()" << endl;

    if(buf.size() < HEADER_SIZE)
        throw std::runtime_error("Buffer to small to contain header!");

    boost::uint32_t size = 0;

    for (unsigned i = 0; i < HEADER_SIZE; ++i)
        size = size * 256 + (static_cast<unsigned>(buf[i]) & 0xFF);

    return size;
}

}   //namespace
 No newline at end of file

src/Client.h

0 → 100644
+179 −0
Original line number Original line Diff line number Diff line
#ifndef CLIENT_H
#define	CLIENT_H

#include <Configuration.h>
#include <DBManager.h>
//#include <ProtocolManager.h>
//#include <Request.pb.h>
//#include <Response.pb.h>

#include <tango.h>

#include <boost/shared_ptr.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/cstdint.hpp>

//#include <Request.pb.h>
//#include <Response.pb.h>

namespace DataImporter_ns
{

class Client : public Tango::LogAdapter
{
public:
//------------------------------------------------------------------------------
//  [Public] Shared pointer typedef
//------------------------------------------------------------------------------
    typedef boost::shared_ptr<Client> SP;

protected:
//------------------------------------------------------------------------------
//  [Protected] Constructor destructor
//------------------------------------------------------------------------------
    Client(Tango::DeviceImpl*, Configuration::SP);

    virtual ~Client();

public:
//------------------------------------------------------------------------------
//  [Public] Thread management methods
//------------------------------------------------------------------------------
    virtual void start() = 0;

    virtual void stop() = 0;

//------------------------------------------------------------------------------
//  [Public] Read state and status methods
//------------------------------------------------------------------------------
    virtual Tango::DevState readState();

    virtual std::string readStatus();

protected:
//------------------------------------------------------------------------------
//  [Protected] Write state and status methods
//------------------------------------------------------------------------------
    virtual void writeState(Tango::DevState);

    virtual void writeStatus(std::string);

//------------------------------------------------------------------------------
//  [Protected] IO service run thread method
//------------------------------------------------------------------------------
    virtual void run();

//------------------------------------------------------------------------------
//  [Protected] Endpoint resolution methods
//------------------------------------------------------------------------------
    virtual void startResolve();

    virtual void handleResolve(const boost::system::error_code&,
        boost::asio::ip::tcp::resolver::iterator);

//------------------------------------------------------------------------------
//  [Protected] Connection initialization methods
//------------------------------------------------------------------------------
    virtual void startConnect(boost::asio::ip::tcp::resolver::iterator) = 0;

    virtual void handleConnect(const boost::system::error_code&,
        boost::asio::ip::tcp::resolver::iterator) = 0;

////------------------------------------------------------------------------------
////  [Protected] Write request methods
////------------------------------------------------------------------------------
//    virtual void startWriteRequest() = 0;
//
//    virtual void handleWriteRequest(const boost::system::error_code&);
//
////------------------------------------------------------------------------------
////  [Protected] Read response header methods
////------------------------------------------------------------------------------
//    virtual void startReadResponseHeader() = 0;
//
//    virtual void handleReadResponseHeader(const boost::system::error_code&);
//
////------------------------------------------------------------------------------
////  [Protected] Read response body methods
////------------------------------------------------------------------------------
//    virtual void startReadResponseBody(boost::uint32_t) = 0;
//
//    virtual void handleReadResponseBody(const boost::system::error_code&);

//------------------------------------------------------------------------------
//  [Protected] Connection reset and timeout handler methods
//------------------------------------------------------------------------------
    virtual void closeConnection() = 0;

    virtual void resetConnection();

//------------------------------------------------------------------------------
//  [Protected] Header encoding decoding methods
//------------------------------------------------------------------------------
    virtual void encodeHeader(std::vector<boost::uint8_t>&, boost::uint32_t)
        throw(std::runtime_error);

    virtual boost::uint32_t decodeHeader(std::vector<boost::uint8_t>&)
        throw(std::runtime_error);

//------------------------------------------------------------------------------
//  [Protected] Class variables
//------------------------------------------------------------------------------
    //Tango server class pointer
    Tango::DeviceImpl* m_deviceImpl_p;

    //Configuration shared pointer
    Configuration::SP m_configuration_sp;

    //Database manager shared pointer
    DBManager::SP m_dBManager_sp;

    //Protocol manager shared pointer
    //ProtocolManager::SP m_protocolManager_sp;

    //IO service instance
    boost::asio::io_service m_ioService;

    //Work IO service instance
    boost::scoped_ptr<boost::asio::io_service::work> m_work_sp;

    //DNS resolver instance
    boost::asio::ip::tcp::resolver m_resolver;

    //Thread for IO service run scoped pointer
    boost::scoped_ptr<boost::thread> m_thread_sp;

    //First connection timeout
    boost::asio::deadline_timer m_resetConnectionTimer;

    //Request response timeout
    boost::asio::deadline_timer m_requestResponseTimer;

    //Header size on binary stream
    const unsigned HEADER_SIZE = 4;

    //Buffer for binary data read from stream
    std::vector<boost::uint8_t> m_readBuff;

    //Tango state property mutex
    boost::mutex m_stateMutex;

    //Tango state property
    Tango::DevState m_state;

    //Tango status property mutex
    boost::mutex m_statusMutex;

    //Tango status property
    std::string m_status;

    //Address and port of remote endpoint
    std::string m_remoteEndpoint;
};

}   //End of namespace

#endif	/* CLIENT_H */

src/Configuration.h

0 → 100644
+136 −0
Original line number Original line Diff line number Diff line
#ifndef CONFIGURATION_H
#define	CONFIGURATION_H

#include <boost/shared_ptr.hpp>

namespace DataImporter_ns
{

class Configuration
{
public:
//------------------------------------------------------------------------------
//	[Public] Shared pointer typedef
//------------------------------------------------------------------------------
	typedef boost::shared_ptr<Configuration> SP;

private:
//------------------------------------------------------------------------------
//	[Private] Constructor destructor deleter
//------------------------------------------------------------------------------
	Configuration(std::string certificateFile, std::string storagePath,
        std::string remoteHost, unsigned int remotePort,
        std::string remoteUsername, std::string remotePassword,
        std::string databaseHost, unsigned int databasePort,
        std::string databaseUsername, std::string databasePassword,
        std::string databaseSchema, std::string databaseTable,
        unsigned int refreshTime, unsigned int timeout) :
        m_certificateFile (certificateFile), m_storagePath(storagePath),
        m_remoteHost(remoteHost), m_remotePort(remotePort),
        m_remoteUsername(remoteUsername), m_remotePassword(remotePassword),
        m_databaseHost(databaseHost), m_databasePort(databasePort),
        m_databaseUsername(databaseUsername), m_databasePassword(databasePassword),
        m_databaseSchema(databaseSchema), m_databaseTable(databaseTable),
        m_refreshTime(refreshTime), m_timeout(timeout) { };

	virtual ~Configuration() {}

	class Deleter;
	friend class Deleter;
	class Deleter
	{
	public:
		void operator()(Configuration* c) { delete c; }
	};

public:
//------------------------------------------------------------------------------
//	[Public] Create class method
//------------------------------------------------------------------------------
	static Configuration::SP create(std::string certificateFile,
        std::string storagePath, std::string remoteHost,
        unsigned int remotePort, std::string remoteUsername,
        std::string remotePassword, std::string databaseHost
,        unsigned int databasePort, std::string databaseUsername,
        std::string databasePassword, std::string databaseSchema,
        std::string databaseTable, unsigned int refreshTime,
        unsigned int timeout)
	{
		Configuration::SP c_sp(new Configuration(certificateFile, storagePath,
            remoteHost, remotePort, remoteUsername, remotePassword,
            databaseHost, databasePort, databaseUsername, databasePassword,
            databaseSchema, databaseTable, refreshTime, timeout),
            Configuration::Deleter());

		return c_sp;
	}

//------------------------------------------------------------------------------
//	[Public] Getter methods
//------------------------------------------------------------------------------
	std::string	getCertificateFile() const { return m_certificateFile; }
    std::string getStoragePath() const { return m_storagePath; }
	std::string	getRemoteHost() const { return m_remoteHost; }
	unsigned int getRemotePort() const { return m_remotePort; }
	std::string	getRemoteUsername() const { return m_remoteUsername; }
	std::string	getRemotePassword() const { return m_remotePassword; }
	std::string	getDatabaseHost() const { return m_databaseHost; }
	unsigned int getDatabasePort() const { return m_databasePort; }
	std::string	getDatabaseUsername() const { return m_databaseUsername; }
	std::string	getDatabasePassword() const { return m_databasePassword; }
	std::string	getDatabaseSchema() const { return m_databaseSchema; }
	std::string	getDatabaseTable() const { return m_databaseTable; }
	unsigned int getRefreshTime() const { return m_refreshTime; }
	unsigned int getTimeout() const { return m_timeout; }

private:
//------------------------------------------------------------------------------
//	[Private] class variables
//------------------------------------------------------------------------------
	//Absolute path to certificate chain file
	const std::string	m_certificateFile;

	//Absolute path to storage
	const std::string	m_storagePath;

    //Metadata exporter remote host
	const std::string	m_remoteHost;

    //Metadata exporter remote port
	const unsigned int m_remotePort;

    //Metadata exporter login username
	const std::string	m_remoteUsername;

    //Metadata exporter remote password
	const std::string	m_remotePassword;

    //Metadata local database host
	const std::string	m_databaseHost;

    //Metadata local database port
	const unsigned int m_databasePort;

    //Metadata local database username
	const std::string	m_databaseUsername;

    //Metadata local database password
	const std::string	m_databasePassword;

    //Metadata local database schema
	const std::string	m_databaseSchema;

    //Metadata local database table
	const std::string	m_databaseTable;

    //Remote database request period (seconds)
	const unsigned int m_refreshTime;

	//Connection timeout (seconds)
	const unsigned int m_timeout;
};

}   //End of namespace

#endif	/* CONFIGURATION_H */

src/DBManager.cpp

0 → 100644
+80 −0

File added.

Preview size limit exceeded, changes collapsed.

src/DBManager.h

0 → 100644
+86 −0

File added.

Preview size limit exceeded, changes collapsed.

Loading