Commit 6bb14521 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Added cleanup query for temporary data nodes + fixed 'nodeList' update in...


Added cleanup query for temporary data nodes + fixed 'nodeList' update in 'StorePreprocessor' class.

Signed-off-by: default avatarCristiano Urban <cristiano.urban@inaf.it>
parent f7a71539
Loading
Loading
Loading
Loading
+20 −2
Original line number Diff line number Diff line
@@ -39,8 +39,8 @@ class DbConnector(object):
    def nodeExists(self, node):
        """Checks if a VOSpace node already exists. Returns a boolean."""
        if self.conn:
            nodeOSPath = node.parentPath + '/' + node.name
            self.cursor.execute("SELECT * FROM node_os_path WHERE os_path = %s;", (nodeOSPath,))
            nodeVOSPath = node.parentPath + '/' + node.name
            self.cursor.execute("SELECT * FROM node_vos_path WHERE vos_path = %s;", (nodeVOSPath,))
            result = self.cursor.fetchall()
            if result:
                return True
@@ -270,6 +270,24 @@ class DbConnector(object):
                 node.contentMD5,))
            self.conn.commit()
            
    def deleteTmpDataNode(self, vospacePath):
        """Deletes a temporary VOSpace data node"""
        if self.conn:
            self.cursor.execute("""
                WITH deleted AS (
                DELETE FROM list_of_files 
                WHERE list_node_id =
                (SELECT node_id FROM node_vos_path 
                 WHERE vos_path = %s)
                 RETURNING list_node_id
                ) DELETE FROM node 
                  WHERE node_id = 
                  (SELECT DISTINCT(list_node_id) 
                   FROM deleted);
                """,
                (vospacePath,))
            self.conn.commit()

    def setAsyncTrans(self, nodeVOSPath, value):
        """Sets the 'async_trans' flag for a VOSpace node."""
        if self.conn:
+7 −1
Original line number Diff line number Diff line
@@ -73,7 +73,12 @@ class RetrieveExecutor(TaskExecutor):
        self.dbConn.disconnect()
        
    def cleanup(self):
        pass
        nodeType = self.jobObj.jobInfo["transfer"]["protocols"][0]["param"][0]["value"]
        vospacePath = self.jobObj.jobInfo["transfer"]["target"].split("!vospace")[1]
        if nodeType == "list":
            self.dbConn.connect()
            self.dbConn.deleteTmpDataNode(vospacePath)
            self.dbConn.disconnect()

    def run(self):
        print("Starting retrieve executor...")
@@ -88,6 +93,7 @@ class RetrieveExecutor(TaskExecutor):
                result = self.retrieveData()
                if result:
                    self.updateJobStatus()
                    self.cleanup()
                else:
                    sys.exit("Failed to retrieve data!")
                self.destQueue.insertJob(self.jobObj)
+3 −0
Original line number Diff line number Diff line
@@ -77,12 +77,15 @@ class StoreExecutor(TaskExecutor):

    def update(self):
        self.dbConn.connect()
        out = open("store_executor_log.txt", "a")
        for nodeVOSPath in self.nodeList:
            out.write(f"nodeListElement: {nodeVOSPath}\n")
            self.dbConn.setAsyncTrans(nodeVOSPath, True);
            self.dbConn.setSticky(nodeVOSPath, True);
            self.dbConn.setBusyState(nodeVOSPath, False);
        self.jobObj.setPhase("COMPLETED")
        self.dbConn.setPhase(self.jobId, "COMPLETED")
        out.close()
        self.dbConn.disconnect()

    def run(self):
+2 −1
Original line number Diff line number Diff line
@@ -223,7 +223,8 @@ class StorePreprocessor(TaskExecutor):
                self.prepare(self.username)
                self.execute()
                self.update()
                self.jobObj.jobInfo["nodeList"] = self.nodeList
                self.jobObj.jobInfo["nodeList"] = self.nodeList.copy()
                self.nodeList.clear()
                self.destQueue.insertJob(self.jobObj)
                self.srcQueue.extractJob()                
                print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")