Commit d6b2dbfe authored by Marco De Marco's avatar Marco De Marco
Browse files

Porting complete, compile, before runtime test

parent e5f13819
Loading
Loading
Loading
Loading
+4 −3
Original line number Diff line number Diff line
@@ -18,9 +18,10 @@ LIB_DIR=/usr/local/omniORB-4.1.7/lib \
CC=g++
CXX_DEBUG_FLAGS=-g -DVERBOSE_DEBUG
CXX_RELEASE_FLAGS=-O3
CXX_DEFAULT_FLAGS=-c -Wall -Wextra -Werror -std=c++11 -std=gnu++11
LDFLAGS=-Wall -lomniORB4 -lomniDynamic4 -lCOS4 -lomnithread \
	  -ltango -llog4tango -lsoci_core -lsoci_mysql
CXX_DEFAULT_FLAGS=-c -Wall -Wextra -std=c++11 -std=gnu++11
LDFLAGS=-Wall -lomniORB4 -lomniDynamic4 -lCOS4 -lomnithread -ltango -llog4tango \
	-lsoci_core -lsoci_mysql -lCCfits -lboost_thread -lboost_filesystem \
	-lboost_system -lboost_iostreams -lboost_date_time -lboost_regex
INC_PARM=$(foreach d, $(INC_DIR), -I$d)
LIB_PARM=$(foreach d, $(LIB_DIR), -L$d)
#================================================================================
+33 −26
Original line number Diff line number Diff line
@@ -37,24 +37,27 @@ EventBuffer::SP EventBuffer::create(Tango::DeviceImpl* deviceImpl_p)
//	EventBuffer::insertNew()
//==============================================================================
void EventBuffer::insertNew(boost::filesystem::path path)
	throw(std::overflow_error, std::invalid_argument)
{
    DEBUG_STREAM << "EventBuffer::insertNew()" << endl;

	boost::mutex::scoped_lock lock(m_mutex);

	if(m_buffer.size()+1 > m_buffer.max_size())
		throw std::overflow_error("EventBuffer reach its max size");
    bool inserted = m_buffer.insert(
        std::pair<boost::filesystem::path, bool>(path, false) ).second;
    
	if(!m_buffer.insert(std::pair<boost::filesystem::path, bool>(path, false)).second)
		throw std::invalid_argument("EventBuffer duplicate element " + path.string());
    
    DEBUG_STREAM << "EventBuffer::insertNew(): " << path.string() << " inserted" << endl;
	if(inserted)
    {
        DEBUG_STREAM << "EventBuffer::insertNew() element "
            << path.string() << " inserted" << endl;

        lock.unlock();

        m_conditionVariable.notify_all();           
    }
    else
        WARN_STREAM << "EventBuffer::insertNew() element "
            << path.string() << " duplicated" << endl;
}

//==============================================================================
//	EventBuffer::waitNew()
@@ -91,7 +94,6 @@ boost::filesystem::path EventBuffer::waitNew()
//	EventBuffer::removeProcessed()
//==============================================================================
void EventBuffer::removeProcessed(boost::filesystem::path path)
	throw(std::invalid_argument)
{
    DEBUG_STREAM << "EventBuffer::removeProcessed()" << endl;

@@ -101,16 +103,21 @@ void EventBuffer::removeProcessed(boost::filesystem::path path)

	it = m_buffer.find(path);

	if(it == m_buffer.end())
		throw std::invalid_argument("Element not found into EventBuffer");

	if(it->second == false)
		throw std::invalid_argument("Deleting not processed into EventBuffer");

	if(it != m_buffer.end())
    {
        if(it->second != false)
        {
            m_buffer.erase(it);

    DEBUG_STREAM << "EventBuffer::removeProcessed(): "
            << it->first.string() << " waiting new element" << endl;    
            DEBUG_STREAM << "EventBuffer::removeProcessed() element "
                << path.string() << " removed" << endl;
        }
            ERROR_STREAM << "EventBuffer::removeProcessed() element" 
                << path.string() << " to remove not processed" << endl;    
    }
    else
        ERROR_STREAM << "EventBuffer::removeProcessed() element" 
            << path.string() << " not found" << endl;
}

