Loading transfer_service/db_connector.py +299 −237 Original line number Diff line number Diff line Loading @@ -58,10 +58,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM node_vos_path WHERE vos_path = %s;", (vospacePath,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() print(e) raise else: if result: return True else: Loading @@ -80,10 +81,11 @@ class DbConnector(object): """, (vospacePath,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() print(e) raise else: return result[0]["creator_id"] def getGroupRead(self, vospacePath): Loading @@ -99,10 +101,11 @@ class DbConnector(object): """, (vospacePath,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() print(e) raise else: for i in range(0, len(result)): result[i]["group_read"] = result[i]["group_read"].split("people.")[-1].replace("\\", "") return result Loading @@ -120,10 +123,11 @@ class DbConnector(object): """, (vospacePath,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() print(e) raise else: for i in range(0, len(result)): result[i]["group_write"] = result[i]["group_write"].split("people.")[-1].replace("\\", "") return result Loading @@ -143,10 +147,11 @@ class DbConnector(object): """, (vospacePath,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() print(e) raise else: storageType = result[0]["storage_type"] basePath = result[0]["base_path"] userName = result[0]["user_name"] Loading Loading @@ -181,10 +186,11 @@ class DbConnector(object): """, (vospacePath,)) results = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() print(e) raise else: vospacePathList = [] for el in results: vospacePathList.append(el["vos_path"]) Loading Loading @@ -221,10 +227,11 @@ class DbConnector(object): JOIN storage s ON s.storage_id = l.storage_src_id; """) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() print(e) raise else: return result def nodeIsBusy(self, vospacePath): Loading @@ -239,10 +246,11 @@ class DbConnector(object): """, (vospacePath,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() print(e) raise else: if result[0]["job_id"]: return True else: Loading @@ -257,10 +265,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() print(e) raise else: if result: return True else: Loading @@ -276,9 +285,11 @@ class DbConnector(object): result = cursor.fetchall() #out.write(f"result: {result}\n\n") #out.close() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: if not result: return json.loads('{ "error": "JOB_NOT_FOUND" }') else: Loading @@ -302,9 +313,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT job_info FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["job_info"] def getJobResults(self, jobId): Loading @@ -314,9 +327,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT results FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["results"] def listActiveJobs(self): Loading @@ -339,9 +354,11 @@ class DbConnector(object): ORDER BY creation_time DESC; """) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: for row in result: for idx in row: el = row[idx] Loading Loading @@ -381,9 +398,11 @@ class DbConnector(object): """, (phase,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: for row in result: for idx in row: el = row[idx] Loading @@ -400,9 +419,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM users WHERE user_name = %s;", (username,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: if result: return True else: Loading @@ -415,9 +436,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT user_id FROM users WHERE user_name = %s;", (username,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["user_id"] def getUserName(self, userId): Loading @@ -427,9 +450,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT user_name FROM users WHERE user_id = %s;", (userId,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["user_name"] def getUserEmail(self, userId): Loading @@ -439,9 +464,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT e_mail FROM users WHERE user_id = %s;", (userId,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["e_mail"] ##### Storage ##### Loading @@ -458,9 +485,11 @@ class DbConnector(object): """, (path,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: if result: return result[0]["base_path"] else: Loading @@ -473,9 +502,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT base_path FROM storage WHERE storage_id = %s;", (storageId,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["base_path"] def getStorageList(self): Loading @@ -485,9 +516,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM storage WHERE storage_type <> 'local';") result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: return result def getStorageListByType(self, storageType): Loading @@ -497,9 +530,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM storage WHERE storage_type = %s;", (storageType,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: return result def getStorageType(self, basePath): Loading @@ -509,9 +544,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT storage_type FROM storage WHERE base_path = %s;", (basePath,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: if result: return result[0]["storage_type"] else: Loading @@ -524,9 +561,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT storage_id FROM storage WHERE base_path = %s;", (basePath,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: if result: return result[0]["storage_id"] else: Loading @@ -541,9 +580,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT location_id FROM location WHERE storage_src_id = %s;", (destStorageId,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["location_id"] Loading Loading @@ -602,9 +643,10 @@ class DbConnector(object): jobObj.errorMessage, jobObj.errorType,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise def setStartTime(self, jobId): """Sets the job 'start_time' parameter.""" Loading @@ -618,9 +660,10 @@ class DbConnector(object): """, (startTime, jobId,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise def setEndTime(self, jobId): """Sets the job 'end_time' parameter.""" Loading @@ -634,9 +677,10 @@ class DbConnector(object): """, (endTime, jobId,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise def setPhase(self, jobId, phase): """Sets the job 'phase' parameter.""" Loading @@ -649,9 +693,10 @@ class DbConnector(object): """, (phase, jobId,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise def setTotalBlocks(self, jobId, totalBlocks): """ Loading @@ -667,9 +712,10 @@ class DbConnector(object): """, (totalBlocks, jobId,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise def updateProcessedBlocks(self, jobId, processedBlocks): """ Loading @@ -685,9 +731,10 @@ class DbConnector(object): """, (processedBlocks, jobId,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise def setResults(self, jobId, results): """Sets the job 'results' parameter.""" Loading @@ -701,9 +748,10 @@ class DbConnector(object): (json.dumps(results), jobId,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise ##### Node ##### Loading @@ -730,10 +778,11 @@ class DbConnector(object): #out.write(f"parentLtreePath: {parentLtreePath}\n") #out.write(f"parentPath: {node.parentPath}\n\n") #out.close() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: try: #print(f"parentLtreePath: {parentLtreePath}, type: {type(parentLtreePath)}") cursor.execute(""" Loading Loading @@ -768,13 +817,15 @@ class DbConnector(object): node.contentMD5,)) result = cursor.fetchall() conn.commit() except Exception: if not conn.closed: conn.rollback() raise else: if result: return True else: return False except Exception as e: if not conn.closed: conn.rollback() def setAsyncTrans(self, nodeVOSPath, value): """Sets the 'async_trans' flag for a VOSpace node.""" Loading @@ -788,9 +839,10 @@ class DbConnector(object): """, (value, nodeVOSPath,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise def setJobId(self, nodeVOSPath, value): """Sets the 'job_id' flag for a VOSpace node.""" Loading @@ -804,9 +856,10 @@ class DbConnector(object): """, (value, nodeVOSPath,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise def setPhyDeletedOn(self, nodeId): """Sets the 'phy_deleted_on' flag for a VOSpace deleted node.""" Loading @@ -820,9 +873,10 @@ class DbConnector(object): """, (phyDeletedOn, nodeId,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise def updateGroupRead(self, groupToAdd, groupToRemove, nodeVOSPath): with self.getConnection() as conn: Loading @@ -838,9 +892,10 @@ class DbConnector(object): groupToRemove, nodeVOSPath,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise def updateGroupWrite(self, groupToAdd, groupToRemove, nodeVOSPath): with self.getConnection() as conn: Loading @@ -856,9 +911,10 @@ class DbConnector(object): groupToRemove, nodeVOSPath,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise ##### Storage ##### Loading @@ -880,13 +936,12 @@ class DbConnector(object): basePath, baseUrl, hostname,)) storageSrcId = cursor.fetchall()[0]["storage_id"] except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: if storageType == "cold" or storageType == "hot": try: cursor.execute(""" Loading @@ -897,9 +952,11 @@ class DbConnector(object): AND hostname = 'localhost'; """) storageDestId = cursor.fetchall()[0]["storage_id"] except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: locationType = "async" else: storageDestId = storageSrcId Loading @@ -914,11 +971,12 @@ class DbConnector(object): (locationType, storageSrcId, storageDestId,)) conn.commit() except: except Exception: if not conn.closed: conn.rollback() raise else: return True else: return False Loading @@ -940,10 +998,11 @@ class DbConnector(object): """, (storageId,)) result = cursor.fetchall() except: except Exception: if not conn.closed: conn.rollback() raise else: if result[0]["res"]: return False else: Loading @@ -954,9 +1013,11 @@ class DbConnector(object): """, (storageId,)) conn.commit() except: except Exception: if not conn.closed: conn.rollback() raise else: return True ##### Users ##### Loading @@ -978,6 +1039,7 @@ class DbConnector(object): username, email,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise transfer_service/exceptions.py +9 −1 Original line number Diff line number Diff line Loading @@ -5,7 +5,15 @@ class Error(Exception): pass # RAP exceptions # FileGrouper exceptions class TarFileCreationException(Error): def __init__(self, folder): self.message = "Error: cannot create a .tar for " + folder super(MultipleUsersException, self).__init__(self.message) # RapClient exceptions class MultipleUsersException(Error): def __init__(self): Loading transfer_service/file_grouper.py +3 −1 Original line number Diff line number Diff line Loading @@ -5,6 +5,8 @@ import shutil import subprocess import sys from exceptions import TarFileCreationException class FileGrouper(object): Loading Loading @@ -61,7 +63,7 @@ class FileGrouper(object): os.chdir(parent) sp = subprocess.run(["tar", "-cf", os.path.basename(folder) + ".tar", os.path.basename(folder)], capture_output = True) if(sp.returncode or sp.stderr): sys.exit(f"Error: cannot create a .tar for {folder}") raise(TarFileCreationException(folder)) else: try: shutil.rmtree(folder) Loading transfer_service/job_queue.py +47 −28 Original line number Diff line number Diff line #!/usr/bin/env python # # A FIFO queue based on a Redis list # A FIFO queue based on Redis lists # # Loading @@ -23,7 +23,12 @@ class JobQueue(object): def len(self): """Returns the number of jobs in the current queue.""" return self.redisCli.llen(self.queueName) try: numJobs = self.redisCli.llen(self.queueName) except Exception: raise else: return numJobs def name(self): """Returns the name of the current queue.""" Loading @@ -31,6 +36,7 @@ class JobQueue(object): def getJob(self): """Gets a copy of the first job without moving it out from the current queue.""" try: job = json.loads(self.redisCli.lrange(self.queueName, self.len() - 1, self.len() - 1)[0].decode("utf-8")) jobObj = Job() jobObj.setId(job["jobId"]) Loading @@ -42,6 +48,9 @@ class JobQueue(object): jobObj.setEndTime(job["endTime"]) jobObj.setExecutionDuration(job["executionDuration"]) jobObj.setInfo(job["jobInfo"]) except Exception: raise else: return jobObj def insertJob(self, jobObj): Loading @@ -58,10 +67,14 @@ class JobQueue(object): "parameters": jobObj.parameters, "results": jobObj.results, "jobInfo": jobObj.jobInfo } try: self.redisCli.lpush(self.queueName, json.dumps(data)) except Exception: raise def extractJob(self): """Moves out a job from the end of the current queue.""" try: job = json.loads(self.redisCli.brpop(self.queueName)[1].decode("utf-8")) jobObj = Job() jobObj.setId(job["jobId"]) Loading @@ -73,6 +86,9 @@ class JobQueue(object): jobObj.setEndTime(job["endTime"]) jobObj.setExecutionDuration(job["executionDuration"]) jobObj.setInfo(job["jobInfo"]) except Exception: raise else: return jobObj def moveJobTo(self, nextQueueName): Loading @@ -81,7 +97,10 @@ class JobQueue(object): at the beginning of the next queue (this operation is atomic) DEPRECATED """ try: self.redisCli.brpoplpush(self.queueName, nextQueueName) except Exception: raise # Test Loading transfer_service/mailer.py +5 −8 Original line number Diff line number Diff line Loading @@ -63,17 +63,14 @@ class Mailer(object): try: smtpObj = smtplib.SMTP(self.smtpServer, self.smtpPort) smtpObj.send_message(self.message) logMsg = "E-mail message sent successfully!" self.logger.debug(logMsg) except SMTPConnectError: logMsg = "Cannot connect to SMTP server." self.logger.exception(logMsg) self.logger.exception("Cannot connect to SMTP server.") except TimeoutError: logMsg = "Connection timeout." self.logger.exception(logMsg) self.logger.exception("Connection timeout.") except SMTPException: logMsg = "Cannot send email message." self.logger.exception(logMsg) self.logger.exception("Cannot send email message.") else: self.logger.debug("E-mail message sent successfully!") else: self.logger.debug("E-mail notifications disabled.") Loading Loading
transfer_service/db_connector.py +299 −237 Original line number Diff line number Diff line Loading @@ -58,10 +58,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM node_vos_path WHERE vos_path = %s;", (vospacePath,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() print(e) raise else: if result: return True else: Loading @@ -80,10 +81,11 @@ class DbConnector(object): """, (vospacePath,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() print(e) raise else: return result[0]["creator_id"] def getGroupRead(self, vospacePath): Loading @@ -99,10 +101,11 @@ class DbConnector(object): """, (vospacePath,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() print(e) raise else: for i in range(0, len(result)): result[i]["group_read"] = result[i]["group_read"].split("people.")[-1].replace("\\", "") return result Loading @@ -120,10 +123,11 @@ class DbConnector(object): """, (vospacePath,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() print(e) raise else: for i in range(0, len(result)): result[i]["group_write"] = result[i]["group_write"].split("people.")[-1].replace("\\", "") return result Loading @@ -143,10 +147,11 @@ class DbConnector(object): """, (vospacePath,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() print(e) raise else: storageType = result[0]["storage_type"] basePath = result[0]["base_path"] userName = result[0]["user_name"] Loading Loading @@ -181,10 +186,11 @@ class DbConnector(object): """, (vospacePath,)) results = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() print(e) raise else: vospacePathList = [] for el in results: vospacePathList.append(el["vos_path"]) Loading Loading @@ -221,10 +227,11 @@ class DbConnector(object): JOIN storage s ON s.storage_id = l.storage_src_id; """) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() print(e) raise else: return result def nodeIsBusy(self, vospacePath): Loading @@ -239,10 +246,11 @@ class DbConnector(object): """, (vospacePath,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() print(e) raise else: if result[0]["job_id"]: return True else: Loading @@ -257,10 +265,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() print(e) raise else: if result: return True else: Loading @@ -276,9 +285,11 @@ class DbConnector(object): result = cursor.fetchall() #out.write(f"result: {result}\n\n") #out.close() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: if not result: return json.loads('{ "error": "JOB_NOT_FOUND" }') else: Loading @@ -302,9 +313,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT job_info FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["job_info"] def getJobResults(self, jobId): Loading @@ -314,9 +327,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT results FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["results"] def listActiveJobs(self): Loading @@ -339,9 +354,11 @@ class DbConnector(object): ORDER BY creation_time DESC; """) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: for row in result: for idx in row: el = row[idx] Loading Loading @@ -381,9 +398,11 @@ class DbConnector(object): """, (phase,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: for row in result: for idx in row: el = row[idx] Loading @@ -400,9 +419,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM users WHERE user_name = %s;", (username,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: if result: return True else: Loading @@ -415,9 +436,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT user_id FROM users WHERE user_name = %s;", (username,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["user_id"] def getUserName(self, userId): Loading @@ -427,9 +450,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT user_name FROM users WHERE user_id = %s;", (userId,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["user_name"] def getUserEmail(self, userId): Loading @@ -439,9 +464,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT e_mail FROM users WHERE user_id = %s;", (userId,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["e_mail"] ##### Storage ##### Loading @@ -458,9 +485,11 @@ class DbConnector(object): """, (path,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: if result: return result[0]["base_path"] else: Loading @@ -473,9 +502,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT base_path FROM storage WHERE storage_id = %s;", (storageId,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["base_path"] def getStorageList(self): Loading @@ -485,9 +516,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM storage WHERE storage_type <> 'local';") result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: return result def getStorageListByType(self, storageType): Loading @@ -497,9 +530,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM storage WHERE storage_type = %s;", (storageType,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: return result def getStorageType(self, basePath): Loading @@ -509,9 +544,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT storage_type FROM storage WHERE base_path = %s;", (basePath,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: if result: return result[0]["storage_type"] else: Loading @@ -524,9 +561,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT storage_id FROM storage WHERE base_path = %s;", (basePath,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: if result: return result[0]["storage_id"] else: Loading @@ -541,9 +580,11 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT location_id FROM location WHERE storage_src_id = %s;", (destStorageId,)) result = cursor.fetchall() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["location_id"] Loading Loading @@ -602,9 +643,10 @@ class DbConnector(object): jobObj.errorMessage, jobObj.errorType,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise def setStartTime(self, jobId): """Sets the job 'start_time' parameter.""" Loading @@ -618,9 +660,10 @@ class DbConnector(object): """, (startTime, jobId,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise def setEndTime(self, jobId): """Sets the job 'end_time' parameter.""" Loading @@ -634,9 +677,10 @@ class DbConnector(object): """, (endTime, jobId,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise def setPhase(self, jobId, phase): """Sets the job 'phase' parameter.""" Loading @@ -649,9 +693,10 @@ class DbConnector(object): """, (phase, jobId,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise def setTotalBlocks(self, jobId, totalBlocks): """ Loading @@ -667,9 +712,10 @@ class DbConnector(object): """, (totalBlocks, jobId,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise def updateProcessedBlocks(self, jobId, processedBlocks): """ Loading @@ -685,9 +731,10 @@ class DbConnector(object): """, (processedBlocks, jobId,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise def setResults(self, jobId, results): """Sets the job 'results' parameter.""" Loading @@ -701,9 +748,10 @@ class DbConnector(object): (json.dumps(results), jobId,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise ##### Node ##### Loading @@ -730,10 +778,11 @@ class DbConnector(object): #out.write(f"parentLtreePath: {parentLtreePath}\n") #out.write(f"parentPath: {node.parentPath}\n\n") #out.close() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: try: #print(f"parentLtreePath: {parentLtreePath}, type: {type(parentLtreePath)}") cursor.execute(""" Loading Loading @@ -768,13 +817,15 @@ class DbConnector(object): node.contentMD5,)) result = cursor.fetchall() conn.commit() except Exception: if not conn.closed: conn.rollback() raise else: if result: return True else: return False except Exception as e: if not conn.closed: conn.rollback() def setAsyncTrans(self, nodeVOSPath, value): """Sets the 'async_trans' flag for a VOSpace node.""" Loading @@ -788,9 +839,10 @@ class DbConnector(object): """, (value, nodeVOSPath,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise def setJobId(self, nodeVOSPath, value): """Sets the 'job_id' flag for a VOSpace node.""" Loading @@ -804,9 +856,10 @@ class DbConnector(object): """, (value, nodeVOSPath,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise def setPhyDeletedOn(self, nodeId): """Sets the 'phy_deleted_on' flag for a VOSpace deleted node.""" Loading @@ -820,9 +873,10 @@ class DbConnector(object): """, (phyDeletedOn, nodeId,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise def updateGroupRead(self, groupToAdd, groupToRemove, nodeVOSPath): with self.getConnection() as conn: Loading @@ -838,9 +892,10 @@ class DbConnector(object): groupToRemove, nodeVOSPath,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise def updateGroupWrite(self, groupToAdd, groupToRemove, nodeVOSPath): with self.getConnection() as conn: Loading @@ -856,9 +911,10 @@ class DbConnector(object): groupToRemove, nodeVOSPath,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise ##### Storage ##### Loading @@ -880,13 +936,12 @@ class DbConnector(object): basePath, baseUrl, hostname,)) storageSrcId = cursor.fetchall()[0]["storage_id"] except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: if storageType == "cold" or storageType == "hot": try: cursor.execute(""" Loading @@ -897,9 +952,11 @@ class DbConnector(object): AND hostname = 'localhost'; """) storageDestId = cursor.fetchall()[0]["storage_id"] except Exception as e: except Exception: if not conn.closed: conn.rollback() raise else: locationType = "async" else: storageDestId = storageSrcId Loading @@ -914,11 +971,12 @@ class DbConnector(object): (locationType, storageSrcId, storageDestId,)) conn.commit() except: except Exception: if not conn.closed: conn.rollback() raise else: return True else: return False Loading @@ -940,10 +998,11 @@ class DbConnector(object): """, (storageId,)) result = cursor.fetchall() except: except Exception: if not conn.closed: conn.rollback() raise else: if result[0]["res"]: return False else: Loading @@ -954,9 +1013,11 @@ class DbConnector(object): """, (storageId,)) conn.commit() except: except Exception: if not conn.closed: conn.rollback() raise else: return True ##### Users ##### Loading @@ -978,6 +1039,7 @@ class DbConnector(object): username, email,)) conn.commit() except Exception as e: except Exception: if not conn.closed: conn.rollback() raise
transfer_service/exceptions.py +9 −1 Original line number Diff line number Diff line Loading @@ -5,7 +5,15 @@ class Error(Exception): pass # RAP exceptions # FileGrouper exceptions class TarFileCreationException(Error): def __init__(self, folder): self.message = "Error: cannot create a .tar for " + folder super(MultipleUsersException, self).__init__(self.message) # RapClient exceptions class MultipleUsersException(Error): def __init__(self): Loading
transfer_service/file_grouper.py +3 −1 Original line number Diff line number Diff line Loading @@ -5,6 +5,8 @@ import shutil import subprocess import sys from exceptions import TarFileCreationException class FileGrouper(object): Loading Loading @@ -61,7 +63,7 @@ class FileGrouper(object): os.chdir(parent) sp = subprocess.run(["tar", "-cf", os.path.basename(folder) + ".tar", os.path.basename(folder)], capture_output = True) if(sp.returncode or sp.stderr): sys.exit(f"Error: cannot create a .tar for {folder}") raise(TarFileCreationException(folder)) else: try: shutil.rmtree(folder) Loading
transfer_service/job_queue.py +47 −28 Original line number Diff line number Diff line #!/usr/bin/env python # # A FIFO queue based on a Redis list # A FIFO queue based on Redis lists # # Loading @@ -23,7 +23,12 @@ class JobQueue(object): def len(self): """Returns the number of jobs in the current queue.""" return self.redisCli.llen(self.queueName) try: numJobs = self.redisCli.llen(self.queueName) except Exception: raise else: return numJobs def name(self): """Returns the name of the current queue.""" Loading @@ -31,6 +36,7 @@ class JobQueue(object): def getJob(self): """Gets a copy of the first job without moving it out from the current queue.""" try: job = json.loads(self.redisCli.lrange(self.queueName, self.len() - 1, self.len() - 1)[0].decode("utf-8")) jobObj = Job() jobObj.setId(job["jobId"]) Loading @@ -42,6 +48,9 @@ class JobQueue(object): jobObj.setEndTime(job["endTime"]) jobObj.setExecutionDuration(job["executionDuration"]) jobObj.setInfo(job["jobInfo"]) except Exception: raise else: return jobObj def insertJob(self, jobObj): Loading @@ -58,10 +67,14 @@ class JobQueue(object): "parameters": jobObj.parameters, "results": jobObj.results, "jobInfo": jobObj.jobInfo } try: self.redisCli.lpush(self.queueName, json.dumps(data)) except Exception: raise def extractJob(self): """Moves out a job from the end of the current queue.""" try: job = json.loads(self.redisCli.brpop(self.queueName)[1].decode("utf-8")) jobObj = Job() jobObj.setId(job["jobId"]) Loading @@ -73,6 +86,9 @@ class JobQueue(object): jobObj.setEndTime(job["endTime"]) jobObj.setExecutionDuration(job["executionDuration"]) jobObj.setInfo(job["jobInfo"]) except Exception: raise else: return jobObj def moveJobTo(self, nextQueueName): Loading @@ -81,7 +97,10 @@ class JobQueue(object): at the beginning of the next queue (this operation is atomic) DEPRECATED """ try: self.redisCli.brpoplpush(self.queueName, nextQueueName) except Exception: raise # Test Loading
transfer_service/mailer.py +5 −8 Original line number Diff line number Diff line Loading @@ -63,17 +63,14 @@ class Mailer(object): try: smtpObj = smtplib.SMTP(self.smtpServer, self.smtpPort) smtpObj.send_message(self.message) logMsg = "E-mail message sent successfully!" self.logger.debug(logMsg) except SMTPConnectError: logMsg = "Cannot connect to SMTP server." self.logger.exception(logMsg) self.logger.exception("Cannot connect to SMTP server.") except TimeoutError: logMsg = "Connection timeout." self.logger.exception(logMsg) self.logger.exception("Connection timeout.") except SMTPException: logMsg = "Cannot send email message." self.logger.exception(logMsg) self.logger.exception("Cannot send email message.") else: self.logger.debug("E-mail message sent successfully!") else: self.logger.debug("E-mail notifications disabled.") Loading