Loading transfer_service/group_rw_executor.py +86 −48 Original line number Diff line number Diff line Loading @@ -48,26 +48,36 @@ class GroupRwExecutor(TaskExecutor): self.realGroupName = None super(GroupRwExecutor, self).__init__() def updateGroupRw(self): def execute(self): """This method adds/removes groups to/from 'group_read' and 'group_write'.""" try: self.logger.info("++++++++++ Start of execution phase ++++++++++") self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) if self.requestType == "GRPR_ADD": self.logger.info(f"CMD: vos_group read add {self.groupName} {self.vospacePath}") self.dbConn.updateGroupRead(self.groupName, [], self.vospacePath) elif self.requestType == "GRPR_DEL": self.logger.info(f"CMD: vos_group read del {self.groupName} {self.vospacePath}") self.dbConn.updateGroupRead([], self.groupName, self.vospacePath) elif self.requestType == "GRPW_ADD": self.logger.info(f"CMD: vos_group write add {self.groupName} {self.vospacePath}") self.dbConn.updateGroupWrite(self.groupName, [], self.vospacePath) elif self.requestType == "GRPW_DEL": self.logger.info(f"CMD: vos_group write del {self.groupName} {self.vospacePath}") self.dbConn.updateGroupWrite([], self.groupName, self.vospacePath) except Exception: self.logger.exception("FATAL: something went wrong during the execution phase.") return False else: self.logger.info("++++++++++ End of execution phase ++++++++++") return True def update(self, status): try: results = [{"target": ""}] self.dbConn.setResults(self.jobId, results) self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) # Send e-mail notification m = Mailer(self.logger) Loading @@ -76,6 +86,12 @@ class GroupRwExecutor(TaskExecutor): if userEmail != self.adminEmail: m.addRecipient(userEmail) if status == "OK": self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) self.logger.info("Job phase updated to COMPLETED.") if self.requestType == "GRPR_ADD": msg = f""" Added '{self.realGroupName}' to 'group_read' for {self.vospacePath} and any child nodes. Loading @@ -92,8 +108,18 @@ Added '{self.realGroupName}' to 'group_write' for {self.vospacePath} and any chi msg = f""" Removed '{self.realGroupName}' from 'group_write' for {self.vospacePath} and any child nodes. """ else: msg = "FATAL: something went wrong during the execution phase." self.jobObj.setPhase("ERROR") self.jobObj.setErrorType("fatal") self.jobObj.setErrorMessage(msg) self.dbConn.insertJob(self.jobObj) self.dbConn.setEndTime(self.jobId) self.logger.info("Job phase updated to ERROR.") m.setMessage("VOSpace group_rw notification", msg) m.send() except Exception: self.logger.exception(f"FATAL: unable to update the database, job ID: {self.jobId}") def run(self): self.logger.info("Starting group_rw executor...") Loading @@ -101,7 +127,13 @@ Removed '{self.realGroupName}' from 'group_write' for {self.vospacePath} and any self.setDestinationQueueName("group_rw_terminated") while True: self.wait() if self.srcQueue.len() > 0: try: srcQueueLen = self.srcQueue.len() destQueueLen = self.destQueue.len() except Exception: self.logger.exception("Cache error: failed to retrieve queue length.") else: if srcQueueLen > 0: self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId self.userId = self.jobObj.ownerId Loading @@ -109,10 +141,16 @@ Removed '{self.realGroupName}' from 'group_write' for {self.vospacePath} and any self.vospacePath = self.jobObj.jobInfo["vospacePath"] self.groupName = [ self.jobObj.jobInfo["groupName"] ] self.realGroupName = self.groupName[0].split("people.")[-1].replace("\\", "") self.updateGroupRw() if self.destQueue.len() >= self.maxTerminatedJobs: if self.execute(): self.update("OK") else: self.update("ERROR") try: if self.destQueueLen >= self.maxTerminatedJobs: self.destQueue.extractJob() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() except Exception: self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") else: self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") Loading
transfer_service/group_rw_executor.py +86 −48 Original line number Diff line number Diff line Loading @@ -48,26 +48,36 @@ class GroupRwExecutor(TaskExecutor): self.realGroupName = None super(GroupRwExecutor, self).__init__() def updateGroupRw(self): def execute(self): """This method adds/removes groups to/from 'group_read' and 'group_write'.""" try: self.logger.info("++++++++++ Start of execution phase ++++++++++") self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) if self.requestType == "GRPR_ADD": self.logger.info(f"CMD: vos_group read add {self.groupName} {self.vospacePath}") self.dbConn.updateGroupRead(self.groupName, [], self.vospacePath) elif self.requestType == "GRPR_DEL": self.logger.info(f"CMD: vos_group read del {self.groupName} {self.vospacePath}") self.dbConn.updateGroupRead([], self.groupName, self.vospacePath) elif self.requestType == "GRPW_ADD": self.logger.info(f"CMD: vos_group write add {self.groupName} {self.vospacePath}") self.dbConn.updateGroupWrite(self.groupName, [], self.vospacePath) elif self.requestType == "GRPW_DEL": self.logger.info(f"CMD: vos_group write del {self.groupName} {self.vospacePath}") self.dbConn.updateGroupWrite([], self.groupName, self.vospacePath) except Exception: self.logger.exception("FATAL: something went wrong during the execution phase.") return False else: self.logger.info("++++++++++ End of execution phase ++++++++++") return True def update(self, status): try: results = [{"target": ""}] self.dbConn.setResults(self.jobId, results) self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) # Send e-mail notification m = Mailer(self.logger) Loading @@ -76,6 +86,12 @@ class GroupRwExecutor(TaskExecutor): if userEmail != self.adminEmail: m.addRecipient(userEmail) if status == "OK": self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) self.logger.info("Job phase updated to COMPLETED.") if self.requestType == "GRPR_ADD": msg = f""" Added '{self.realGroupName}' to 'group_read' for {self.vospacePath} and any child nodes. Loading @@ -92,8 +108,18 @@ Added '{self.realGroupName}' to 'group_write' for {self.vospacePath} and any chi msg = f""" Removed '{self.realGroupName}' from 'group_write' for {self.vospacePath} and any child nodes. """ else: msg = "FATAL: something went wrong during the execution phase." self.jobObj.setPhase("ERROR") self.jobObj.setErrorType("fatal") self.jobObj.setErrorMessage(msg) self.dbConn.insertJob(self.jobObj) self.dbConn.setEndTime(self.jobId) self.logger.info("Job phase updated to ERROR.") m.setMessage("VOSpace group_rw notification", msg) m.send() except Exception: self.logger.exception(f"FATAL: unable to update the database, job ID: {self.jobId}") def run(self): self.logger.info("Starting group_rw executor...") Loading @@ -101,7 +127,13 @@ Removed '{self.realGroupName}' from 'group_write' for {self.vospacePath} and any self.setDestinationQueueName("group_rw_terminated") while True: self.wait() if self.srcQueue.len() > 0: try: srcQueueLen = self.srcQueue.len() destQueueLen = self.destQueue.len() except Exception: self.logger.exception("Cache error: failed to retrieve queue length.") else: if srcQueueLen > 0: self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId self.userId = self.jobObj.ownerId Loading @@ -109,10 +141,16 @@ Removed '{self.realGroupName}' from 'group_write' for {self.vospacePath} and any self.vospacePath = self.jobObj.jobInfo["vospacePath"] self.groupName = [ self.jobObj.jobInfo["groupName"] ] self.realGroupName = self.groupName[0].split("people.")[-1].replace("\\", "") self.updateGroupRw() if self.destQueue.len() >= self.maxTerminatedJobs: if self.execute(): self.update("OK") else: self.update("ERROR") try: if self.destQueueLen >= self.maxTerminatedJobs: self.destQueue.extractJob() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() except Exception: self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") else: self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")