//==============================================================================
+2 −4
Original line number Diff line number Diff line
@@ -46,13 +46,11 @@ public:
//------------------------------------------------------------------------------
	static EventBuffer::SP create(Tango::DeviceImpl*);

	virtual void insertNew(boost::filesystem::path) 
		throw(std::overflow_error, std::invalid_argument);
	virtual void insertNew(boost::filesystem::path);

	virtual boost::filesystem::path waitNew();

	virtual void removeProcessed(boost::filesystem::path) 
		throw(std::invalid_argument);
	virtual void removeProcessed(boost::filesystem::path);

	virtual std::size_t size(); 

src/EventThread.cpp

0 → 100644
+317 −0
Original line number Diff line number Diff line
#include <EventThread.h>
#include <FitsImporter.h>
#include <WorkerThread.h>

#include <cassert>
#include <unistd.h>
#include <fcntl.h>
#include <cerrno>

#include <boost/filesystem.hpp>
#include <boost/scoped_ptr.hpp>

namespace FitsImporter_ns
{

//==============================================================================
//	EventThread::EventThread()
//==============================================================================
EventThread::EventThread(FitsImporter* fitsImporter_p, 
    Configuration::SP configuration_sp, Instrument::SPVector instrumentList_spvector,
    Instrument::SP defaultInstrument_sp) : Tango::LogAdapter(fitsImporter_p),
    m_fitsImporter_p(fitsImporter_p), m_configuration_sp(configuration_sp),
    m_instrumentList_spvector(instrumentList_spvector), 
    m_defaultInstrument_sp(defaultInstrument_sp)
{
	DEBUG_STREAM << "EventThread::EventThread()" << endl;
    
    m_state = Tango::OFF;
    m_status = "Event thread not running";    
}

//==============================================================================
//	EventThread::~EventThread()
//==============================================================================
EventThread::~EventThread()
{
	DEBUG_STREAM << "EventThread::~EventThread()" << endl;

	if(m_threadGroup_sp)
	{
		m_threadGroup_sp->interrupt_all();
	
		m_threadGroup_sp->join_all();
	}
}

//==============================================================================
//	EventThread::create()
//==============================================================================
EventThread::SP EventThread::create(FitsImporter* fitsImporter_p, 
    Configuration::SP configuration_sp, Instrument::SPVector instrumentList_spvector,
    Instrument::SP defaultInstrument_sp) 
{
	EventThread::SP e_sp(new EventThread(fitsImporter_p, configuration_sp,
        instrumentList_spvector, defaultInstrument_sp), EventThread::Deleter());

	return e_sp;
}

//==============================================================================
//	EventThread::start()
//==============================================================================
void EventThread::start() throw(std::runtime_error)	
{
	DEBUG_STREAM << "EventThread::start()" << endl;

    initEventBuffer();

    initINotify();

    initConnectionManager();

    initThreadGroup();
}

//==============================================================================
//	EventThread::stop()
//==============================================================================
void EventThread::stop()
{
	DEBUG_STREAM << "EventThread::stop()" << endl;

	if(m_threadGroup_sp)
	{
		m_threadGroup_sp->interrupt_all();
	
		m_threadGroup_sp->join_all();
	}

	inotify_rm_watch(fd, wd);

	if(fd) { close(fd); }
}

//==============================================================================
//	EventThread::readState()
//==============================================================================
Tango::DevState EventThread::readState()
{
    DEBUG_STREAM << "EventThread::readState()" << endl;
    
    boost::mutex::scoped_lock stateLock(m_stateMutex);
    
    return m_state;
}
    
//==============================================================================
//	EventThread::readStatus()
//==============================================================================
std::string EventThread::readStatus()
{
    DEBUG_STREAM << "EventThread::readStatus()" << endl;
    
    boost::mutex::scoped_lock statusLock(m_statusMutex);
    
    return m_status;
}

//==============================================================================
//	EventThread::initEventBuffer()
//==============================================================================
void EventThread::initEventBuffer() throw(std::runtime_error)	
{
    DEBUG_STREAM << "EventThread::initEventBuffer()" << endl;

    m_eventBuffer_sp = EventBuffer::create(m_fitsImporter_p);

    std::string watchPath(m_configuration_sp->getWatchPath());

    boost::filesystem::path path(watchPath);

    //Check if watch path exists
    if(!boost::filesystem::exists(path))
        throw std::runtime_error("Watch path \"" +
            watchPath + "\" does not exist");

    //And if it's a directory
    if(!boost::filesystem::is_directory(path))
    throw std::runtime_error("Watch path \"" +
    watchPath + "\" is not a valid directory");

    boost::filesystem::directory_iterator startIt(path);
    boost::filesystem::directory_iterator endIt;

    while(startIt != endIt)
    {
        if(boost::filesystem::is_regular_file(startIt->status()))
            m_eventBuffer_sp->insertNew(startIt->path());
        
        startIt++;
    }
}

//==============================================================================
//	EventThread::initINotify()
//==============================================================================
void EventThread::initINotify() throw(std::runtime_error)
{	
    DEBUG_STREAM << "EventThread::initINotify()" << endl;

    if((fd = inotify_init ()) < 0)
        throw std::runtime_error("INotify initialization");

	std::string watchPath(m_configuration_sp->getWatchPath());

	uint32_t iNotifyMask = m_configuration_sp->getINotifyMask();

    if((wd = inotify_add_watch(fd, watchPath.c_str(), iNotifyMask)) < 0)
        throw std::runtime_error("INotify add watch");

    int flags;
    if((flags = fcntl(fd,F_GETFL,0)) < 0)
        throw std::runtime_error("File descriptor get flags");

    if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
        throw std::runtime_error("File descriptor set flags");
}

//==============================================================================
//	EventThread::initConnectionManager()
//==============================================================================
void EventThread::initConnectionManager() throw(std::runtime_error)
{
    DEBUG_STREAM << "EventThread::initConnectionManager()" << endl;

	m_connectionManager_sp = ConnectionManager::create(m_fitsImporter_p,
		 m_configuration_sp, m_instrumentList_spvector);
}

//==============================================================================
//	EventThread::initThreadGroup()
//==============================================================================
void EventThread::initThreadGroup() throw(std::runtime_error)
{
    DEBUG_STREAM << "EventThread::initThreadGroup()" << endl;

	m_threadGroup_sp.reset(new boost::thread_group);

	int workerNumber = m_configuration_sp->getWorkerNumber();

	WorkerThread worker(m_fitsImporter_p, m_eventBuffer_sp, m_connectionManager_sp,
        m_configuration_sp, m_instrumentList_spvector, m_defaultInstrument_sp);        
	
	try
	{
		//Add to thread group event thread
		m_threadGroup_sp->add_thread(new boost::thread(&EventThread::eventLoop, this));			

		//Add to thread group worker threads	
		for(int i=0; i<workerNumber; i++)
			m_threadGroup_sp->add_thread(new boost::thread(&WorkerThread::workerLoop, worker));
	}
	catch(boost::thread_resource_error& ex)
	{
		std::stringstream error_stream;
		error_stream << "InitThreadGroup: " << ex.what();
		throw std::runtime_error(error_stream.str());
	}	
}

//==============================================================================
//	EventThread::eventLoop()
//==============================================================================
void EventThread::eventLoop()
{
	DEBUG_STREAM << "EventThread::eventLoop() starting loop" << endl;
       
	int sleepTime = m_configuration_sp->getSleepTime();
	boost::filesystem::path watchPath(m_configuration_sp->getWatchPath());
    
	while(true)
	{
		try
		{
			char buffer[BUF_LEN];
			int length;

			if((length = read( fd, buffer, BUF_LEN )) < 0)
			{
				if(errno != EINTR && errno != EAGAIN)
				{
                    boost::mutex::scoped_lock stateLock(m_stateMutex);   
                    m_state = Tango::ALARM;

                    boost::mutex::scoped_lock statusLock(m_statusMutex);    
                    m_status = "Event thread error on watch path read";
				}
				else
                {
                    boost::mutex::scoped_lock stateLock(m_stateMutex);   
                    m_state = Tango::ON;

                    boost::mutex::scoped_lock statusLock(m_statusMutex);    
                    m_status = "Event thread running";                
                }
			}
			else
            {
                boost::mutex::scoped_lock stateLock(m_stateMutex);   
                m_state = Tango::ON;

                boost::mutex::scoped_lock statusLock(m_statusMutex);    
                m_status = "Event thread new data found";                
                
                struct inotify_event *event;
                for(int i=0; i<length; i += EVENT_SIZE + event->len)
                {
                    event = ( struct inotify_event * ) &buffer[ i ];

                    boost::filesystem::path file(event->name);
                    boost::filesystem::path path(watchPath);
                    path /= file;

                    //Check if event is a regular file
                    if(boost::filesystem::is_regular_file(path))
                        m_eventBuffer_sp->insertNew(path);
                }           
            }
		
            DEBUG_STREAM << "EventThread::eventLoop() sleep for " << sleepTime << endl;
            
			boost::posix_time::milliseconds sleepPosixTime(sleepTime);
			boost::this_thread::sleep(sleepPosixTime);
		}
		catch(boost::thread_interrupted& ex)
		{
            DEBUG_STREAM << "EventThread::eventLoop() stopping loop" << endl;
            
            boost::mutex::scoped_lock stateLock(m_stateMutex);   
            m_state = Tango::OFF;
    
            boost::mutex::scoped_lock statusLock(m_statusMutex);    
            m_status = "Event thread not running"; 
            
			break;
		}
        catch(std::exception& ex)
        {
            boost::mutex::scoped_lock stateLock(m_stateMutex);   
            m_state = Tango::ALARM;

            boost::mutex::scoped_lock statusLock(m_statusMutex);    
            m_status = "Event thread ";
            m_status.append(ex.what()); 
        }        
		catch(...)
		{
            boost::mutex::scoped_lock stateLock(m_stateMutex);   
            m_state = Tango::ALARM;

            boost::mutex::scoped_lock statusLock(m_statusMutex);    
            m_status = "Event thread unknown exception";
		}
	} //while       
} 

}   //namespace

