Loading src/EventBuffer.cpp 0 → 100644 +126 −0 Original line number Diff line number Diff line #include <EventBuffer.h> #include <boost/thread/locks.hpp> //============================================================================== // EventBuffer::EventBuffer() //============================================================================== EventBuffer::EventBuffer(Tango::DeviceImpl* device_impl_p) : Tango::LogAdapter(device_impl_p) { DEBUG_STREAM << "EventBuffer::EventBuffer()" << endl; } //============================================================================== // EventBuffer::~EventBuffer() //============================================================================== EventBuffer::~EventBuffer() { DEBUG_STREAM << "EventBuffer::~EventBuffer()" << endl; } //============================================================================== // EventBuffer::insertNew() //============================================================================== EventBuffer::SP EventBuffer::create(Tango::DeviceImpl* device_impl_p) { EventBuffer::SP e_sp(new EventBuffer(device_impl_p), EventBuffer::Deleter()); return e_sp; } //============================================================================== // EventBuffer::insertNew() //============================================================================== void EventBuffer::insertNew(boost::filesystem::path path) throw(std::overflow_error, std::invalid_argument) { DEBUG_STREAM << "EventBuffer::insertNew()" << endl; boost::mutex::scoped_lock lock(m_mutex); if(m_buffer.size()+1 > m_buffer.max_size()) throw std::overflow_error("EventBuffer reach its max size"); if(!m_buffer.insert(std::pair<boost::filesystem::path, bool>(path, false)).second) throw std::invalid_argument("EventBuffer duplicate element " + path.string()); DEBUG_STREAM << "EventBuffer::insertNew(): " << path.string() << " inserted" << endl; lock.unlock(); m_conditionVariable.notify_all(); } //============================================================================== // EventBuffer::waitNew() //============================================================================== boost::filesystem::path EventBuffer::waitNew() { DEBUG_STREAM << "EventBuffer::waitNew()" << endl; boost::mutex::scoped_lock lock(m_mutex); do { std::map<boost::filesystem::path, bool>::iterator it; for(it=m_buffer.begin(); it!=m_buffer.end(); it++) { if(it->second == false) { DEBUG_STREAM << "EventBuffer::insertNew(): " "found new element: " << it->first.string() << endl; it->second = true; return it->first; } } DEBUG_STREAM << "EventBuffer::waitNew(): waiting new element" << endl; m_conditionVariable.wait(lock); } while(true); } //============================================================================== // EventBuffer::removeProcessed() //============================================================================== void EventBuffer::removeProcessed(boost::filesystem::path path) throw(std::invalid_argument) { DEBUG_STREAM << "EventBuffer::removeProcessed()" << endl; boost::mutex::scoped_lock lock(m_mutex); std::map<boost::filesystem::path, bool>::iterator it; it = m_buffer.find(path); if(it == m_buffer.end()) throw std::invalid_argument("Element not found into EventBuffer"); if(it->second == false) throw std::invalid_argument("Deleting not processed into EventBuffer"); m_buffer.erase(it); DEBUG_STREAM << "EventBuffer::removeProcessed(): " << it->first.string() << " waiting new element" << endl; } //============================================================================== // EventBuffer::size() //============================================================================== std::size_t EventBuffer::size() { DEBUG_STREAM << "EventBuffer::size()" << endl; boost::mutex::scoped_lock lock(m_mutex); return m_buffer.size(); } /*___oOo___*/ src/EventBuffer.h 0 → 100644 +70 −0 Original line number Diff line number Diff line #ifndef EVENT_BUFFER_H #define EVENT_BUFFER_H #include <tango.h> #include <map> #include <vector> #include <iostream> #include <stdexcept> #include <boost/shared_ptr.hpp> #include <boost/thread/mutex.hpp> #include <boost/thread/condition_variable.hpp> #include <boost/filesystem.hpp> class EventBuffer : public Tango::LogAdapter { public: //------------------------------------------------------------------------------ // [Public] Shared pointer typedef //------------------------------------------------------------------------------ typedef boost::shared_ptr<EventBuffer> SP; typedef std::vector< SP > SPVector; protected: //------------------------------------------------------------------------------ // [Protected] Constructor destructor deleter //------------------------------------------------------------------------------ EventBuffer(Tango::DeviceImpl*); virtual ~EventBuffer(); class Deleter; friend class Deleter; class Deleter { public: void operator()(EventBuffer* e) { delete e; } }; public: //------------------------------------------------------------------------------ // [Public] Users methods //------------------------------------------------------------------------------ static EventBuffer::SP create(Tango::DeviceImpl*); virtual void insertNew(boost::filesystem::path) throw(std::overflow_error, std::invalid_argument); virtual boost::filesystem::path waitNew(); virtual void removeProcessed(boost::filesystem::path) throw(std::invalid_argument); virtual std::size_t size(); protected: //------------------------------------------------------------------------------ // [Protected] Class variables //------------------------------------------------------------------------------ //Access synchronization mutex boost::mutex m_mutex; //Access synchronization condition variable boost::condition_variable m_conditionVariable; //File buffer std::map<boost::filesystem::path, bool> m_buffer; }; #endif /*!EVENT_BUFFER_H*/ Loading
src/EventBuffer.cpp 0 → 100644 +126 −0 Original line number Diff line number Diff line #include <EventBuffer.h> #include <boost/thread/locks.hpp> //============================================================================== // EventBuffer::EventBuffer() //============================================================================== EventBuffer::EventBuffer(Tango::DeviceImpl* device_impl_p) : Tango::LogAdapter(device_impl_p) { DEBUG_STREAM << "EventBuffer::EventBuffer()" << endl; } //============================================================================== // EventBuffer::~EventBuffer() //============================================================================== EventBuffer::~EventBuffer() { DEBUG_STREAM << "EventBuffer::~EventBuffer()" << endl; } //============================================================================== // EventBuffer::insertNew() //============================================================================== EventBuffer::SP EventBuffer::create(Tango::DeviceImpl* device_impl_p) { EventBuffer::SP e_sp(new EventBuffer(device_impl_p), EventBuffer::Deleter()); return e_sp; } //============================================================================== // EventBuffer::insertNew() //============================================================================== void EventBuffer::insertNew(boost::filesystem::path path) throw(std::overflow_error, std::invalid_argument) { DEBUG_STREAM << "EventBuffer::insertNew()" << endl; boost::mutex::scoped_lock lock(m_mutex); if(m_buffer.size()+1 > m_buffer.max_size()) throw std::overflow_error("EventBuffer reach its max size"); if(!m_buffer.insert(std::pair<boost::filesystem::path, bool>(path, false)).second) throw std::invalid_argument("EventBuffer duplicate element " + path.string()); DEBUG_STREAM << "EventBuffer::insertNew(): " << path.string() << " inserted" << endl; lock.unlock(); m_conditionVariable.notify_all(); } //============================================================================== // EventBuffer::waitNew() //============================================================================== boost::filesystem::path EventBuffer::waitNew() { DEBUG_STREAM << "EventBuffer::waitNew()" << endl; boost::mutex::scoped_lock lock(m_mutex); do { std::map<boost::filesystem::path, bool>::iterator it; for(it=m_buffer.begin(); it!=m_buffer.end(); it++) { if(it->second == false) { DEBUG_STREAM << "EventBuffer::insertNew(): " "found new element: " << it->first.string() << endl; it->second = true; return it->first; } } DEBUG_STREAM << "EventBuffer::waitNew(): waiting new element" << endl; m_conditionVariable.wait(lock); } while(true); } //============================================================================== // EventBuffer::removeProcessed() //============================================================================== void EventBuffer::removeProcessed(boost::filesystem::path path) throw(std::invalid_argument) { DEBUG_STREAM << "EventBuffer::removeProcessed()" << endl; boost::mutex::scoped_lock lock(m_mutex); std::map<boost::filesystem::path, bool>::iterator it; it = m_buffer.find(path); if(it == m_buffer.end()) throw std::invalid_argument("Element not found into EventBuffer"); if(it->second == false) throw std::invalid_argument("Deleting not processed into EventBuffer"); m_buffer.erase(it); DEBUG_STREAM << "EventBuffer::removeProcessed(): " << it->first.string() << " waiting new element" << endl; } //============================================================================== // EventBuffer::size() //============================================================================== std::size_t EventBuffer::size() { DEBUG_STREAM << "EventBuffer::size()" << endl; boost::mutex::scoped_lock lock(m_mutex); return m_buffer.size(); } /*___oOo___*/
src/EventBuffer.h 0 → 100644 +70 −0 Original line number Diff line number Diff line #ifndef EVENT_BUFFER_H #define EVENT_BUFFER_H #include <tango.h> #include <map> #include <vector> #include <iostream> #include <stdexcept> #include <boost/shared_ptr.hpp> #include <boost/thread/mutex.hpp> #include <boost/thread/condition_variable.hpp> #include <boost/filesystem.hpp> class EventBuffer : public Tango::LogAdapter { public: //------------------------------------------------------------------------------ // [Public] Shared pointer typedef //------------------------------------------------------------------------------ typedef boost::shared_ptr<EventBuffer> SP; typedef std::vector< SP > SPVector; protected: //------------------------------------------------------------------------------ // [Protected] Constructor destructor deleter //------------------------------------------------------------------------------ EventBuffer(Tango::DeviceImpl*); virtual ~EventBuffer(); class Deleter; friend class Deleter; class Deleter { public: void operator()(EventBuffer* e) { delete e; } }; public: //------------------------------------------------------------------------------ // [Public] Users methods //------------------------------------------------------------------------------ static EventBuffer::SP create(Tango::DeviceImpl*); virtual void insertNew(boost::filesystem::path) throw(std::overflow_error, std::invalid_argument); virtual boost::filesystem::path waitNew(); virtual void removeProcessed(boost::filesystem::path) throw(std::invalid_argument); virtual std::size_t size(); protected: //------------------------------------------------------------------------------ // [Protected] Class variables //------------------------------------------------------------------------------ //Access synchronization mutex boost::mutex m_mutex; //Access synchronization condition variable boost::condition_variable m_conditionVariable; //File buffer std::map<boost::filesystem::path, bool> m_buffer; }; #endif /*!EVENT_BUFFER_H*/