Loading src/DMDBImporter.cpp +14 −10 Original line number Diff line number Diff line Loading @@ -276,8 +276,8 @@ Mapping::SPVector DMDBImporter::getMapping(int destinationId) { DEBUG_STREAM << "DMDBImporter::getMapping()" << endl; soci::rowset< MappingTuple > rows = (m_session_sp->prepare << "SELECT id, column_name, column_type, fits_key_pri, fits_key_sec, mandatory " soci::rowset< MappingTuple > rows = (m_session_sp->prepare << "SELECT id," "column_name, column_type, fits_key_hdu, fits_key_pri, fits_key_sec, mandatory " "FROM " << m_mappingTable << " WHERE dest_id=:id", soci::use(destinationId,"id")); MappingTupleVector mappingTupleList; Loading @@ -301,26 +301,30 @@ Mapping::SPVector DMDBImporter::getMapping(int destinationId) std::string columnType = it->get<2>().get(); if(!it->get<3>()) throw soci::soci_error("FitsPrimary not found for id " + mapId); std::string fitsPrimary = it->get<3>().get(); throw soci::soci_error("FitsHDU not found for id " + mapId); int fitsHDU = it->get<3>().get(); if(!it->get<4>()) throw soci::soci_error("FitsSecondary not found for id " + mapId); std::string fitsSecondary = it->get<4>().get(); throw soci::soci_error("FitsPrimary not found for id " + mapId); std::string fitsPrimary = it->get<4>().get(); if(!it->get<5>()) throw soci::soci_error("FitsSecondary not found for id " + mapId); std::string fitsSecondary = it->get<5>().get(); if(!it->get<6>()) throw soci::soci_error("Mandatory not found for id " + mapId); int mandatory = it->get<5>().get(); int mandatory = it->get<6>().get(); #ifdef VERBOSE_DEBUG INFO_STREAM << "MAPPING: " << mapId << " " << columnName << " " << columnType << " " << fitsPrimary << " " << fitsSecondary << " " << mandatory << endl; << columnType << " " << fitsHDU << " " << fitsPrimary << " " << fitsSecondary << " " << mandatory << endl; INFO_STREAM << "-------------------------------------------------" << endl; #endif mapping_spvector.push_back( Mapping::create(columnName, columnType, fitsPrimary, fitsSecondary, mandatory) ); fitsHDU, fitsPrimary, fitsSecondary, mandatory) ); } return mapping_spvector; Loading src/DMDBImporter.h +2 −1 Original line number Diff line number Diff line Loading @@ -100,8 +100,9 @@ protected: // [Protected] Mapping tuple typedef //------------------------------------------------------------------------------ typedef boost::tuple< boost::optional<int>, boost::optional<std::string>, boost::optional<std::string>, boost::optional<int>, boost::optional<std::string>, boost::optional<std::string>, boost::optional<std::string>, boost::optional<int> > MappingTuple; boost::optional<int> > MappingTuple; typedef std::vector< MappingTuple > MappingTupleVector; Loading src/Mapping.h +21 −17 Original line number Diff line number Diff line Loading @@ -21,11 +21,11 @@ private: //------------------------------------------------------------------------------ // [Private] Constructor destructor deleter //------------------------------------------------------------------------------ Mapping(std::string columnName, std::string columnType, std::string fitsPrimary, std::string fitsSecondary, bool mandatory) : m_columnName(columnName), m_columnType(columnType), m_fitsPrimary(fitsPrimary), m_fitsSecondary(fitsSecondary), m_mandatory(mandatory) {} Mapping(std::string columnName, std::string columnType, int fitsHDU, std::string fitsPrimary, std::string fitsSecondary, bool mandatory) : m_columnName(columnName), m_columnType(columnType), m_fitsHDU(fitsHDU), m_fitsPrimary(fitsPrimary), m_fitsSecondary(fitsSecondary), m_mandatory(mandatory) {} ~Mapping() {} class Deleter; Loading @@ -40,17 +40,18 @@ public: //------------------------------------------------------------------------------ // [Public] Users methods //------------------------------------------------------------------------------ static Mapping::SP create(std::string columnName, std::string columnType, std::string fitsPrimary, static Mapping::SP create(std::string columnName, std::string columnType, int fitsHDU, std::string fitsPrimary, std::string fitsSecondary, bool mandatory) { Mapping::SP m_sp( new Mapping( columnName, columnType, fitsPrimary, fitsSecondary, mandatory), Mapping::Deleter()); Mapping::SP m_sp( new Mapping( columnName, columnType, fitsHDU, fitsPrimary, fitsSecondary, mandatory), Mapping::Deleter()); return m_sp; } std::string getColumnName() const { return m_columnName; } std::string getColumnType() const { return m_columnType; } int getFitsHDU() const { return m_fitsHDU; } std::string getFitsPrimary() const { return m_fitsPrimary; } std::string getFitsSecondary() const { return m_fitsSecondary; } bool isMandatory() const { return m_mandatory; } Loading @@ -65,6 +66,9 @@ private: //Metadata table column type const std::string m_columnType; //Metadata fits primary and secondary keys HDU const int m_fitsHDU; //Metadata fits primary key to be stored const std::string m_fitsPrimary; Loading src/WorkerThread.cpp +195 −88 Original line number Diff line number Diff line Loading @@ -74,10 +74,6 @@ void WorkerThread::workerLoop() boost::shared_ptr<CCfits::FITS> fitsFile_sp( new CCfits::FITS(origPath.string(), CCfits::Write)); //Read all key in primary HDU CCfits::PHDU& phdu = fitsFile_sp->pHDU(); phdu.readAllKeys(); try { //Try to ingest using an instrument in list Loading Loading @@ -154,6 +150,7 @@ void WorkerThread::ingestUsingInstrumentList(boost::filesystem::path& origPath, DEBUG_STREAM << "WorkerThread::ingestUsingInstrumentList()" << endl; CCfits::PHDU& phdu = fitsFile_sp->pHDU(); phdu.readAllKeys(); Instrument::SP instrument_sp; boost::gregorian::date date; Loading Loading @@ -259,7 +256,7 @@ void WorkerThread::processFits(boost::filesystem::path& origPath, } //Insert metadata into database execQuery(session_sp, destination_sp, date, duplicateMax, origPath, phdu); execQuery(session_sp, destination_sp, date, duplicateMax, origPath, fitsFile_sp); //Archive file to storage moveFile(origPath,destination_sp, date, duplicateMax); Loading Loading @@ -312,15 +309,12 @@ int WorkerThread::findDuplicateMax(ConnectionManager::SessionSP // WorkerThread::execQuery() //============================================================================== void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, Destination::SP destination_sp, boost::gregorian::date& date, int duplicateMax, boost::filesystem::path& origPath, CCfits::PHDU& phdu) Destination::SP destination_sp, boost::gregorian::date& date, int duplicateMax, boost::filesystem::path& origPath, boost::shared_ptr<CCfits::FITS> fitsFile_sp) throw(CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::execQuery()" << endl; //Get mapping for a specific instrument Mapping::SPVector mapping_spvector = destination_sp->getMappingSPVector(); //Insert part of the query std::stringstream insertQuery; insertQuery << "INSERT INTO " << destination_sp->getSchema() << "." Loading @@ -337,8 +331,8 @@ void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, //Append file path column to query insertQuery << "," << destination_sp->getTable() << ".file_path"; int month = date.month(); valuesQuery << ",\'/" << destination_sp->getDirName() << "/" << date.year() << "/" << month << "/" << date.day() << "\'"; valuesQuery << ",\'/" << date.year() << "/" << month << "/" << date.day() << "/" << destination_sp->getDirName() << "\'"; //Append file version column to query insertQuery << "," << destination_sp->getTable() << ".file_version"; Loading @@ -349,16 +343,55 @@ void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, valuesQuery << ",\'" << origPath.filename().string() << ".gz\'"; //Search key from mapping and fill query fillQueryFromFits(insertQuery, valuesQuery, destination_sp, fitsFile_sp); //Merge insert and values queries insertQuery << ") " << valuesQuery.rdbuf() << ")"; DEBUG_STREAM << "WorkerThread::execQuery() query " << insertQuery.str() << endl; //Exec query *session_sp << insertQuery.str(); } //============================================================================== // WorkerThread::fillQueryFromFits() //============================================================================== void WorkerThread::fillQueryFromFits(std::stringstream& insertQuery, std::stringstream& valuesQuery, Destination::SP destination_sp, boost::shared_ptr<CCfits::FITS> fitsFile_sp) throw(CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::fillQueryFromFits()" << endl; //Get mapping for a specific instrument Mapping::SPVector mapping_spvector = destination_sp->getMappingSPVector(); //Loop mapping to append metadata columns to query Mapping::SPVector::iterator it; for(it=mapping_spvector.begin(); it!=mapping_spvector.end(); it++) { int fitsHDU = (*it)->getFitsHDU(); std::string value; try { //Search for primary fits key value = readKey(phdu, (*it)->getFitsPrimary(), (*it)->getColumnType()); try { if(fitsHDU == 0) { //Read primary key from PHDU value = readKeyFromPHDU(fitsFile_sp->pHDU(), (*it)->getFitsPrimary(), (*it)->getColumnType()); } else { //Read primary key from extension HDU value = readKeyFromHDU(fitsFile_sp->extension(fitsHDU), (*it)->getFitsPrimary(), (*it)->getColumnType()); } insertQuery << "," << destination_sp->getTable() << "." << (*it)->getColumnName(); Loading @@ -366,12 +399,22 @@ void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, } catch(CCfits::HDU::NoSuchKeyword& ex) { WARN_STREAM << "WorkerThread::execQuery() exception: " << ex.message() << endl; WARN_STREAM << "WorkerThread::execQuery() " << ex.message() << endl; try { //Search for secondary fits key (alias) value = readKey(phdu, (*it)->getFitsSecondary(), (*it)->getColumnType()); if(fitsHDU == 0) { //Read secondary key from PHDU value = readKeyFromPHDU(fitsFile_sp->pHDU(), (*it)->getFitsSecondary(), (*it)->getColumnType()); } else { //Read secondary key from extension HDU value = readKeyFromHDU(fitsFile_sp->extension(fitsHDU), (*it)->getFitsSecondary(), (*it)->getColumnType()); } insertQuery << "," << destination_sp->getTable() << "." << (*it)->getColumnName(); Loading @@ -379,7 +422,7 @@ void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, } catch(CCfits::HDU::NoSuchKeyword& ex) { WARN_STREAM << "WorkerThread::execQuery() exception: " << ex.message() << endl; WARN_STREAM << "WorkerThread::execQuery() " << ex.message() << endl; //If key is mandatory throw and exception if((*it)->isMandatory()) Loading @@ -387,32 +430,35 @@ void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, } } } catch(CCfits::FITS::NoSuchHDU& ex) { WARN_STREAM << "WorkerThread::execQuery() " << ex.message() << endl; //Merge insert and values queries insertQuery << ") " << valuesQuery.rdbuf() << ")"; DEBUG_STREAM << "WorkerThread::execQuery() query " << insertQuery.str() << endl; //Exec query *session_sp << insertQuery.str(); //If key is mandatory throw and exception if((*it)->isMandatory()) throw ex; } } } //============================================================================== // WorkerThread::readKey() // WorkerThread::readKeyFromPHDU() //============================================================================== std::string WorkerThread::readKey(CCfits::PHDU& phdu, std::string keyName, std::string dMDBKeyType) throw(CCfits::FitsException, std::runtime_error) std::string WorkerThread::readKeyFromPHDU(CCfits::PHDU& phdu, std::string keyName, std::string keyType) throw(CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::readKey()" << endl; DEBUG_STREAM << "WorkerThread::readKeyFromPHDU()" << endl; phdu.readAllKeys(); std::stringstream result_stream; std::stringstream resultStream; boost::regex pattern("^(NAXIS)([0-9]+)$", boost::regex::icase); boost::smatch matches; if(keyName.compare("NAXIS")==0 || keyName.compare("naxis")==0) { result_stream << phdu.axes(); resultStream << phdu.axes(); } else if(boost::regex_match(keyName, matches, pattern)) { Loading @@ -425,7 +471,7 @@ std::string WorkerThread::readKey(CCfits::PHDU& phdu, std::string keyName, if(axisReq<0 || axisReq>=axisMax) throw CCfits::HDU::NoSuchKeyword("Invalid naxis number"); result_stream << phdu.axis(axisReq); resultStream << phdu.axis(axisReq); } catch(boost::bad_lexical_cast& ex) { Loading @@ -434,53 +480,115 @@ std::string WorkerThread::readKey(CCfits::PHDU& phdu, std::string keyName, } else if(keyName.compare("BITPIX")==0 || keyName.compare("bitpix")==0) { result_stream << phdu.bitpix(); resultStream << phdu.bitpix(); } else if(keyName.compare("BSCALE")==0 || keyName.compare("bscale")==0) { result_stream << phdu.scale(); resultStream << phdu.scale(); } else if(keyName.compare("BZERO")==0 || keyName.compare("bzero")==0) { result_stream << phdu.zero(); resultStream << phdu.zero(); } else if(keyName.compare("EXTEND")==0 || keyName.compare("extend")==0) { result_stream << phdu.extend(); resultStream << phdu.extend(); } else if(keyType.compare("varchar")==0 || keyType.compare("char")==0) { resultStream << "\'" << readKey(phdu, keyName, keyType) << "\'"; } else if(keyType.compare("date")==0 || keyType.compare("datetime")==0 || keyType.compare("time")==0 || keyType.compare("timestamp")==0) { resultStream << "\'" << readKey(phdu, keyName, keyType) << "\'"; } else { resultStream << readKey(phdu, keyName, keyType); } return resultStream.str(); } //============================================================================== // WorkerThread::readKeyFromHDU() //============================================================================== std::string WorkerThread::readKeyFromHDU(CCfits::HDU& hdu, std::string keyName, std::string keyType) throw(CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::readKeyFromHDU()" << endl; hdu.readAllKeys(); std::stringstream resultStream; boost::regex pattern("^(NAXIS)([0-9]+)$", boost::regex::icase); boost::smatch matches; if(keyName.compare("NAXIS")==0 || keyName.compare("naxis")==0) { resultStream << hdu.axes(); } else if(dMDBKeyType.compare("varchar")==0 || dMDBKeyType.compare("char")==0) else if(boost::regex_match(keyName, matches, pattern)) { long axisMax = hdu.axes(); try { long axisReq = boost::lexical_cast<long>(matches[2]) - 1; if(axisReq<0 || axisReq>=axisMax) throw std::runtime_error("Invalid naxis number"); resultStream << hdu.axis(axisReq); } catch(boost::bad_lexical_cast& ex) { throw std::runtime_error("Exception parsing naxis number"); } } else if(keyName.compare("BITPIX")==0 || keyName.compare("bitpix")==0) { result_stream << "\'" << extractKey(phdu, keyName, dMDBKeyType) << "\'"; resultStream << hdu.bitpix(); } else if(dMDBKeyType.compare("date")==0 || dMDBKeyType.compare("datetime")==0 || dMDBKeyType.compare("time")==0 || dMDBKeyType.compare("timestamp")==0) else if(keyName.compare("BSCALE")==0 || keyName.compare("bscale")==0) { resultStream << hdu.scale(); } else if(keyName.compare("BZERO")==0 || keyName.compare("bzero")==0) { result_stream << "\'" << extractKey(phdu, keyName, dMDBKeyType) << "\'"; resultStream << hdu.zero(); } else if(keyType.compare("varchar")==0 || keyType.compare("char")==0) { resultStream << "\'" << readKey(hdu, keyName, keyType) << "\'"; } else if(keyType.compare("date")==0 || keyType.compare("datetime")==0 || keyType.compare("time")==0 || keyType.compare("timestamp")==0) { resultStream << "\'" << readKey(hdu, keyName, keyType) << "\'"; } else { result_stream << extractKey(phdu, keyName, dMDBKeyType); resultStream << readKey(hdu, keyName, keyType); } return result_stream.str(); return resultStream.str(); } //============================================================================== // WorkerThread::extractKey() // WorkerThread::readKey() //============================================================================== std::string WorkerThread::extractKey(CCfits::PHDU& phdu, std::string keyName, std::string dMDBKeyType) throw(CCfits::FitsException, std::runtime_error) std::string WorkerThread::readKey(CCfits::HDU& hdu, std::string keyName, std::string keyType) throw(CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::extractKey()" << endl; DEBUG_STREAM << "WorkerThread::readKey()" << endl; int fitsKeyType = phdu.keyWord(keyName).keytype(); int fitsKeyType = hdu.keyWord(keyName).keytype(); #ifdef VERBOSE_DEBUG INFO_STREAM << "KEY: " << keyName << " " << fitsKeyType << " -> " << dMDBKeyType << endl; INFO_STREAM << "KEY: " << keyName << " " << fitsKeyType << " -> " << keyType << endl; INFO_STREAM << "-------------------------------------------------" << endl; #endif Loading @@ -491,49 +599,49 @@ std::string WorkerThread::extractKey(CCfits::PHDU& phdu, std::string keyName, case TSTRING: { std::string value; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return value; } case TLOGICAL: { bool value; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); } case TINT: case TSHORT: case TSBYTE: { int value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); } case TUINT: case TUSHORT: case TBIT: case TBYTE: { unsigned int value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); } case TULONG: { unsigned long value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); } case TLONG: { long value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); } case TLONGLONG: { long long value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); } case TDOUBLE: case TFLOAT: { double value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); } case TCOMPLEX: case TDBLCOMPLEX: Loading Loading @@ -572,9 +680,8 @@ void WorkerThread::moveFile(boost::filesystem::path& origPath, //Create directory path, date and version part of destination path std::stringstream relPathStream; int month = date.month(); relPathStream << "/" << destination_sp->getDirName() << "/" << date.year() << "/" << month << "/" << date.day() << "/" << duplicateMax; relPathStream << "/" << date.year() << "/" << month << "/" << date.day() << "/" << destination_sp->getDirName() << "/" << duplicateMax; boost::filesystem::path relPath( relPathStream.str() ); //Append date path to full destination path Loading src/WorkerThread.h +13 −5 Original line number Diff line number Diff line Loading @@ -62,12 +62,20 @@ protected: virtual void execQuery(ConnectionManager::SessionSP, Destination::SP, boost::gregorian::date&, int, boost::filesystem::path&, CCfits::PHDU&) throw(CCfits::FitsException, std::runtime_error); boost::shared_ptr<CCfits::FITS>) throw(CCfits::FitsException, std::runtime_error); virtual void fillQueryFromFits(std::stringstream&, std::stringstream&, Destination::SP, boost::shared_ptr<CCfits::FITS>) throw(CCfits::FitsException, std::runtime_error); virtual std::string readKeyFromPHDU(CCfits::PHDU&, std::string, std::string) throw(CCfits::FitsException, std::runtime_error); virtual std::string readKey(CCfits::PHDU&, std::string, std::string) virtual std::string readKeyFromHDU(CCfits::HDU&, std::string, std::string) throw(CCfits::FitsException, std::runtime_error); virtual std::string extractKey(CCfits::PHDU&, std::string, std::string) virtual std::string readKey(CCfits::HDU&, std::string, std::string) throw(CCfits::FitsException, std::runtime_error); virtual void moveFile(boost::filesystem::path&, Destination::SP, Loading Loading
src/DMDBImporter.cpp +14 −10 Original line number Diff line number Diff line Loading @@ -276,8 +276,8 @@ Mapping::SPVector DMDBImporter::getMapping(int destinationId) { DEBUG_STREAM << "DMDBImporter::getMapping()" << endl; soci::rowset< MappingTuple > rows = (m_session_sp->prepare << "SELECT id, column_name, column_type, fits_key_pri, fits_key_sec, mandatory " soci::rowset< MappingTuple > rows = (m_session_sp->prepare << "SELECT id," "column_name, column_type, fits_key_hdu, fits_key_pri, fits_key_sec, mandatory " "FROM " << m_mappingTable << " WHERE dest_id=:id", soci::use(destinationId,"id")); MappingTupleVector mappingTupleList; Loading @@ -301,26 +301,30 @@ Mapping::SPVector DMDBImporter::getMapping(int destinationId) std::string columnType = it->get<2>().get(); if(!it->get<3>()) throw soci::soci_error("FitsPrimary not found for id " + mapId); std::string fitsPrimary = it->get<3>().get(); throw soci::soci_error("FitsHDU not found for id " + mapId); int fitsHDU = it->get<3>().get(); if(!it->get<4>()) throw soci::soci_error("FitsSecondary not found for id " + mapId); std::string fitsSecondary = it->get<4>().get(); throw soci::soci_error("FitsPrimary not found for id " + mapId); std::string fitsPrimary = it->get<4>().get(); if(!it->get<5>()) throw soci::soci_error("FitsSecondary not found for id " + mapId); std::string fitsSecondary = it->get<5>().get(); if(!it->get<6>()) throw soci::soci_error("Mandatory not found for id " + mapId); int mandatory = it->get<5>().get(); int mandatory = it->get<6>().get(); #ifdef VERBOSE_DEBUG INFO_STREAM << "MAPPING: " << mapId << " " << columnName << " " << columnType << " " << fitsPrimary << " " << fitsSecondary << " " << mandatory << endl; << columnType << " " << fitsHDU << " " << fitsPrimary << " " << fitsSecondary << " " << mandatory << endl; INFO_STREAM << "-------------------------------------------------" << endl; #endif mapping_spvector.push_back( Mapping::create(columnName, columnType, fitsPrimary, fitsSecondary, mandatory) ); fitsHDU, fitsPrimary, fitsSecondary, mandatory) ); } return mapping_spvector; Loading
src/DMDBImporter.h +2 −1 Original line number Diff line number Diff line Loading @@ -100,8 +100,9 @@ protected: // [Protected] Mapping tuple typedef //------------------------------------------------------------------------------ typedef boost::tuple< boost::optional<int>, boost::optional<std::string>, boost::optional<std::string>, boost::optional<int>, boost::optional<std::string>, boost::optional<std::string>, boost::optional<std::string>, boost::optional<int> > MappingTuple; boost::optional<int> > MappingTuple; typedef std::vector< MappingTuple > MappingTupleVector; Loading
src/Mapping.h +21 −17 Original line number Diff line number Diff line Loading @@ -21,11 +21,11 @@ private: //------------------------------------------------------------------------------ // [Private] Constructor destructor deleter //------------------------------------------------------------------------------ Mapping(std::string columnName, std::string columnType, std::string fitsPrimary, std::string fitsSecondary, bool mandatory) : m_columnName(columnName), m_columnType(columnType), m_fitsPrimary(fitsPrimary), m_fitsSecondary(fitsSecondary), m_mandatory(mandatory) {} Mapping(std::string columnName, std::string columnType, int fitsHDU, std::string fitsPrimary, std::string fitsSecondary, bool mandatory) : m_columnName(columnName), m_columnType(columnType), m_fitsHDU(fitsHDU), m_fitsPrimary(fitsPrimary), m_fitsSecondary(fitsSecondary), m_mandatory(mandatory) {} ~Mapping() {} class Deleter; Loading @@ -40,17 +40,18 @@ public: //------------------------------------------------------------------------------ // [Public] Users methods //------------------------------------------------------------------------------ static Mapping::SP create(std::string columnName, std::string columnType, std::string fitsPrimary, static Mapping::SP create(std::string columnName, std::string columnType, int fitsHDU, std::string fitsPrimary, std::string fitsSecondary, bool mandatory) { Mapping::SP m_sp( new Mapping( columnName, columnType, fitsPrimary, fitsSecondary, mandatory), Mapping::Deleter()); Mapping::SP m_sp( new Mapping( columnName, columnType, fitsHDU, fitsPrimary, fitsSecondary, mandatory), Mapping::Deleter()); return m_sp; } std::string getColumnName() const { return m_columnName; } std::string getColumnType() const { return m_columnType; } int getFitsHDU() const { return m_fitsHDU; } std::string getFitsPrimary() const { return m_fitsPrimary; } std::string getFitsSecondary() const { return m_fitsSecondary; } bool isMandatory() const { return m_mandatory; } Loading @@ -65,6 +66,9 @@ private: //Metadata table column type const std::string m_columnType; //Metadata fits primary and secondary keys HDU const int m_fitsHDU; //Metadata fits primary key to be stored const std::string m_fitsPrimary; Loading
src/WorkerThread.cpp +195 −88 Original line number Diff line number Diff line Loading @@ -74,10 +74,6 @@ void WorkerThread::workerLoop() boost::shared_ptr<CCfits::FITS> fitsFile_sp( new CCfits::FITS(origPath.string(), CCfits::Write)); //Read all key in primary HDU CCfits::PHDU& phdu = fitsFile_sp->pHDU(); phdu.readAllKeys(); try { //Try to ingest using an instrument in list Loading Loading @@ -154,6 +150,7 @@ void WorkerThread::ingestUsingInstrumentList(boost::filesystem::path& origPath, DEBUG_STREAM << "WorkerThread::ingestUsingInstrumentList()" << endl; CCfits::PHDU& phdu = fitsFile_sp->pHDU(); phdu.readAllKeys(); Instrument::SP instrument_sp; boost::gregorian::date date; Loading Loading @@ -259,7 +256,7 @@ void WorkerThread::processFits(boost::filesystem::path& origPath, } //Insert metadata into database execQuery(session_sp, destination_sp, date, duplicateMax, origPath, phdu); execQuery(session_sp, destination_sp, date, duplicateMax, origPath, fitsFile_sp); //Archive file to storage moveFile(origPath,destination_sp, date, duplicateMax); Loading Loading @@ -312,15 +309,12 @@ int WorkerThread::findDuplicateMax(ConnectionManager::SessionSP // WorkerThread::execQuery() //============================================================================== void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, Destination::SP destination_sp, boost::gregorian::date& date, int duplicateMax, boost::filesystem::path& origPath, CCfits::PHDU& phdu) Destination::SP destination_sp, boost::gregorian::date& date, int duplicateMax, boost::filesystem::path& origPath, boost::shared_ptr<CCfits::FITS> fitsFile_sp) throw(CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::execQuery()" << endl; //Get mapping for a specific instrument Mapping::SPVector mapping_spvector = destination_sp->getMappingSPVector(); //Insert part of the query std::stringstream insertQuery; insertQuery << "INSERT INTO " << destination_sp->getSchema() << "." Loading @@ -337,8 +331,8 @@ void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, //Append file path column to query insertQuery << "," << destination_sp->getTable() << ".file_path"; int month = date.month(); valuesQuery << ",\'/" << destination_sp->getDirName() << "/" << date.year() << "/" << month << "/" << date.day() << "\'"; valuesQuery << ",\'/" << date.year() << "/" << month << "/" << date.day() << "/" << destination_sp->getDirName() << "\'"; //Append file version column to query insertQuery << "," << destination_sp->getTable() << ".file_version"; Loading @@ -349,16 +343,55 @@ void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, valuesQuery << ",\'" << origPath.filename().string() << ".gz\'"; //Search key from mapping and fill query fillQueryFromFits(insertQuery, valuesQuery, destination_sp, fitsFile_sp); //Merge insert and values queries insertQuery << ") " << valuesQuery.rdbuf() << ")"; DEBUG_STREAM << "WorkerThread::execQuery() query " << insertQuery.str() << endl; //Exec query *session_sp << insertQuery.str(); } //============================================================================== // WorkerThread::fillQueryFromFits() //============================================================================== void WorkerThread::fillQueryFromFits(std::stringstream& insertQuery, std::stringstream& valuesQuery, Destination::SP destination_sp, boost::shared_ptr<CCfits::FITS> fitsFile_sp) throw(CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::fillQueryFromFits()" << endl; //Get mapping for a specific instrument Mapping::SPVector mapping_spvector = destination_sp->getMappingSPVector(); //Loop mapping to append metadata columns to query Mapping::SPVector::iterator it; for(it=mapping_spvector.begin(); it!=mapping_spvector.end(); it++) { int fitsHDU = (*it)->getFitsHDU(); std::string value; try { //Search for primary fits key value = readKey(phdu, (*it)->getFitsPrimary(), (*it)->getColumnType()); try { if(fitsHDU == 0) { //Read primary key from PHDU value = readKeyFromPHDU(fitsFile_sp->pHDU(), (*it)->getFitsPrimary(), (*it)->getColumnType()); } else { //Read primary key from extension HDU value = readKeyFromHDU(fitsFile_sp->extension(fitsHDU), (*it)->getFitsPrimary(), (*it)->getColumnType()); } insertQuery << "," << destination_sp->getTable() << "." << (*it)->getColumnName(); Loading @@ -366,12 +399,22 @@ void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, } catch(CCfits::HDU::NoSuchKeyword& ex) { WARN_STREAM << "WorkerThread::execQuery() exception: " << ex.message() << endl; WARN_STREAM << "WorkerThread::execQuery() " << ex.message() << endl; try { //Search for secondary fits key (alias) value = readKey(phdu, (*it)->getFitsSecondary(), (*it)->getColumnType()); if(fitsHDU == 0) { //Read secondary key from PHDU value = readKeyFromPHDU(fitsFile_sp->pHDU(), (*it)->getFitsSecondary(), (*it)->getColumnType()); } else { //Read secondary key from extension HDU value = readKeyFromHDU(fitsFile_sp->extension(fitsHDU), (*it)->getFitsSecondary(), (*it)->getColumnType()); } insertQuery << "," << destination_sp->getTable() << "." << (*it)->getColumnName(); Loading @@ -379,7 +422,7 @@ void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, } catch(CCfits::HDU::NoSuchKeyword& ex) { WARN_STREAM << "WorkerThread::execQuery() exception: " << ex.message() << endl; WARN_STREAM << "WorkerThread::execQuery() " << ex.message() << endl; //If key is mandatory throw and exception if((*it)->isMandatory()) Loading @@ -387,32 +430,35 @@ void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, } } } catch(CCfits::FITS::NoSuchHDU& ex) { WARN_STREAM << "WorkerThread::execQuery() " << ex.message() << endl; //Merge insert and values queries insertQuery << ") " << valuesQuery.rdbuf() << ")"; DEBUG_STREAM << "WorkerThread::execQuery() query " << insertQuery.str() << endl; //Exec query *session_sp << insertQuery.str(); //If key is mandatory throw and exception if((*it)->isMandatory()) throw ex; } } } //============================================================================== // WorkerThread::readKey() // WorkerThread::readKeyFromPHDU() //============================================================================== std::string WorkerThread::readKey(CCfits::PHDU& phdu, std::string keyName, std::string dMDBKeyType) throw(CCfits::FitsException, std::runtime_error) std::string WorkerThread::readKeyFromPHDU(CCfits::PHDU& phdu, std::string keyName, std::string keyType) throw(CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::readKey()" << endl; DEBUG_STREAM << "WorkerThread::readKeyFromPHDU()" << endl; phdu.readAllKeys(); std::stringstream result_stream; std::stringstream resultStream; boost::regex pattern("^(NAXIS)([0-9]+)$", boost::regex::icase); boost::smatch matches; if(keyName.compare("NAXIS")==0 || keyName.compare("naxis")==0) { result_stream << phdu.axes(); resultStream << phdu.axes(); } else if(boost::regex_match(keyName, matches, pattern)) { Loading @@ -425,7 +471,7 @@ std::string WorkerThread::readKey(CCfits::PHDU& phdu, std::string keyName, if(axisReq<0 || axisReq>=axisMax) throw CCfits::HDU::NoSuchKeyword("Invalid naxis number"); result_stream << phdu.axis(axisReq); resultStream << phdu.axis(axisReq); } catch(boost::bad_lexical_cast& ex) { Loading @@ -434,53 +480,115 @@ std::string WorkerThread::readKey(CCfits::PHDU& phdu, std::string keyName, } else if(keyName.compare("BITPIX")==0 || keyName.compare("bitpix")==0) { result_stream << phdu.bitpix(); resultStream << phdu.bitpix(); } else if(keyName.compare("BSCALE")==0 || keyName.compare("bscale")==0) { result_stream << phdu.scale(); resultStream << phdu.scale(); } else if(keyName.compare("BZERO")==0 || keyName.compare("bzero")==0) { result_stream << phdu.zero(); resultStream << phdu.zero(); } else if(keyName.compare("EXTEND")==0 || keyName.compare("extend")==0) { result_stream << phdu.extend(); resultStream << phdu.extend(); } else if(keyType.compare("varchar")==0 || keyType.compare("char")==0) { resultStream << "\'" << readKey(phdu, keyName, keyType) << "\'"; } else if(keyType.compare("date")==0 || keyType.compare("datetime")==0 || keyType.compare("time")==0 || keyType.compare("timestamp")==0) { resultStream << "\'" << readKey(phdu, keyName, keyType) << "\'"; } else { resultStream << readKey(phdu, keyName, keyType); } return resultStream.str(); } //============================================================================== // WorkerThread::readKeyFromHDU() //============================================================================== std::string WorkerThread::readKeyFromHDU(CCfits::HDU& hdu, std::string keyName, std::string keyType) throw(CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::readKeyFromHDU()" << endl; hdu.readAllKeys(); std::stringstream resultStream; boost::regex pattern("^(NAXIS)([0-9]+)$", boost::regex::icase); boost::smatch matches; if(keyName.compare("NAXIS")==0 || keyName.compare("naxis")==0) { resultStream << hdu.axes(); } else if(dMDBKeyType.compare("varchar")==0 || dMDBKeyType.compare("char")==0) else if(boost::regex_match(keyName, matches, pattern)) { long axisMax = hdu.axes(); try { long axisReq = boost::lexical_cast<long>(matches[2]) - 1; if(axisReq<0 || axisReq>=axisMax) throw std::runtime_error("Invalid naxis number"); resultStream << hdu.axis(axisReq); } catch(boost::bad_lexical_cast& ex) { throw std::runtime_error("Exception parsing naxis number"); } } else if(keyName.compare("BITPIX")==0 || keyName.compare("bitpix")==0) { result_stream << "\'" << extractKey(phdu, keyName, dMDBKeyType) << "\'"; resultStream << hdu.bitpix(); } else if(dMDBKeyType.compare("date")==0 || dMDBKeyType.compare("datetime")==0 || dMDBKeyType.compare("time")==0 || dMDBKeyType.compare("timestamp")==0) else if(keyName.compare("BSCALE")==0 || keyName.compare("bscale")==0) { resultStream << hdu.scale(); } else if(keyName.compare("BZERO")==0 || keyName.compare("bzero")==0) { result_stream << "\'" << extractKey(phdu, keyName, dMDBKeyType) << "\'"; resultStream << hdu.zero(); } else if(keyType.compare("varchar")==0 || keyType.compare("char")==0) { resultStream << "\'" << readKey(hdu, keyName, keyType) << "\'"; } else if(keyType.compare("date")==0 || keyType.compare("datetime")==0 || keyType.compare("time")==0 || keyType.compare("timestamp")==0) { resultStream << "\'" << readKey(hdu, keyName, keyType) << "\'"; } else { result_stream << extractKey(phdu, keyName, dMDBKeyType); resultStream << readKey(hdu, keyName, keyType); } return result_stream.str(); return resultStream.str(); } //============================================================================== // WorkerThread::extractKey() // WorkerThread::readKey() //============================================================================== std::string WorkerThread::extractKey(CCfits::PHDU& phdu, std::string keyName, std::string dMDBKeyType) throw(CCfits::FitsException, std::runtime_error) std::string WorkerThread::readKey(CCfits::HDU& hdu, std::string keyName, std::string keyType) throw(CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::extractKey()" << endl; DEBUG_STREAM << "WorkerThread::readKey()" << endl; int fitsKeyType = phdu.keyWord(keyName).keytype(); int fitsKeyType = hdu.keyWord(keyName).keytype(); #ifdef VERBOSE_DEBUG INFO_STREAM << "KEY: " << keyName << " " << fitsKeyType << " -> " << dMDBKeyType << endl; INFO_STREAM << "KEY: " << keyName << " " << fitsKeyType << " -> " << keyType << endl; INFO_STREAM << "-------------------------------------------------" << endl; #endif Loading @@ -491,49 +599,49 @@ std::string WorkerThread::extractKey(CCfits::PHDU& phdu, std::string keyName, case TSTRING: { std::string value; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return value; } case TLOGICAL: { bool value; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); } case TINT: case TSHORT: case TSBYTE: { int value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); } case TUINT: case TUSHORT: case TBIT: case TBYTE: { unsigned int value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); } case TULONG: { unsigned long value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); } case TLONG: { long value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); } case TLONGLONG: { long long value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); } case TDOUBLE: case TFLOAT: { double value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); } case TCOMPLEX: case TDBLCOMPLEX: Loading Loading @@ -572,9 +680,8 @@ void WorkerThread::moveFile(boost::filesystem::path& origPath, //Create directory path, date and version part of destination path std::stringstream relPathStream; int month = date.month(); relPathStream << "/" << destination_sp->getDirName() << "/" << date.year() << "/" << month << "/" << date.day() << "/" << duplicateMax; relPathStream << "/" << date.year() << "/" << month << "/" << date.day() << "/" << destination_sp->getDirName() << "/" << duplicateMax; boost::filesystem::path relPath( relPathStream.str() ); //Append date path to full destination path Loading
src/WorkerThread.h +13 −5 Original line number Diff line number Diff line Loading @@ -62,12 +62,20 @@ protected: virtual void execQuery(ConnectionManager::SessionSP, Destination::SP, boost::gregorian::date&, int, boost::filesystem::path&, CCfits::PHDU&) throw(CCfits::FitsException, std::runtime_error); boost::shared_ptr<CCfits::FITS>) throw(CCfits::FitsException, std::runtime_error); virtual void fillQueryFromFits(std::stringstream&, std::stringstream&, Destination::SP, boost::shared_ptr<CCfits::FITS>) throw(CCfits::FitsException, std::runtime_error); virtual std::string readKeyFromPHDU(CCfits::PHDU&, std::string, std::string) throw(CCfits::FitsException, std::runtime_error); virtual std::string readKey(CCfits::PHDU&, std::string, std::string) virtual std::string readKeyFromHDU(CCfits::HDU&, std::string, std::string) throw(CCfits::FitsException, std::runtime_error); virtual std::string extractKey(CCfits::PHDU&, std::string, std::string) virtual std::string readKey(CCfits::HDU&, std::string, std::string) throw(CCfits::FitsException, std::runtime_error); virtual void moveFile(boost::filesystem::path&, Destination::SP, Loading