Commit 1e4c0cf4 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Added VOSpace nodes cleanup procedure on 'vos_data' job failure.

parent f6ae9604
Loading
Loading
Loading
Loading
+18 −0
Original line number Diff line number Diff line
@@ -996,6 +996,24 @@ class DbConnector(object):
        finally:
            self.connPool.putconn(conn, close = False)
            
    def deleteNodesByJobId(self, jobId):
        """Deletes all VOSpace nodes having a certain 'job_id'."""
        conn = self.getConnection()
        try:
            cursor = conn.cursor(cursor_factory = RealDictCursor)
            cursor.execute("""
                DELETE FROM node WHERE job_id = %s;
                """,
                (jobId,))
            conn.commit()
            cursor.close()
        except Exception:
            if not conn.closed:
                conn.rollback()
            raise
        finally:
            self.connPool.putconn(conn, close = False)

    def setAsyncTrans(self, nodeVOSPath, value):
        """Sets the 'async_trans' flag for a VOSpace node."""
        conn = self.getConnection()
+3 −0
Original line number Diff line number Diff line
@@ -195,6 +195,9 @@ class StoreExecutor(TaskExecutor):
                self.jobObj.setEndTime(datetime.datetime.now().isoformat())
                self.dbConn.insertJob(self.jobObj)
                self.logger.info("Job phase updated to ERROR.")
                self.logger.info("Removing VOSpace nodes from the database...")
                self.dbConn.deleteNodesByJobId(self.jobId)
                self.logger.info("Database cleanup completed")
            
                msg = f"""
        ########## VOSpace data storage procedure summary ##########
+3 −0
Original line number Diff line number Diff line
@@ -253,6 +253,9 @@ class StorePreprocessor(TaskExecutor):
                self.dbConn.insertJob(self.jobObj)
                self.setDestinationQueueName("write_terminated")
                self.logger.info("Job phase updated to ERROR.")
                self.logger.info("Removing VOSpace nodes from the database...")
                self.dbConn.deleteNodesByJobId(self.jobId)
                self.logger.info("Database cleanup completed")
                
                msg = f"""
        Dear user,