src/EventThread.h

0 → 100644
+124 −0
Original line number Diff line number Diff line
#ifndef EVENT_THREAD_H
#define EVENT_THREAD_H

#include <Configuration.h>
#include <Instrument.h>
#include <EventBuffer.h>
#include <ConnectionManager.h>

#include <tango.h>

#include <stdexcept>
#include <sys/inotify.h>

#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>

namespace FitsImporter_ns
{

class FitsImporter;
    
class EventThread : public Tango::LogAdapter
{
public:
//------------------------------------------------------------------------------
//	[Public] Shared pointer typedef	
//------------------------------------------------------------------------------
	typedef boost::shared_ptr<EventThread> SP;

protected:
//------------------------------------------------------------------------------
//	[Protected] Constructor destructor deleter
//------------------------------------------------------------------------------
	EventThread(FitsImporter*, Configuration::SP, Instrument::SPVector, 
            Instrument::SP);

	virtual ~EventThread();	

	class Deleter;
	friend class Deleter;
	class Deleter
	{
	public:
		void operator()(EventThread* e) { delete e; }
	};

public:
//------------------------------------------------------------------------------
//	[Public] Users methods		
//------------------------------------------------------------------------------
	static EventThread::SP create(FitsImporter*, Configuration::SP,
            Instrument::SPVector, Instrument::SP);

