Loading script/script.sh 0 → 100755 +70 −0 Original line number Diff line number Diff line #!/bin/bash #----------------------------------------------------------------------- # User parameters #----------------------------------------------------------------------- #Verify tool path VERIFY_TOOL="/home/mdm/workspace/nexecs/test/tools/fitsverify" CHECK_STRING="conform to the FITS format" NO_FILE_ERROR="failed to find or open the following file" FATAL_ERROR="Fatal" EOF_ERROR="End-of-file" #----------------------------------------------------------------------- # Verify script #----------------------------------------------------------------------- if [ "$1" == "CHECK" ]; then res=$($VERIFY_TOOL 2>&1) check=$(echo $res | grep "$CHECK_STRING" | wc | awk '{print $1}') if [ "$check" -ge "1" ]; then echo "CHECK OK" else echo "NOT OK" fi exit 0 else #Check regular expression -> fatal file=$1 file_name=${file##*/} if [[ ! "${file_name,,}" =~ ^[^\.].*\.(fits|fts).*$ ]]; then echo "FATAL" exit 0 fi #if fits verify tools exists -> fatal if [ ! -x $VERIFY_TOOL ]; then echo "FATAL" exit 0 fi #if fits file not exists -> fatal if [ ! -f $1 ]; then echo "FATAL" exit 0 fi #Check with fits verify res=$($VERIFY_TOOL $1 2>&1) #if fitsverify return fatal error -> wait fatal=$(echo $res | grep "$FATAL_ERROR" | wc | awk '{print $1}') if [ "$fatal" -ge "1" ]; then echo "WAIT" exit 0 fi #if fitsverify return end of file -> wait eof=$(echo $res | grep "$EOF_ERROR" | wc | awk '{print $1}') if [ "$eof" -ge "1" ]; then echo "WAIT" exit 0 fi #else -> ok echo "OK" exit 0 fi #----------------------------------------------------------------------- src/EventThread.cpp +104 −5 Original line number Diff line number Diff line Loading @@ -16,9 +16,9 @@ namespace PreProcessor_ns //============================================================================== // EventThread::EventThread() //============================================================================== EventThread::EventThread(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_sp) : Tango::LogAdapter(deviceImpl_p), m_configuration_sp(configuration_sp) EventThread::EventThread(PreProcessor* preProcessor_p, Configuration::SP configuration_sp) : Tango::LogAdapter(preProcessor_p), m_preProcessor_p(preProcessor_p), m_configuration_sp(configuration_sp) { DEBUG_STREAM << "EventThread::EventThread()" << endl; Loading @@ -44,10 +44,10 @@ EventThread::~EventThread() //============================================================================== // EventThread::create() //============================================================================== EventThread::SP EventThread::create(Tango::DeviceImpl* deviceImpl_p, EventThread::SP EventThread::create(PreProcessor* preProcessor_p, Configuration::SP configuration_sp) { EventThread::SP e_sp(new EventThread(deviceImpl_p, configuration_sp), EventThread::SP e_sp(new EventThread(preProcessor_p, configuration_sp), EventThread::Deleter()); return e_sp; Loading @@ -62,7 +62,11 @@ void EventThread::start() try { initEventBuffer(); initINotify(); initThreadGroup(); } catch(std::exception& ex) { Loading Loading @@ -145,6 +149,101 @@ void EventThread::writeStatus(std::string status) m_status = status; } //============================================================================== // EventThread::initEventBuffer() //============================================================================== void EventThread::initEventBuffer() throw(std::runtime_error) { DEBUG_STREAM << "EventThread::initEventBuffer()" << endl; m_eventBuffer_sp = EventBuffer::create(m_preProcessor_p); std::string watchPath(m_configuration_sp->getWatchPath()); boost::filesystem::path path(watchPath); //Check if watch path exists if(!boost::filesystem::exists(path)) throw std::runtime_error("Watch path \"" + watchPath + "\" does not exist"); //And if it's a directory if(!boost::filesystem::is_directory(path)) throw std::runtime_error("Watch path \"" + watchPath + "\" is not a valid directory"); //All files in watch path are inserted into event buffer boost::filesystem::directory_iterator startIt(path); boost::filesystem::directory_iterator endIt; while(startIt != endIt) { if(boost::filesystem::is_regular_file(startIt->status())) m_eventBuffer_sp->insertNew(startIt->path()); startIt++; } } //============================================================================== // EventThread::initINotify() //============================================================================== void EventThread::initINotify() throw(std::runtime_error) { DEBUG_STREAM << "EventThread::initINotify()" << endl; if((m_fileDescriptor = inotify_init ()) < 0) throw std::runtime_error("INotify initialization error"); std::string watchPath(m_configuration_sp->getWatchPath()); uint32_t iNotifyMask = m_configuration_sp->getINotifyMask(); if((m_watchDescriptor = inotify_add_watch( m_fileDescriptor, watchPath.c_str(), iNotifyMask)) < 0) throw std::runtime_error("INotify add watch error"); int flags; if((flags = fcntl(m_fileDescriptor,F_GETFL,0)) < 0) throw std::runtime_error("File descriptor get flags error"); if(fcntl(m_fileDescriptor, F_SETFL, flags | O_NONBLOCK) < 0) throw std::runtime_error("File descriptor set flags error"); } //============================================================================== // EventThread::initThreadGroup() //============================================================================== void EventThread::initThreadGroup() throw(std::runtime_error) { DEBUG_STREAM << "EventThread::initThreadGroup()" << endl; m_threadGroup_sp.reset(new boost::thread_group); unsigned int workerNumber = m_configuration_sp->getWorkerNumber(); //Create a worker thread and pass all arguments WorkerThread worker(m_preProcessor_p, m_eventBuffer_sp, m_configuration_sp); try { //Add to thread group event thread m_threadGroup_sp->add_thread( new boost::thread(&EventThread::eventLoop, this)); //Add to thread group worker threads for(unsigned int i=0; i<workerNumber; i++) m_threadGroup_sp->add_thread( new boost::thread(&WorkerThread::workerLoop, worker)); } catch(boost::thread_resource_error& ex) { std::stringstream error_stream; error_stream << "InitThreadGroup: " << ex.what(); throw std::runtime_error(error_stream.str()); } } //============================================================================== // EventThread::eventLoop() //============================================================================== Loading src/EventThread.h +10 −4 Original line number Diff line number Diff line Loading @@ -15,7 +15,7 @@ namespace PreProcessor_ns { class FitsImporter; class PreProcessor; class EventThread : public Tango::LogAdapter { Loading @@ -29,7 +29,7 @@ protected: //------------------------------------------------------------------------------ // [Protected] Constructor destructor deleter //------------------------------------------------------------------------------ EventThread(Tango::DeviceImpl*, Configuration::SP); EventThread(PreProcessor*, Configuration::SP); virtual ~EventThread(); Loading @@ -45,7 +45,7 @@ public: //------------------------------------------------------------------------------ // [Public] Class creation method //------------------------------------------------------------------------------ static EventThread::SP create(Tango::DeviceImpl*, Configuration::SP); static EventThread::SP create(PreProcessor*, Configuration::SP); //------------------------------------------------------------------------------ // [Public] Thread management methods Loading @@ -72,13 +72,19 @@ protected: //------------------------------------------------------------------------------ // [Protected] Utilities methods //------------------------------------------------------------------------------ virtual void initEventBuffer() throw(std::runtime_error); virtual void initINotify() throw(std::runtime_error); virtual void initThreadGroup() throw(std::runtime_error); virtual void eventLoop(); //------------------------------------------------------------------------------ // [Protected] Class variables //------------------------------------------------------------------------------ //Tango server class pointer FitsImporter* m_fitsImporter_p; PreProcessor* m_preProcessor_p; //Boost thread group shared pointer boost::scoped_ptr<boost::thread_group> m_threadGroup_sp; Loading src/PreProcessor.cpp +18 −5 Original line number Diff line number Diff line Loading @@ -153,11 +153,11 @@ void PreProcessor::init_device() m_eventThread_sp = EventThread::create(this, m_configuration_sp); //Start device if auto start enabled // if(autoStart) // { // INFO_STREAM << "FitsImporter::init_device() auto start enabled " << endl; // on(); // } if(autoStart) { INFO_STREAM << "FitsImporter::init_device() auto start enabled " << endl; on(); } } catch(std::exception& ex) { Loading Loading @@ -199,6 +199,7 @@ void PreProcessor::get_device_property() dev_prop.push_back(Tango::DbDatum("SleepTime")); dev_prop.push_back(Tango::DbDatum("WaitTime")); dev_prop.push_back(Tango::DbDatum("WorkerNumber")); dev_prop.push_back(Tango::DbDatum("AutoStart")); // is there at least one property to be read ? if (dev_prop.size()>0) Loading Loading @@ -289,6 +290,18 @@ void PreProcessor::get_device_property() } // And try to extract WorkerNumber value from database if (dev_prop[i].is_empty()==false) dev_prop[i] >> workerNumber; // Try to initialize AutoStart from class property cl_prop = ds_class->get_class_property(dev_prop[++i].name); if (cl_prop.is_empty()==false) cl_prop >> autoStart; else { // Try to initialize AutoStart from default device value def_prop = ds_class->get_default_device_property(dev_prop[i].name); if (def_prop.is_empty()==false) def_prop >> autoStart; } // And try to extract AutoStart value from database if (dev_prop[i].is_empty()==false) dev_prop[i] >> autoStart; } /*----- PROTECTED REGION ID(PreProcessor::get_device_property_after) ENABLED START -----*/ Loading src/PreProcessor.h +2 −0 Original line number Diff line number Diff line Loading @@ -103,6 +103,8 @@ public: Tango::DevUShort waitTime; // WorkerNumber: Tango::DevUShort workerNumber; // AutoStart: Exec On command after init if state is not fault Tango::DevBoolean autoStart; // Constructors and destructors Loading Loading
script/script.sh 0 → 100755 +70 −0 Original line number Diff line number Diff line #!/bin/bash #----------------------------------------------------------------------- # User parameters #----------------------------------------------------------------------- #Verify tool path VERIFY_TOOL="/home/mdm/workspace/nexecs/test/tools/fitsverify" CHECK_STRING="conform to the FITS format" NO_FILE_ERROR="failed to find or open the following file" FATAL_ERROR="Fatal" EOF_ERROR="End-of-file" #----------------------------------------------------------------------- # Verify script #----------------------------------------------------------------------- if [ "$1" == "CHECK" ]; then res=$($VERIFY_TOOL 2>&1) check=$(echo $res | grep "$CHECK_STRING" | wc | awk '{print $1}') if [ "$check" -ge "1" ]; then echo "CHECK OK" else echo "NOT OK" fi exit 0 else #Check regular expression -> fatal file=$1 file_name=${file##*/} if [[ ! "${file_name,,}" =~ ^[^\.].*\.(fits|fts).*$ ]]; then echo "FATAL" exit 0 fi #if fits verify tools exists -> fatal if [ ! -x $VERIFY_TOOL ]; then echo "FATAL" exit 0 fi #if fits file not exists -> fatal if [ ! -f $1 ]; then echo "FATAL" exit 0 fi #Check with fits verify res=$($VERIFY_TOOL $1 2>&1) #if fitsverify return fatal error -> wait fatal=$(echo $res | grep "$FATAL_ERROR" | wc | awk '{print $1}') if [ "$fatal" -ge "1" ]; then echo "WAIT" exit 0 fi #if fitsverify return end of file -> wait eof=$(echo $res | grep "$EOF_ERROR" | wc | awk '{print $1}') if [ "$eof" -ge "1" ]; then echo "WAIT" exit 0 fi #else -> ok echo "OK" exit 0 fi #-----------------------------------------------------------------------
src/EventThread.cpp +104 −5 Original line number Diff line number Diff line Loading @@ -16,9 +16,9 @@ namespace PreProcessor_ns //============================================================================== // EventThread::EventThread() //============================================================================== EventThread::EventThread(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_sp) : Tango::LogAdapter(deviceImpl_p), m_configuration_sp(configuration_sp) EventThread::EventThread(PreProcessor* preProcessor_p, Configuration::SP configuration_sp) : Tango::LogAdapter(preProcessor_p), m_preProcessor_p(preProcessor_p), m_configuration_sp(configuration_sp) { DEBUG_STREAM << "EventThread::EventThread()" << endl; Loading @@ -44,10 +44,10 @@ EventThread::~EventThread() //============================================================================== // EventThread::create() //============================================================================== EventThread::SP EventThread::create(Tango::DeviceImpl* deviceImpl_p, EventThread::SP EventThread::create(PreProcessor* preProcessor_p, Configuration::SP configuration_sp) { EventThread::SP e_sp(new EventThread(deviceImpl_p, configuration_sp), EventThread::SP e_sp(new EventThread(preProcessor_p, configuration_sp), EventThread::Deleter()); return e_sp; Loading @@ -62,7 +62,11 @@ void EventThread::start() try { initEventBuffer(); initINotify(); initThreadGroup(); } catch(std::exception& ex) { Loading Loading @@ -145,6 +149,101 @@ void EventThread::writeStatus(std::string status) m_status = status; } //============================================================================== // EventThread::initEventBuffer() //============================================================================== void EventThread::initEventBuffer() throw(std::runtime_error) { DEBUG_STREAM << "EventThread::initEventBuffer()" << endl; m_eventBuffer_sp = EventBuffer::create(m_preProcessor_p); std::string watchPath(m_configuration_sp->getWatchPath()); boost::filesystem::path path(watchPath); //Check if watch path exists if(!boost::filesystem::exists(path)) throw std::runtime_error("Watch path \"" + watchPath + "\" does not exist"); //And if it's a directory if(!boost::filesystem::is_directory(path)) throw std::runtime_error("Watch path \"" + watchPath + "\" is not a valid directory"); //All files in watch path are inserted into event buffer boost::filesystem::directory_iterator startIt(path); boost::filesystem::directory_iterator endIt; while(startIt != endIt) { if(boost::filesystem::is_regular_file(startIt->status())) m_eventBuffer_sp->insertNew(startIt->path()); startIt++; } } //============================================================================== // EventThread::initINotify() //============================================================================== void EventThread::initINotify() throw(std::runtime_error) { DEBUG_STREAM << "EventThread::initINotify()" << endl; if((m_fileDescriptor = inotify_init ()) < 0) throw std::runtime_error("INotify initialization error"); std::string watchPath(m_configuration_sp->getWatchPath()); uint32_t iNotifyMask = m_configuration_sp->getINotifyMask(); if((m_watchDescriptor = inotify_add_watch( m_fileDescriptor, watchPath.c_str(), iNotifyMask)) < 0) throw std::runtime_error("INotify add watch error"); int flags; if((flags = fcntl(m_fileDescriptor,F_GETFL,0)) < 0) throw std::runtime_error("File descriptor get flags error"); if(fcntl(m_fileDescriptor, F_SETFL, flags | O_NONBLOCK) < 0) throw std::runtime_error("File descriptor set flags error"); } //============================================================================== // EventThread::initThreadGroup() //============================================================================== void EventThread::initThreadGroup() throw(std::runtime_error) { DEBUG_STREAM << "EventThread::initThreadGroup()" << endl; m_threadGroup_sp.reset(new boost::thread_group); unsigned int workerNumber = m_configuration_sp->getWorkerNumber(); //Create a worker thread and pass all arguments WorkerThread worker(m_preProcessor_p, m_eventBuffer_sp, m_configuration_sp); try { //Add to thread group event thread m_threadGroup_sp->add_thread( new boost::thread(&EventThread::eventLoop, this)); //Add to thread group worker threads for(unsigned int i=0; i<workerNumber; i++) m_threadGroup_sp->add_thread( new boost::thread(&WorkerThread::workerLoop, worker)); } catch(boost::thread_resource_error& ex) { std::stringstream error_stream; error_stream << "InitThreadGroup: " << ex.what(); throw std::runtime_error(error_stream.str()); } } //============================================================================== // EventThread::eventLoop() //============================================================================== Loading
src/EventThread.h +10 −4 Original line number Diff line number Diff line Loading @@ -15,7 +15,7 @@ namespace PreProcessor_ns { class FitsImporter; class PreProcessor; class EventThread : public Tango::LogAdapter { Loading @@ -29,7 +29,7 @@ protected: //------------------------------------------------------------------------------ // [Protected] Constructor destructor deleter //------------------------------------------------------------------------------ EventThread(Tango::DeviceImpl*, Configuration::SP); EventThread(PreProcessor*, Configuration::SP); virtual ~EventThread(); Loading @@ -45,7 +45,7 @@ public: //------------------------------------------------------------------------------ // [Public] Class creation method //------------------------------------------------------------------------------ static EventThread::SP create(Tango::DeviceImpl*, Configuration::SP); static EventThread::SP create(PreProcessor*, Configuration::SP); //------------------------------------------------------------------------------ // [Public] Thread management methods Loading @@ -72,13 +72,19 @@ protected: //------------------------------------------------------------------------------ // [Protected] Utilities methods //------------------------------------------------------------------------------ virtual void initEventBuffer() throw(std::runtime_error); virtual void initINotify() throw(std::runtime_error); virtual void initThreadGroup() throw(std::runtime_error); virtual void eventLoop(); //------------------------------------------------------------------------------ // [Protected] Class variables //------------------------------------------------------------------------------ //Tango server class pointer FitsImporter* m_fitsImporter_p; PreProcessor* m_preProcessor_p; //Boost thread group shared pointer boost::scoped_ptr<boost::thread_group> m_threadGroup_sp; Loading
src/PreProcessor.cpp +18 −5 Original line number Diff line number Diff line Loading @@ -153,11 +153,11 @@ void PreProcessor::init_device() m_eventThread_sp = EventThread::create(this, m_configuration_sp); //Start device if auto start enabled // if(autoStart) // { // INFO_STREAM << "FitsImporter::init_device() auto start enabled " << endl; // on(); // } if(autoStart) { INFO_STREAM << "FitsImporter::init_device() auto start enabled " << endl; on(); } } catch(std::exception& ex) { Loading Loading @@ -199,6 +199,7 @@ void PreProcessor::get_device_property() dev_prop.push_back(Tango::DbDatum("SleepTime")); dev_prop.push_back(Tango::DbDatum("WaitTime")); dev_prop.push_back(Tango::DbDatum("WorkerNumber")); dev_prop.push_back(Tango::DbDatum("AutoStart")); // is there at least one property to be read ? if (dev_prop.size()>0) Loading Loading @@ -289,6 +290,18 @@ void PreProcessor::get_device_property() } // And try to extract WorkerNumber value from database if (dev_prop[i].is_empty()==false) dev_prop[i] >> workerNumber; // Try to initialize AutoStart from class property cl_prop = ds_class->get_class_property(dev_prop[++i].name); if (cl_prop.is_empty()==false) cl_prop >> autoStart; else { // Try to initialize AutoStart from default device value def_prop = ds_class->get_default_device_property(dev_prop[i].name); if (def_prop.is_empty()==false) def_prop >> autoStart; } // And try to extract AutoStart value from database if (dev_prop[i].is_empty()==false) dev_prop[i] >> autoStart; } /*----- PROTECTED REGION ID(PreProcessor::get_device_property_after) ENABLED START -----*/ Loading
src/PreProcessor.h +2 −0 Original line number Diff line number Diff line Loading @@ -103,6 +103,8 @@ public: Tango::DevUShort waitTime; // WorkerNumber: Tango::DevUShort workerNumber; // AutoStart: Exec On command after init if state is not fault Tango::DevBoolean autoStart; // Constructors and destructors Loading