Loading src/DMDBImporter.cpp +14 −10 Original line number Original line Diff line number Diff line Loading @@ -276,8 +276,8 @@ Mapping::SPVector DMDBImporter::getMapping(int destinationId) { { DEBUG_STREAM << "DMDBImporter::getMapping()" << endl; DEBUG_STREAM << "DMDBImporter::getMapping()" << endl; soci::rowset< MappingTuple > rows = (m_session_sp->prepare << soci::rowset< MappingTuple > rows = (m_session_sp->prepare << "SELECT id," "SELECT id, column_name, column_type, fits_key_pri, fits_key_sec, mandatory " "column_name, column_type, fits_key_hdu, fits_key_pri, fits_key_sec, mandatory " "FROM " << m_mappingTable << " WHERE dest_id=:id", soci::use(destinationId,"id")); "FROM " << m_mappingTable << " WHERE dest_id=:id", soci::use(destinationId,"id")); MappingTupleVector mappingTupleList; MappingTupleVector mappingTupleList; Loading @@ -301,26 +301,30 @@ Mapping::SPVector DMDBImporter::getMapping(int destinationId) std::string columnType = it->get<2>().get(); std::string columnType = it->get<2>().get(); if(!it->get<3>()) if(!it->get<3>()) throw soci::soci_error("FitsPrimary not found for id " + mapId); throw soci::soci_error("FitsHDU not found for id " + mapId); std::string fitsPrimary = it->get<3>().get(); int fitsHDU = it->get<3>().get(); if(!it->get<4>()) if(!it->get<4>()) throw soci::soci_error("FitsSecondary not found for id " + mapId); throw soci::soci_error("FitsPrimary not found for id " + mapId); std::string fitsSecondary = it->get<4>().get(); std::string fitsPrimary = it->get<4>().get(); if(!it->get<5>()) if(!it->get<5>()) throw soci::soci_error("FitsSecondary not found for id " + mapId); std::string fitsSecondary = it->get<5>().get(); if(!it->get<6>()) throw soci::soci_error("Mandatory not found for id " + mapId); throw soci::soci_error("Mandatory not found for id " + mapId); int mandatory = it->get<5>().get(); int mandatory = it->get<6>().get(); #ifdef VERBOSE_DEBUG #ifdef VERBOSE_DEBUG INFO_STREAM << "MAPPING: " << mapId << " " << columnName << " " INFO_STREAM << "MAPPING: " << mapId << " " << columnName << " " << columnType << " " << fitsPrimary << " " << fitsSecondary << " " << columnType << " " << fitsHDU << " " << fitsPrimary << " " << mandatory << endl; << fitsSecondary << " " << mandatory << endl; INFO_STREAM << "-------------------------------------------------" << endl; INFO_STREAM << "-------------------------------------------------" << endl; #endif #endif mapping_spvector.push_back( Mapping::create(columnName, columnType, mapping_spvector.push_back( Mapping::create(columnName, columnType, fitsPrimary, fitsSecondary, mandatory) ); fitsHDU, fitsPrimary, fitsSecondary, mandatory) ); } } return mapping_spvector; return mapping_spvector; Loading src/DMDBImporter.h +2 −1 Original line number Original line Diff line number Diff line Loading @@ -100,8 +100,9 @@ protected: // [Protected] Mapping tuple typedef // [Protected] Mapping tuple typedef //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ typedef boost::tuple< boost::optional<int>, boost::optional<std::string>, typedef boost::tuple< boost::optional<int>, boost::optional<std::string>, boost::optional<std::string>, boost::optional<int>, boost::optional<std::string>, boost::optional<std::string>, boost::optional<std::string>, boost::optional<std::string>, boost::optional<std::string>, boost::optional<int> > MappingTuple; boost::optional<int> > MappingTuple; typedef std::vector< MappingTuple > MappingTupleVector; typedef std::vector< MappingTuple > MappingTupleVector; Loading src/Mapping.h +21 −17 Original line number Original line Diff line number Diff line Loading @@ -21,11 +21,11 @@ private: //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ // [Private] Constructor destructor deleter // [Private] Constructor destructor deleter //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ Mapping(std::string columnName, std::string columnType, Mapping(std::string columnName, std::string columnType, int fitsHDU, std::string fitsPrimary, std::string fitsSecondary, std::string fitsPrimary, std::string fitsSecondary, bool mandatory) : bool mandatory) : m_columnName(columnName), m_columnName(columnName), m_columnType(columnType), m_fitsHDU(fitsHDU), m_columnType(columnType), m_fitsPrimary(fitsPrimary), m_fitsPrimary(fitsPrimary), m_fitsSecondary(fitsSecondary), m_fitsSecondary(fitsSecondary), m_mandatory(mandatory) {} m_mandatory(mandatory) {} ~Mapping() {} ~Mapping() {} class Deleter; class Deleter; Loading @@ -40,17 +40,18 @@ public: //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ // [Public] Users methods // [Public] Users methods //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ static Mapping::SP create(std::string columnName, static Mapping::SP create(std::string columnName, std::string columnType, std::string columnType, std::string fitsPrimary, int fitsHDU, std::string fitsPrimary, std::string fitsSecondary, bool mandatory) std::string fitsSecondary, bool mandatory) { { Mapping::SP m_sp( new Mapping( columnName, columnType, fitsPrimary, Mapping::SP m_sp( new Mapping( columnName, columnType, fitsHDU, fitsSecondary, mandatory), Mapping::Deleter()); fitsPrimary, fitsSecondary, mandatory), Mapping::Deleter()); return m_sp; return m_sp; } } std::string getColumnName() const { return m_columnName; } std::string getColumnName() const { return m_columnName; } std::string getColumnType() const { return m_columnType; } std::string getColumnType() const { return m_columnType; } int getFitsHDU() const { return m_fitsHDU; } std::string getFitsPrimary() const { return m_fitsPrimary; } std::string getFitsPrimary() const { return m_fitsPrimary; } std::string getFitsSecondary() const { return m_fitsSecondary; } std::string getFitsSecondary() const { return m_fitsSecondary; } bool isMandatory() const { return m_mandatory; } bool isMandatory() const { return m_mandatory; } Loading @@ -65,6 +66,9 @@ private: //Metadata table column type //Metadata table column type const std::string m_columnType; const std::string m_columnType; //Metadata fits primary and secondary keys HDU const int m_fitsHDU; //Metadata fits primary key to be stored //Metadata fits primary key to be stored const std::string m_fitsPrimary; const std::string m_fitsPrimary; Loading src/WorkerThread.cpp +190 −82 Original line number Original line Diff line number Diff line Loading @@ -74,10 +74,6 @@ void WorkerThread::workerLoop() boost::shared_ptr<CCfits::FITS> fitsFile_sp( boost::shared_ptr<CCfits::FITS> fitsFile_sp( new CCfits::FITS(origPath.string(), CCfits::Write)); new CCfits::FITS(origPath.string(), CCfits::Write)); //Read all key in primary HDU CCfits::PHDU& phdu = fitsFile_sp->pHDU(); phdu.readAllKeys(); try try { { //Try to ingest using an instrument in list //Try to ingest using an instrument in list Loading Loading @@ -154,6 +150,7 @@ void WorkerThread::ingestUsingInstrumentList(boost::filesystem::path& origPath, DEBUG_STREAM << "WorkerThread::ingestUsingInstrumentList()" << endl; DEBUG_STREAM << "WorkerThread::ingestUsingInstrumentList()" << endl; CCfits::PHDU& phdu = fitsFile_sp->pHDU(); CCfits::PHDU& phdu = fitsFile_sp->pHDU(); phdu.readAllKeys(); Instrument::SP instrument_sp; Instrument::SP instrument_sp; boost::gregorian::date date; boost::gregorian::date date; Loading Loading @@ -259,7 +256,7 @@ void WorkerThread::processFits(boost::filesystem::path& origPath, } } //Insert metadata into database //Insert metadata into database execQuery(session_sp, destination_sp, date, duplicateMax, origPath, phdu); execQuery(session_sp, destination_sp, date, duplicateMax, origPath, fitsFile_sp); //Archive file to storage //Archive file to storage moveFile(origPath,destination_sp, date, duplicateMax); moveFile(origPath,destination_sp, date, duplicateMax); Loading Loading @@ -312,15 +309,12 @@ int WorkerThread::findDuplicateMax(ConnectionManager::SessionSP // WorkerThread::execQuery() // WorkerThread::execQuery() //============================================================================== //============================================================================== void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, Destination::SP destination_sp, boost::gregorian::date& date, Destination::SP destination_sp, boost::gregorian::date& date, int duplicateMax, int duplicateMax, boost::filesystem::path& origPath, CCfits::PHDU& phdu) boost::filesystem::path& origPath, boost::shared_ptr<CCfits::FITS> fitsFile_sp) throw(CCfits::FitsException, std::runtime_error) throw(CCfits::FitsException, std::runtime_error) { { DEBUG_STREAM << "WorkerThread::execQuery()" << endl; DEBUG_STREAM << "WorkerThread::execQuery()" << endl; //Get mapping for a specific instrument Mapping::SPVector mapping_spvector = destination_sp->getMappingSPVector(); //Insert part of the query //Insert part of the query std::stringstream insertQuery; std::stringstream insertQuery; insertQuery << "INSERT INTO " << destination_sp->getSchema() << "." insertQuery << "INSERT INTO " << destination_sp->getSchema() << "." Loading Loading @@ -349,16 +343,55 @@ void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, valuesQuery << ",\'" << origPath.filename().string() << ".gz\'"; valuesQuery << ",\'" << origPath.filename().string() << ".gz\'"; //Search key from mapping and fill query fillQueryFromFits(insertQuery, valuesQuery, destination_sp, fitsFile_sp); //Merge insert and values queries insertQuery << ") " << valuesQuery.rdbuf() << ")"; DEBUG_STREAM << "WorkerThread::execQuery() query " << insertQuery.str() << endl; //Exec query *session_sp << insertQuery.str(); } //============================================================================== // WorkerThread::fillQueryFromFits() //============================================================================== void WorkerThread::fillQueryFromFits(std::stringstream& insertQuery, std::stringstream& valuesQuery, Destination::SP destination_sp, boost::shared_ptr<CCfits::FITS> fitsFile_sp) throw(CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::fillQueryFromFits()" << endl; //Get mapping for a specific instrument Mapping::SPVector mapping_spvector = destination_sp->getMappingSPVector(); //Loop mapping to append metadata columns to query //Loop mapping to append metadata columns to query Mapping::SPVector::iterator it; Mapping::SPVector::iterator it; for(it=mapping_spvector.begin(); it!=mapping_spvector.end(); it++) for(it=mapping_spvector.begin(); it!=mapping_spvector.end(); it++) { { int fitsHDU = (*it)->getFitsHDU(); std::string value; std::string value; try try { { //Search for primary fits key try value = readKey(phdu, (*it)->getFitsPrimary(), (*it)->getColumnType()); { if(fitsHDU == 0) { //Read primary key from PHDU value = readKeyFromPHDU(fitsFile_sp->pHDU(), (*it)->getFitsPrimary(), (*it)->getColumnType()); } else { //Read primary key from extension HDU value = readKeyFromHDU(fitsFile_sp->extension(fitsHDU), (*it)->getFitsPrimary(), (*it)->getColumnType()); } insertQuery << "," << destination_sp->getTable() << "." insertQuery << "," << destination_sp->getTable() << "." << (*it)->getColumnName(); << (*it)->getColumnName(); Loading @@ -366,12 +399,22 @@ void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, } } catch(CCfits::HDU::NoSuchKeyword& ex) catch(CCfits::HDU::NoSuchKeyword& ex) { { WARN_STREAM << "WorkerThread::execQuery() exception: " << ex.message() << endl; WARN_STREAM << "WorkerThread::execQuery() " << ex.message() << endl; try try { { //Search for secondary fits key (alias) if(fitsHDU == 0) value = readKey(phdu, (*it)->getFitsSecondary(), (*it)->getColumnType()); { //Read secondary key from PHDU value = readKeyFromPHDU(fitsFile_sp->pHDU(), (*it)->getFitsSecondary(), (*it)->getColumnType()); } else { //Read secondary key from extension HDU value = readKeyFromHDU(fitsFile_sp->extension(fitsHDU), (*it)->getFitsSecondary(), (*it)->getColumnType()); } insertQuery << "," << destination_sp->getTable() << "." insertQuery << "," << destination_sp->getTable() << "." << (*it)->getColumnName(); << (*it)->getColumnName(); Loading @@ -379,7 +422,7 @@ void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, } } catch(CCfits::HDU::NoSuchKeyword& ex) catch(CCfits::HDU::NoSuchKeyword& ex) { { WARN_STREAM << "WorkerThread::execQuery() exception: " << ex.message() << endl; WARN_STREAM << "WorkerThread::execQuery() " << ex.message() << endl; //If key is mandatory throw and exception //If key is mandatory throw and exception if((*it)->isMandatory()) if((*it)->isMandatory()) Loading @@ -387,32 +430,35 @@ void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, } } } } } } catch(CCfits::FITS::NoSuchHDU& ex) { WARN_STREAM << "WorkerThread::execQuery() " << ex.message() << endl; //Merge insert and values queries //If key is mandatory throw and exception insertQuery << ") " << valuesQuery.rdbuf() << ")"; if((*it)->isMandatory()) throw ex; DEBUG_STREAM << "WorkerThread::execQuery() query " << insertQuery.str() << endl; } } //Exec query *session_sp << insertQuery.str(); } } //============================================================================== //============================================================================== // WorkerThread::readKey() // WorkerThread::readKeyFromPHDU() //============================================================================== //============================================================================== std::string WorkerThread::readKey(CCfits::PHDU& phdu, std::string keyName, std::string WorkerThread::readKeyFromPHDU(CCfits::PHDU& phdu, std::string keyName, std::string dMDBKeyType) throw(CCfits::FitsException, std::runtime_error) std::string keyType) throw(CCfits::FitsException, std::runtime_error) { { DEBUG_STREAM << "WorkerThread::readKey()" << endl; DEBUG_STREAM << "WorkerThread::readKeyFromPHDU()" << endl; phdu.readAllKeys(); std::stringstream result_stream; std::stringstream resultStream; boost::regex pattern("^(NAXIS)([0-9]+)$", boost::regex::icase); boost::regex pattern("^(NAXIS)([0-9]+)$", boost::regex::icase); boost::smatch matches; boost::smatch matches; if(keyName.compare("NAXIS")==0 || keyName.compare("naxis")==0) if(keyName.compare("NAXIS")==0 || keyName.compare("naxis")==0) { { result_stream << phdu.axes(); resultStream << phdu.axes(); } } else if(boost::regex_match(keyName, matches, pattern)) else if(boost::regex_match(keyName, matches, pattern)) { { Loading @@ -425,7 +471,7 @@ std::string WorkerThread::readKey(CCfits::PHDU& phdu, std::string keyName, if(axisReq<0 || axisReq>=axisMax) if(axisReq<0 || axisReq>=axisMax) throw std::runtime_error("Invalid naxis number"); throw std::runtime_error("Invalid naxis number"); result_stream << phdu.axis(axisReq); resultStream << phdu.axis(axisReq); } } catch(boost::bad_lexical_cast& ex) catch(boost::bad_lexical_cast& ex) { { Loading @@ -434,53 +480,115 @@ std::string WorkerThread::readKey(CCfits::PHDU& phdu, std::string keyName, } } else if(keyName.compare("BITPIX")==0 || keyName.compare("bitpix")==0) else if(keyName.compare("BITPIX")==0 || keyName.compare("bitpix")==0) { { result_stream << phdu.bitpix(); resultStream << phdu.bitpix(); } } else if(keyName.compare("BSCALE")==0 || keyName.compare("bscale")==0) else if(keyName.compare("BSCALE")==0 || keyName.compare("bscale")==0) { { result_stream << phdu.scale(); resultStream << phdu.scale(); } } else if(keyName.compare("BZERO")==0 || keyName.compare("bzero")==0) else if(keyName.compare("BZERO")==0 || keyName.compare("bzero")==0) { { result_stream << phdu.zero(); resultStream << phdu.zero(); } } else if(keyName.compare("EXTEND")==0 || keyName.compare("extend")==0) else if(keyName.compare("EXTEND")==0 || keyName.compare("extend")==0) { { result_stream << phdu.extend(); resultStream << phdu.extend(); } else if(keyType.compare("varchar")==0 || keyType.compare("char")==0) { resultStream << "\'" << readKey(phdu, keyName, keyType) << "\'"; } else if(keyType.compare("date")==0 || keyType.compare("datetime")==0 || keyType.compare("time")==0 || keyType.compare("timestamp")==0) { resultStream << "\'" << readKey(phdu, keyName, keyType) << "\'"; } else { resultStream << readKey(phdu, keyName, keyType); } return resultStream.str(); } //============================================================================== // WorkerThread::readKeyFromHDU() //============================================================================== std::string WorkerThread::readKeyFromHDU(CCfits::HDU& hdu, std::string keyName, std::string keyType) throw(CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::readKeyFromHDU()" << endl; hdu.readAllKeys(); std::stringstream resultStream; boost::regex pattern("^(NAXIS)([0-9]+)$", boost::regex::icase); boost::smatch matches; if(keyName.compare("NAXIS")==0 || keyName.compare("naxis")==0) { resultStream << hdu.axes(); } else if(boost::regex_match(keyName, matches, pattern)) { long axisMax = hdu.axes(); try { long axisReq = boost::lexical_cast<long>(matches[2]) - 1; if(axisReq<0 || axisReq>=axisMax) throw std::runtime_error("Invalid naxis number"); resultStream << hdu.axis(axisReq); } catch(boost::bad_lexical_cast& ex) { throw std::runtime_error("Exception parsing naxis number"); } } } else if(dMDBKeyType.compare("varchar")==0 || else if(keyName.compare("BITPIX")==0 || keyName.compare("bitpix")==0) dMDBKeyType.compare("char")==0) { { result_stream << "\'" << extractKey(phdu, keyName, dMDBKeyType) << "\'"; resultStream << hdu.bitpix(); } } else if(dMDBKeyType.compare("date")==0 || else if(keyName.compare("BSCALE")==0 || keyName.compare("bscale")==0) dMDBKeyType.compare("datetime")==0 || { dMDBKeyType.compare("time")==0 || resultStream << hdu.scale(); dMDBKeyType.compare("timestamp")==0) } else if(keyName.compare("BZERO")==0 || keyName.compare("bzero")==0) { { result_stream << "\'" << extractKey(phdu, keyName, dMDBKeyType) << "\'"; resultStream << hdu.zero(); } else if(keyType.compare("varchar")==0 || keyType.compare("char")==0) { resultStream << "\'" << readKey(hdu, keyName, keyType) << "\'"; } else if(keyType.compare("date")==0 || keyType.compare("datetime")==0 || keyType.compare("time")==0 || keyType.compare("timestamp")==0) { resultStream << "\'" << readKey(hdu, keyName, keyType) << "\'"; } } else else { { result_stream << extractKey(phdu, keyName, dMDBKeyType); resultStream << readKey(hdu, keyName, keyType); } } return result_stream.str(); return resultStream.str(); } } //============================================================================== //============================================================================== // WorkerThread::extractKey() // WorkerThread::readKey() //============================================================================== //============================================================================== std::string WorkerThread::extractKey(CCfits::PHDU& phdu, std::string keyName, std::string WorkerThread::readKey(CCfits::HDU& hdu, std::string keyName, std::string dMDBKeyType) throw(CCfits::FitsException, std::runtime_error) std::string keyType) throw(CCfits::FitsException, std::runtime_error) { { DEBUG_STREAM << "WorkerThread::extractKey()" << endl; DEBUG_STREAM << "WorkerThread::readKey()" << endl; int fitsKeyType = phdu.keyWord(keyName).keytype(); int fitsKeyType = hdu.keyWord(keyName).keytype(); #ifdef VERBOSE_DEBUG #ifdef VERBOSE_DEBUG INFO_STREAM << "KEY: " << keyName << " " << fitsKeyType INFO_STREAM << "KEY: " << keyName << " " << fitsKeyType << " -> " << keyType << endl; << " -> " << dMDBKeyType << endl; INFO_STREAM << "-------------------------------------------------" << endl; INFO_STREAM << "-------------------------------------------------" << endl; #endif #endif Loading @@ -491,43 +599,43 @@ std::string WorkerThread::extractKey(CCfits::PHDU& phdu, std::string keyName, case TSTRING: case TSTRING: { { std::string value; std::string value; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return value; return value; } } case TINT: case TSHORT: case TLOGICAL: case TSBYTE: case TINT: case TSHORT: case TLOGICAL: case TSBYTE: { { int value = 0; int value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); return boost::lexical_cast<std::string>(value); } } case TUINT: case TUSHORT: case TBIT: case TBYTE: case TUINT: case TUSHORT: case TBIT: case TBYTE: { { unsigned int value = 0; unsigned int value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); return boost::lexical_cast<std::string>(value); } } case TULONG: case TULONG: { { unsigned long value = 0; unsigned long value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); return boost::lexical_cast<std::string>(value); } } case TLONG: case TLONG: { { long value = 0; long value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); return boost::lexical_cast<std::string>(value); } } case TLONGLONG: case TLONGLONG: { { long long value = 0; long long value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); return boost::lexical_cast<std::string>(value); } } case TDOUBLE: case TFLOAT: case TDOUBLE: case TFLOAT: { { double value = 0; double value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); return boost::lexical_cast<std::string>(value); } } case TCOMPLEX: case TDBLCOMPLEX: case TCOMPLEX: case TDBLCOMPLEX: Loading src/WorkerThread.h +13 −5 Original line number Original line Diff line number Diff line Loading @@ -62,12 +62,20 @@ protected: virtual void execQuery(ConnectionManager::SessionSP, Destination::SP, virtual void execQuery(ConnectionManager::SessionSP, Destination::SP, boost::gregorian::date&, int, boost::filesystem::path&, boost::gregorian::date&, int, boost::filesystem::path&, CCfits::PHDU&) throw(CCfits::FitsException, std::runtime_error); boost::shared_ptr<CCfits::FITS>) throw(CCfits::FitsException, std::runtime_error); virtual void fillQueryFromFits(std::stringstream&, std::stringstream&, Destination::SP, boost::shared_ptr<CCfits::FITS>) throw(CCfits::FitsException, std::runtime_error); virtual std::string readKeyFromPHDU(CCfits::PHDU&, std::string, std::string) throw(CCfits::FitsException, std::runtime_error); virtual std::string readKey(CCfits::PHDU&, std::string, std::string) virtual std::string readKeyFromHDU(CCfits::HDU&, std::string, std::string) throw(CCfits::FitsException, std::runtime_error); throw(CCfits::FitsException, std::runtime_error); virtual std::string extractKey(CCfits::PHDU&, std::string, std::string) virtual std::string readKey(CCfits::HDU&, std::string, std::string) throw(CCfits::FitsException, std::runtime_error); throw(CCfits::FitsException, std::runtime_error); virtual void moveFile(boost::filesystem::path&, Destination::SP, virtual void moveFile(boost::filesystem::path&, Destination::SP, Loading Loading
src/DMDBImporter.cpp +14 −10 Original line number Original line Diff line number Diff line Loading @@ -276,8 +276,8 @@ Mapping::SPVector DMDBImporter::getMapping(int destinationId) { { DEBUG_STREAM << "DMDBImporter::getMapping()" << endl; DEBUG_STREAM << "DMDBImporter::getMapping()" << endl; soci::rowset< MappingTuple > rows = (m_session_sp->prepare << soci::rowset< MappingTuple > rows = (m_session_sp->prepare << "SELECT id," "SELECT id, column_name, column_type, fits_key_pri, fits_key_sec, mandatory " "column_name, column_type, fits_key_hdu, fits_key_pri, fits_key_sec, mandatory " "FROM " << m_mappingTable << " WHERE dest_id=:id", soci::use(destinationId,"id")); "FROM " << m_mappingTable << " WHERE dest_id=:id", soci::use(destinationId,"id")); MappingTupleVector mappingTupleList; MappingTupleVector mappingTupleList; Loading @@ -301,26 +301,30 @@ Mapping::SPVector DMDBImporter::getMapping(int destinationId) std::string columnType = it->get<2>().get(); std::string columnType = it->get<2>().get(); if(!it->get<3>()) if(!it->get<3>()) throw soci::soci_error("FitsPrimary not found for id " + mapId); throw soci::soci_error("FitsHDU not found for id " + mapId); std::string fitsPrimary = it->get<3>().get(); int fitsHDU = it->get<3>().get(); if(!it->get<4>()) if(!it->get<4>()) throw soci::soci_error("FitsSecondary not found for id " + mapId); throw soci::soci_error("FitsPrimary not found for id " + mapId); std::string fitsSecondary = it->get<4>().get(); std::string fitsPrimary = it->get<4>().get(); if(!it->get<5>()) if(!it->get<5>()) throw soci::soci_error("FitsSecondary not found for id " + mapId); std::string fitsSecondary = it->get<5>().get(); if(!it->get<6>()) throw soci::soci_error("Mandatory not found for id " + mapId); throw soci::soci_error("Mandatory not found for id " + mapId); int mandatory = it->get<5>().get(); int mandatory = it->get<6>().get(); #ifdef VERBOSE_DEBUG #ifdef VERBOSE_DEBUG INFO_STREAM << "MAPPING: " << mapId << " " << columnName << " " INFO_STREAM << "MAPPING: " << mapId << " " << columnName << " " << columnType << " " << fitsPrimary << " " << fitsSecondary << " " << columnType << " " << fitsHDU << " " << fitsPrimary << " " << mandatory << endl; << fitsSecondary << " " << mandatory << endl; INFO_STREAM << "-------------------------------------------------" << endl; INFO_STREAM << "-------------------------------------------------" << endl; #endif #endif mapping_spvector.push_back( Mapping::create(columnName, columnType, mapping_spvector.push_back( Mapping::create(columnName, columnType, fitsPrimary, fitsSecondary, mandatory) ); fitsHDU, fitsPrimary, fitsSecondary, mandatory) ); } } return mapping_spvector; return mapping_spvector; Loading
src/DMDBImporter.h +2 −1 Original line number Original line Diff line number Diff line Loading @@ -100,8 +100,9 @@ protected: // [Protected] Mapping tuple typedef // [Protected] Mapping tuple typedef //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ typedef boost::tuple< boost::optional<int>, boost::optional<std::string>, typedef boost::tuple< boost::optional<int>, boost::optional<std::string>, boost::optional<std::string>, boost::optional<int>, boost::optional<std::string>, boost::optional<std::string>, boost::optional<std::string>, boost::optional<std::string>, boost::optional<std::string>, boost::optional<int> > MappingTuple; boost::optional<int> > MappingTuple; typedef std::vector< MappingTuple > MappingTupleVector; typedef std::vector< MappingTuple > MappingTupleVector; Loading
src/Mapping.h +21 −17 Original line number Original line Diff line number Diff line Loading @@ -21,11 +21,11 @@ private: //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ // [Private] Constructor destructor deleter // [Private] Constructor destructor deleter //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ Mapping(std::string columnName, std::string columnType, Mapping(std::string columnName, std::string columnType, int fitsHDU, std::string fitsPrimary, std::string fitsSecondary, std::string fitsPrimary, std::string fitsSecondary, bool mandatory) : bool mandatory) : m_columnName(columnName), m_columnName(columnName), m_columnType(columnType), m_fitsHDU(fitsHDU), m_columnType(columnType), m_fitsPrimary(fitsPrimary), m_fitsPrimary(fitsPrimary), m_fitsSecondary(fitsSecondary), m_fitsSecondary(fitsSecondary), m_mandatory(mandatory) {} m_mandatory(mandatory) {} ~Mapping() {} ~Mapping() {} class Deleter; class Deleter; Loading @@ -40,17 +40,18 @@ public: //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ // [Public] Users methods // [Public] Users methods //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ static Mapping::SP create(std::string columnName, static Mapping::SP create(std::string columnName, std::string columnType, std::string columnType, std::string fitsPrimary, int fitsHDU, std::string fitsPrimary, std::string fitsSecondary, bool mandatory) std::string fitsSecondary, bool mandatory) { { Mapping::SP m_sp( new Mapping( columnName, columnType, fitsPrimary, Mapping::SP m_sp( new Mapping( columnName, columnType, fitsHDU, fitsSecondary, mandatory), Mapping::Deleter()); fitsPrimary, fitsSecondary, mandatory), Mapping::Deleter()); return m_sp; return m_sp; } } std::string getColumnName() const { return m_columnName; } std::string getColumnName() const { return m_columnName; } std::string getColumnType() const { return m_columnType; } std::string getColumnType() const { return m_columnType; } int getFitsHDU() const { return m_fitsHDU; } std::string getFitsPrimary() const { return m_fitsPrimary; } std::string getFitsPrimary() const { return m_fitsPrimary; } std::string getFitsSecondary() const { return m_fitsSecondary; } std::string getFitsSecondary() const { return m_fitsSecondary; } bool isMandatory() const { return m_mandatory; } bool isMandatory() const { return m_mandatory; } Loading @@ -65,6 +66,9 @@ private: //Metadata table column type //Metadata table column type const std::string m_columnType; const std::string m_columnType; //Metadata fits primary and secondary keys HDU const int m_fitsHDU; //Metadata fits primary key to be stored //Metadata fits primary key to be stored const std::string m_fitsPrimary; const std::string m_fitsPrimary; Loading
src/WorkerThread.cpp +190 −82 Original line number Original line Diff line number Diff line Loading @@ -74,10 +74,6 @@ void WorkerThread::workerLoop() boost::shared_ptr<CCfits::FITS> fitsFile_sp( boost::shared_ptr<CCfits::FITS> fitsFile_sp( new CCfits::FITS(origPath.string(), CCfits::Write)); new CCfits::FITS(origPath.string(), CCfits::Write)); //Read all key in primary HDU CCfits::PHDU& phdu = fitsFile_sp->pHDU(); phdu.readAllKeys(); try try { { //Try to ingest using an instrument in list //Try to ingest using an instrument in list Loading Loading @@ -154,6 +150,7 @@ void WorkerThread::ingestUsingInstrumentList(boost::filesystem::path& origPath, DEBUG_STREAM << "WorkerThread::ingestUsingInstrumentList()" << endl; DEBUG_STREAM << "WorkerThread::ingestUsingInstrumentList()" << endl; CCfits::PHDU& phdu = fitsFile_sp->pHDU(); CCfits::PHDU& phdu = fitsFile_sp->pHDU(); phdu.readAllKeys(); Instrument::SP instrument_sp; Instrument::SP instrument_sp; boost::gregorian::date date; boost::gregorian::date date; Loading Loading @@ -259,7 +256,7 @@ void WorkerThread::processFits(boost::filesystem::path& origPath, } } //Insert metadata into database //Insert metadata into database execQuery(session_sp, destination_sp, date, duplicateMax, origPath, phdu); execQuery(session_sp, destination_sp, date, duplicateMax, origPath, fitsFile_sp); //Archive file to storage //Archive file to storage moveFile(origPath,destination_sp, date, duplicateMax); moveFile(origPath,destination_sp, date, duplicateMax); Loading Loading @@ -312,15 +309,12 @@ int WorkerThread::findDuplicateMax(ConnectionManager::SessionSP // WorkerThread::execQuery() // WorkerThread::execQuery() //============================================================================== //============================================================================== void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, Destination::SP destination_sp, boost::gregorian::date& date, Destination::SP destination_sp, boost::gregorian::date& date, int duplicateMax, int duplicateMax, boost::filesystem::path& origPath, CCfits::PHDU& phdu) boost::filesystem::path& origPath, boost::shared_ptr<CCfits::FITS> fitsFile_sp) throw(CCfits::FitsException, std::runtime_error) throw(CCfits::FitsException, std::runtime_error) { { DEBUG_STREAM << "WorkerThread::execQuery()" << endl; DEBUG_STREAM << "WorkerThread::execQuery()" << endl; //Get mapping for a specific instrument Mapping::SPVector mapping_spvector = destination_sp->getMappingSPVector(); //Insert part of the query //Insert part of the query std::stringstream insertQuery; std::stringstream insertQuery; insertQuery << "INSERT INTO " << destination_sp->getSchema() << "." insertQuery << "INSERT INTO " << destination_sp->getSchema() << "." Loading Loading @@ -349,16 +343,55 @@ void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, valuesQuery << ",\'" << origPath.filename().string() << ".gz\'"; valuesQuery << ",\'" << origPath.filename().string() << ".gz\'"; //Search key from mapping and fill query fillQueryFromFits(insertQuery, valuesQuery, destination_sp, fitsFile_sp); //Merge insert and values queries insertQuery << ") " << valuesQuery.rdbuf() << ")"; DEBUG_STREAM << "WorkerThread::execQuery() query " << insertQuery.str() << endl; //Exec query *session_sp << insertQuery.str(); } //============================================================================== // WorkerThread::fillQueryFromFits() //============================================================================== void WorkerThread::fillQueryFromFits(std::stringstream& insertQuery, std::stringstream& valuesQuery, Destination::SP destination_sp, boost::shared_ptr<CCfits::FITS> fitsFile_sp) throw(CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::fillQueryFromFits()" << endl; //Get mapping for a specific instrument Mapping::SPVector mapping_spvector = destination_sp->getMappingSPVector(); //Loop mapping to append metadata columns to query //Loop mapping to append metadata columns to query Mapping::SPVector::iterator it; Mapping::SPVector::iterator it; for(it=mapping_spvector.begin(); it!=mapping_spvector.end(); it++) for(it=mapping_spvector.begin(); it!=mapping_spvector.end(); it++) { { int fitsHDU = (*it)->getFitsHDU(); std::string value; std::string value; try try { { //Search for primary fits key try value = readKey(phdu, (*it)->getFitsPrimary(), (*it)->getColumnType()); { if(fitsHDU == 0) { //Read primary key from PHDU value = readKeyFromPHDU(fitsFile_sp->pHDU(), (*it)->getFitsPrimary(), (*it)->getColumnType()); } else { //Read primary key from extension HDU value = readKeyFromHDU(fitsFile_sp->extension(fitsHDU), (*it)->getFitsPrimary(), (*it)->getColumnType()); } insertQuery << "," << destination_sp->getTable() << "." insertQuery << "," << destination_sp->getTable() << "." << (*it)->getColumnName(); << (*it)->getColumnName(); Loading @@ -366,12 +399,22 @@ void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, } } catch(CCfits::HDU::NoSuchKeyword& ex) catch(CCfits::HDU::NoSuchKeyword& ex) { { WARN_STREAM << "WorkerThread::execQuery() exception: " << ex.message() << endl; WARN_STREAM << "WorkerThread::execQuery() " << ex.message() << endl; try try { { //Search for secondary fits key (alias) if(fitsHDU == 0) value = readKey(phdu, (*it)->getFitsSecondary(), (*it)->getColumnType()); { //Read secondary key from PHDU value = readKeyFromPHDU(fitsFile_sp->pHDU(), (*it)->getFitsSecondary(), (*it)->getColumnType()); } else { //Read secondary key from extension HDU value = readKeyFromHDU(fitsFile_sp->extension(fitsHDU), (*it)->getFitsSecondary(), (*it)->getColumnType()); } insertQuery << "," << destination_sp->getTable() << "." insertQuery << "," << destination_sp->getTable() << "." << (*it)->getColumnName(); << (*it)->getColumnName(); Loading @@ -379,7 +422,7 @@ void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, } } catch(CCfits::HDU::NoSuchKeyword& ex) catch(CCfits::HDU::NoSuchKeyword& ex) { { WARN_STREAM << "WorkerThread::execQuery() exception: " << ex.message() << endl; WARN_STREAM << "WorkerThread::execQuery() " << ex.message() << endl; //If key is mandatory throw and exception //If key is mandatory throw and exception if((*it)->isMandatory()) if((*it)->isMandatory()) Loading @@ -387,32 +430,35 @@ void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp, } } } } } } catch(CCfits::FITS::NoSuchHDU& ex) { WARN_STREAM << "WorkerThread::execQuery() " << ex.message() << endl; //Merge insert and values queries //If key is mandatory throw and exception insertQuery << ") " << valuesQuery.rdbuf() << ")"; if((*it)->isMandatory()) throw ex; DEBUG_STREAM << "WorkerThread::execQuery() query " << insertQuery.str() << endl; } } //Exec query *session_sp << insertQuery.str(); } } //============================================================================== //============================================================================== // WorkerThread::readKey() // WorkerThread::readKeyFromPHDU() //============================================================================== //============================================================================== std::string WorkerThread::readKey(CCfits::PHDU& phdu, std::string keyName, std::string WorkerThread::readKeyFromPHDU(CCfits::PHDU& phdu, std::string keyName, std::string dMDBKeyType) throw(CCfits::FitsException, std::runtime_error) std::string keyType) throw(CCfits::FitsException, std::runtime_error) { { DEBUG_STREAM << "WorkerThread::readKey()" << endl; DEBUG_STREAM << "WorkerThread::readKeyFromPHDU()" << endl; phdu.readAllKeys(); std::stringstream result_stream; std::stringstream resultStream; boost::regex pattern("^(NAXIS)([0-9]+)$", boost::regex::icase); boost::regex pattern("^(NAXIS)([0-9]+)$", boost::regex::icase); boost::smatch matches; boost::smatch matches; if(keyName.compare("NAXIS")==0 || keyName.compare("naxis")==0) if(keyName.compare("NAXIS")==0 || keyName.compare("naxis")==0) { { result_stream << phdu.axes(); resultStream << phdu.axes(); } } else if(boost::regex_match(keyName, matches, pattern)) else if(boost::regex_match(keyName, matches, pattern)) { { Loading @@ -425,7 +471,7 @@ std::string WorkerThread::readKey(CCfits::PHDU& phdu, std::string keyName, if(axisReq<0 || axisReq>=axisMax) if(axisReq<0 || axisReq>=axisMax) throw std::runtime_error("Invalid naxis number"); throw std::runtime_error("Invalid naxis number"); result_stream << phdu.axis(axisReq); resultStream << phdu.axis(axisReq); } } catch(boost::bad_lexical_cast& ex) catch(boost::bad_lexical_cast& ex) { { Loading @@ -434,53 +480,115 @@ std::string WorkerThread::readKey(CCfits::PHDU& phdu, std::string keyName, } } else if(keyName.compare("BITPIX")==0 || keyName.compare("bitpix")==0) else if(keyName.compare("BITPIX")==0 || keyName.compare("bitpix")==0) { { result_stream << phdu.bitpix(); resultStream << phdu.bitpix(); } } else if(keyName.compare("BSCALE")==0 || keyName.compare("bscale")==0) else if(keyName.compare("BSCALE")==0 || keyName.compare("bscale")==0) { { result_stream << phdu.scale(); resultStream << phdu.scale(); } } else if(keyName.compare("BZERO")==0 || keyName.compare("bzero")==0) else if(keyName.compare("BZERO")==0 || keyName.compare("bzero")==0) { { result_stream << phdu.zero(); resultStream << phdu.zero(); } } else if(keyName.compare("EXTEND")==0 || keyName.compare("extend")==0) else if(keyName.compare("EXTEND")==0 || keyName.compare("extend")==0) { { result_stream << phdu.extend(); resultStream << phdu.extend(); } else if(keyType.compare("varchar")==0 || keyType.compare("char")==0) { resultStream << "\'" << readKey(phdu, keyName, keyType) << "\'"; } else if(keyType.compare("date")==0 || keyType.compare("datetime")==0 || keyType.compare("time")==0 || keyType.compare("timestamp")==0) { resultStream << "\'" << readKey(phdu, keyName, keyType) << "\'"; } else { resultStream << readKey(phdu, keyName, keyType); } return resultStream.str(); } //============================================================================== // WorkerThread::readKeyFromHDU() //============================================================================== std::string WorkerThread::readKeyFromHDU(CCfits::HDU& hdu, std::string keyName, std::string keyType) throw(CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::readKeyFromHDU()" << endl; hdu.readAllKeys(); std::stringstream resultStream; boost::regex pattern("^(NAXIS)([0-9]+)$", boost::regex::icase); boost::smatch matches; if(keyName.compare("NAXIS")==0 || keyName.compare("naxis")==0) { resultStream << hdu.axes(); } else if(boost::regex_match(keyName, matches, pattern)) { long axisMax = hdu.axes(); try { long axisReq = boost::lexical_cast<long>(matches[2]) - 1; if(axisReq<0 || axisReq>=axisMax) throw std::runtime_error("Invalid naxis number"); resultStream << hdu.axis(axisReq); } catch(boost::bad_lexical_cast& ex) { throw std::runtime_error("Exception parsing naxis number"); } } } else if(dMDBKeyType.compare("varchar")==0 || else if(keyName.compare("BITPIX")==0 || keyName.compare("bitpix")==0) dMDBKeyType.compare("char")==0) { { result_stream << "\'" << extractKey(phdu, keyName, dMDBKeyType) << "\'"; resultStream << hdu.bitpix(); } } else if(dMDBKeyType.compare("date")==0 || else if(keyName.compare("BSCALE")==0 || keyName.compare("bscale")==0) dMDBKeyType.compare("datetime")==0 || { dMDBKeyType.compare("time")==0 || resultStream << hdu.scale(); dMDBKeyType.compare("timestamp")==0) } else if(keyName.compare("BZERO")==0 || keyName.compare("bzero")==0) { { result_stream << "\'" << extractKey(phdu, keyName, dMDBKeyType) << "\'"; resultStream << hdu.zero(); } else if(keyType.compare("varchar")==0 || keyType.compare("char")==0) { resultStream << "\'" << readKey(hdu, keyName, keyType) << "\'"; } else if(keyType.compare("date")==0 || keyType.compare("datetime")==0 || keyType.compare("time")==0 || keyType.compare("timestamp")==0) { resultStream << "\'" << readKey(hdu, keyName, keyType) << "\'"; } } else else { { result_stream << extractKey(phdu, keyName, dMDBKeyType); resultStream << readKey(hdu, keyName, keyType); } } return result_stream.str(); return resultStream.str(); } } //============================================================================== //============================================================================== // WorkerThread::extractKey() // WorkerThread::readKey() //============================================================================== //============================================================================== std::string WorkerThread::extractKey(CCfits::PHDU& phdu, std::string keyName, std::string WorkerThread::readKey(CCfits::HDU& hdu, std::string keyName, std::string dMDBKeyType) throw(CCfits::FitsException, std::runtime_error) std::string keyType) throw(CCfits::FitsException, std::runtime_error) { { DEBUG_STREAM << "WorkerThread::extractKey()" << endl; DEBUG_STREAM << "WorkerThread::readKey()" << endl; int fitsKeyType = phdu.keyWord(keyName).keytype(); int fitsKeyType = hdu.keyWord(keyName).keytype(); #ifdef VERBOSE_DEBUG #ifdef VERBOSE_DEBUG INFO_STREAM << "KEY: " << keyName << " " << fitsKeyType INFO_STREAM << "KEY: " << keyName << " " << fitsKeyType << " -> " << keyType << endl; << " -> " << dMDBKeyType << endl; INFO_STREAM << "-------------------------------------------------" << endl; INFO_STREAM << "-------------------------------------------------" << endl; #endif #endif Loading @@ -491,43 +599,43 @@ std::string WorkerThread::extractKey(CCfits::PHDU& phdu, std::string keyName, case TSTRING: case TSTRING: { { std::string value; std::string value; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return value; return value; } } case TINT: case TSHORT: case TLOGICAL: case TSBYTE: case TINT: case TSHORT: case TLOGICAL: case TSBYTE: { { int value = 0; int value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); return boost::lexical_cast<std::string>(value); } } case TUINT: case TUSHORT: case TBIT: case TBYTE: case TUINT: case TUSHORT: case TBIT: case TBYTE: { { unsigned int value = 0; unsigned int value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); return boost::lexical_cast<std::string>(value); } } case TULONG: case TULONG: { { unsigned long value = 0; unsigned long value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); return boost::lexical_cast<std::string>(value); } } case TLONG: case TLONG: { { long value = 0; long value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); return boost::lexical_cast<std::string>(value); } } case TLONGLONG: case TLONGLONG: { { long long value = 0; long long value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); return boost::lexical_cast<std::string>(value); } } case TDOUBLE: case TFLOAT: case TDOUBLE: case TFLOAT: { { double value = 0; double value = 0; phdu.readKey(keyName, value); hdu.readKey(keyName, value); return boost::lexical_cast<std::string>(value); return boost::lexical_cast<std::string>(value); } } case TCOMPLEX: case TDBLCOMPLEX: case TCOMPLEX: case TDBLCOMPLEX: Loading
src/WorkerThread.h +13 −5 Original line number Original line Diff line number Diff line Loading @@ -62,12 +62,20 @@ protected: virtual void execQuery(ConnectionManager::SessionSP, Destination::SP, virtual void execQuery(ConnectionManager::SessionSP, Destination::SP, boost::gregorian::date&, int, boost::filesystem::path&, boost::gregorian::date&, int, boost::filesystem::path&, CCfits::PHDU&) throw(CCfits::FitsException, std::runtime_error); boost::shared_ptr<CCfits::FITS>) throw(CCfits::FitsException, std::runtime_error); virtual void fillQueryFromFits(std::stringstream&, std::stringstream&, Destination::SP, boost::shared_ptr<CCfits::FITS>) throw(CCfits::FitsException, std::runtime_error); virtual std::string readKeyFromPHDU(CCfits::PHDU&, std::string, std::string) throw(CCfits::FitsException, std::runtime_error); virtual std::string readKey(CCfits::PHDU&, std::string, std::string) virtual std::string readKeyFromHDU(CCfits::HDU&, std::string, std::string) throw(CCfits::FitsException, std::runtime_error); throw(CCfits::FitsException, std::runtime_error); virtual std::string extractKey(CCfits::PHDU&, std::string, std::string) virtual std::string readKey(CCfits::HDU&, std::string, std::string) throw(CCfits::FitsException, std::runtime_error); throw(CCfits::FitsException, std::runtime_error); virtual void moveFile(boost::filesystem::path&, Destination::SP, virtual void moveFile(boost::filesystem::path&, Destination::SP, Loading