Commit 4760c750 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Added code to handle 'jobId' parameter for a VOSpace node. This will likely...


Added code to handle 'jobId' parameter for a VOSpace node. This will likely replace the 'busyState' parameter.

Signed-off-by: default avatarCristiano Urban <cristiano.urban@inaf.it>
parent df2f7a04
Loading
Loading
Loading
Loading
+32 −15
Original line number Diff line number Diff line
@@ -168,7 +168,7 @@ class DbConnector(object):
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,))
                out = open("db_connector_log.txt", "a")
                #out = open("db_connector_log.txt", "a")
                result = cursor.fetchall()
                #out.write(f"result: {result}\n\n")
                #out.close()
@@ -187,8 +187,8 @@ class DbConnector(object):
                el = job[idx]
                if isinstance(el, datetime.datetime):
                    job[idx] = el.isoformat()
            out.write(f"job: {job}\n\n")
            out.close()
            #out.write(f"job: {job}\n\n")
            #out.close()
            return job

    def getJobInfo(self, jobId):
@@ -592,26 +592,25 @@ class DbConnector(object):
        """Inserts a VOSpace node. Returns 'True' on success, 'False' otherwise."""
        with self.getConnection() as conn:
            try:
                out = open("db_connector_log.txt", "a")
                out.write(f"parentOSPath: {node.parentPath}\n")
                out.write(f"name: {node.name}\n")
                #out = open("db_connector_log.txt", "a")
                #out.write(f"parentOSPath: {node.parentPath}\n")
                #out.write(f"name: {node.name}\n")
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("""
                    SELECT path FROM node WHERE node_id = id_from_vos_path(%s);
                    """,
                    (node.parentPath,))
                result = cursor.fetchall()
                for i in result:
                    out.write(f"queryResult: {i}\n")
                #parentLtreePath = self.cursor.fetchone()[0]
                #for i in result:
                #    out.write(f"queryResult: {i}\n")
                parentLtreePath = result[0]["path"]
                parentLtreeRelativePath = ""
                if "." in parentLtreePath:
                    parentLtreeRelativePath = ".".join(parentLtreePath.strip(".").split('.')[1:])
                out.write(f"parentLtreeRelativePath: {parentLtreeRelativePath}\n")
                out.write(f"parentLtreePath: {parentLtreePath}\n")
                out.write(f"parentPath: {node.parentPath}\n\n")
                out.close()
                #out.write(f"parentLtreeRelativePath: {parentLtreeRelativePath}\n")
                #out.write(f"parentLtreePath: {parentLtreePath}\n")
                #out.write(f"parentPath: {node.parentPath}\n\n")
                #out.close()
            except Exception as e:
                if not conn.closed:
                    conn.rollback()
@@ -628,10 +627,11 @@ class DbConnector(object):
                                     async_trans,
                                     sticky,
                                     busy_state,
                                     job_id,
                                     creator_id,
                                     content_length,
                                     content_md5)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT
                    DO NOTHING
                    RETURNING node_id;
@@ -645,7 +645,8 @@ class DbConnector(object):
                     node.asyncTrans,
                     node.sticky,
                     node.busyState,
                     node.creatorID,
                     node.jobId,
                     node.creatorId,
                     node.contentLength,
                     node.contentMD5,))
                result = cursor.fetchall()
@@ -711,6 +712,22 @@ class DbConnector(object):
                if not conn.closed:
                    conn.rollback()
                    
    def setJobId(self, nodeVOSPath, value):
        """Sets the 'job_id' flag for a VOSpace node."""
        with self.getConnection() as conn:
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("""
                    UPDATE node c SET job_id = %s
                    FROM node n
                    WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s);
                    """,
                    (value, nodeVOSPath,))
                conn.commit()
            except Exception as e:
                if not conn.closed:
                    conn.rollback()
                    
    def setPhyDeletedOn(self, nodeId):
        """Sets the 'phy_deleted_on' flag for a VOSpace deleted node."""
        with self.getConnection() as conn:
+4 −2
Original line number Diff line number Diff line
@@ -88,7 +88,8 @@ class ImportExecutor(TaskExecutor):
                    cnode.setParentPath(parentPath)
                    locationId = self.dbConn.getLocationId(self.storageId)
                    cnode.setLocationId(locationId)
                    cnode.setCreatorID(self.userId)
                    cnode.setBusyState(False)
                    cnode.setCreatorId(self.userId)
                    cnode.setContentLength(0)
                    cnode.setAsyncTrans(True)
                    cnode.setSticky(True)
@@ -127,7 +128,8 @@ class ImportExecutor(TaskExecutor):
                    self.storageId = self.dbConn.getStorageId(self.pathPrefix)
                    locationId = self.dbConn.getLocationId(self.storageId)
                    dnode.setLocationId(locationId)
                    dnode.setCreatorID(self.userId)
                    dnode.setBusyState(False)
                    dnode.setCreatorId(self.userId)
                    dnode.setContentLength(os.path.getsize(file))
                    dnode.setContentMD5(self.md5calc.getMD5(file))
                    dnode.setAsyncTrans(True)
+7 −3
Original line number Diff line number Diff line
@@ -17,7 +17,8 @@ class Node(object):
        self.asyncTrans = False
        self.sticky = False
        self.busyState = None
        self.creatorID = None
        self.jobId = None
        self.creatorId = None
        self.groupRead = []
        self.groupWrite = []
        self.visibility = False
@@ -75,8 +76,11 @@ class Node(object):
    def setBusyState(self, busyState):
        self.busyState = busyState
        
    def setCreatorID(self, creatorID):
        self.creatorID = creatorID
    def setJobId(self, jobId):
        self.jobId = jobId

    def setCreatorId(self, creatorId):
        self.creatorId = creatorId

    def setGroupRead(self, groupRead):
        self.groupRead = groupRead
+3 −2
Original line number Diff line number Diff line
@@ -78,8 +78,9 @@ class StoreExecutor(TaskExecutor):
        self.dbConn.setResults(self.jobId, results)
        for nodeVOSPath in self.nodeList:
            out.write(f"nodeListElement: {nodeVOSPath}\n")
            self.dbConn.setAsyncTrans(nodeVOSPath, True);
            self.dbConn.setBusyState(nodeVOSPath, False);
            self.dbConn.setAsyncTrans(nodeVOSPath, True)
            self.dbConn.setBusyState(nodeVOSPath, False)
            self.dbConn.setJobId(nodeVOSPath, None)
        self.jobObj.setPhase("COMPLETED")
        self.dbConn.setPhase(self.jobId, "COMPLETED")
        self.dbConn.setEndTime(self.jobId)
+4 −2
Original line number Diff line number Diff line
@@ -138,7 +138,8 @@ class StorePreprocessor(TaskExecutor):
                locationId = self.dbConn.getLocationId(self.storageId)
                cnode.setLocationId(locationId)
                cnode.setBusyState(True)
                cnode.setCreatorID(self.userId)
                cnode.setJobId(self.jobId)
                cnode.setCreatorId(self.userId)
                cnode.setContentLength(0)
                cnode.setSticky(True)

@@ -173,7 +174,8 @@ class StorePreprocessor(TaskExecutor):
                    locationId = self.dbConn.getLocationId(self.storageId)
                    dnode.setLocationId(locationId)
                    dnode.setBusyState(True)
                    dnode.setCreatorID(self.userId)
                    dnode.setJobId(self.jobId)
                    dnode.setCreatorId(self.userId)
                    dnode.setContentLength(os.path.getsize(file))
                    dnode.setContentMD5(self.md5calc.getMD5(file))
                    dnode.setSticky(True)