	virtual void start() throw(std::runtime_error);
		
	virtual void stop();
    
    virtual Tango::DevState readState();
    
    virtual std::string readStatus();
        
protected:
//------------------------------------------------------------------------------
//	[Protected] Utilities methods
//------------------------------------------------------------------------------
	virtual void initEventBuffer() throw(std::runtime_error);		

	virtual void initINotify() throw(std::runtime_error);		

	virtual void initConnectionManager() throw(std::runtime_error);
		
	virtual void initThreadGroup() throw(std::runtime_error);
		    
    virtual void eventLoop();

//------------------------------------------------------------------------------
//	[Protected] Class variables
//------------------------------------------------------------------------------
    //Tango server class pointer
    FitsImporter* m_fitsImporter_p;
    
	//Boost thread group shared pointer
	boost::scoped_ptr<boost::thread_group> m_threadGroup_sp;    
    
    //Configuration shared pointer
    Configuration::SP m_configuration_sp;
    
    //Instrument list shared pointer
    Instrument::SPVector m_instrumentList_spvector;
    
    //Default instrument shared pointer
    Instrument::SP m_defaultInstrument_sp;
    
	//Event buffer shared pointer
	EventBuffer::SP m_eventBuffer_sp;

	//Connection manager shared pointer
	ConnectionManager::SP m_connectionManager_sp;

    //Tango state synchronization
    boost::mutex m_stateMutex;
    
    //Tango state variable
    Tango::DevState m_state;
    
    //Tango status synchronization
    boost::mutex m_statusMutex;
    
    //Tango status variable
    std::string m_status;
    
    //INotify file descriptors
    int fd, wd;

    //INotify event size
    static const std::size_t EVENT_SIZE = ( sizeof (struct inotify_event) );

    //INotify event buffer length
    static const std::size_t BUF_LEN = ( 1024 * ( EVENT_SIZE + 16 ) );
};

}	//	End of namespace

#endif /*!EVENT_THREAD_H*/
Loading