Commit 95d3b29e authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Removed 'setSticky()', 'nodeExists()' methods from DbConnector + updated...


Removed 'setSticky()', 'nodeExists()' methods from DbConnector + updated 'insertNode()' + decreased num of connections to db for each class, added 'storageType' param to DataRpcServer + added basic handling for TimeoutError exception in Mailer class + code cleanup.

Signed-off-by: default avatarCristiano Urban <cristiano.urban@inaf.it>
parent 21d1566f
Loading
Loading
Loading
Loading
+5 −1
Original line number Diff line number Diff line
@@ -53,6 +53,7 @@ DESCRIPTION
            sys.exit("FATAL: Malformed response, storage acknowledge expected.\n")
        elif storeResponse["responseType"] == "STORE_ACK":
            storageList = storeResponse["storageList"]
            storageType = storageList[0]["storage_type"]
            if not storageList:
                sys.exit("No storage point found. Please add a storage point using the 'vos_storage' command.\n")
            print("Choose one of the following storage locations:")
@@ -89,7 +90,10 @@ DESCRIPTION
                except EOFError:
                    print("\nPlease, use CTRL+C to quit.")
            if confirm == "yes":
                confirmRequest = { "requestType": "STORE_CON", "userName": username, "storageId": storageId }
                confirmRequest = { "requestType": "STORE_CON", 
                                   "userName": username, 
                                   "storageId": storageId, 
                                   "storageType": storageType }
                confirmResponse = self.call(confirmRequest)
                if "responseType" not in confirmResponse:
                    sys.exit("\nFATAL: Malformed response, storage confirmation expected.\n")
+2 −2
Original line number Diff line number Diff line
@@ -16,8 +16,8 @@ class AbortJobRPCServer(RedisRPCServer):
                                  self.params["host"],
                                  self.params.getint("port"),
                                  self.params["db"],
                                  8,
                                  16)
                                  4,
                                  8)
        super(AbortJobRPCServer, self).__init__(host, port, db, rpcQueue)

    def callback(self, requestBody):
+3 −2
Original line number Diff line number Diff line
@@ -30,8 +30,8 @@ class DataRPCServer(RedisRPCServer):
                                  self.params["host"],
                                  self.params.getint("port"),
                                  self.params["db"],
                                  8,
                                  16)
                                  4,
                                  8)
        self.params = config.loadSection("transfer_node")
        self.storageStorePath = self.params["store_path"]
        self.params = config.loadSection("scheduling")
@@ -100,6 +100,7 @@ class DataRPCServer(RedisRPCServer):
            self.dbConn.insertJob(job)
            dbResponse = self.dbConn.getJob(job.jobId)
            job.jobInfo["storageId"] = requestBody["storageId"]
            job.jobInfo["storageType"] = requestBody["storageType"]
            self.pendingQueueWrite.insertJob(job)
            if "error" in dbResponse:
                response = { "responseType": "ERROR",
+12 −39
Original line number Diff line number Diff line
@@ -51,23 +51,6 @@ class DbConnector(object):

    ##### Node #####

    def nodeExists(self, node):
        """Checks if a VOSpace node already exists. Returns a boolean."""
        with self.getConnection() as conn:
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                nodeVOSPath = node.parentPath + '/' + node.name
                cursor.execute("SELECT * FROM node_vos_path WHERE vos_path = %s;", (nodeVOSPath,))
                result = cursor.fetchall()
            except Exception as e:
                if not conn.closed:
                    conn.rollback()
                print(e)
        if result:
            return True
        else:
            return False

    def getOSPath(self, vospacePath):
        """Returns a list containing full path, storage type and username for a VOSpace path."""
        with self.getConnection() as conn:
@@ -614,7 +597,7 @@ class DbConnector(object):
    ##### Node #####

    def insertNode(self, node):
        """Inserts a VOSpace node."""
        """Inserts a VOSpace node. Returns 'True' on success, 'False' otherwise."""
        with self.getConnection() as conn:
            try:
                out = open("db_connector_log.txt", "a")
@@ -652,11 +635,15 @@ class DbConnector(object):
                                     tstamp_wrapper_dir,
                                     type,
                                     location_id,
                                     sticky,
                                     busy_state,
                                     creator_id,
                                     content_length,
                                     content_md5)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT
                    DO NOTHING
                    RETURNING node_id;
                    """,
                    (parentLtreePath,
                     parentLtreeRelativePath,
@@ -664,11 +651,17 @@ class DbConnector(object):
                     node.wrapperDir,
                     node.type,
                     node.locationId,
                     node.sticky,
                     node.busyState,
                     node.creatorID,
                     node.contentLength,
                     node.contentMD5,))
                result = cursor.fetchall()
                conn.commit()
                if result:
                    return True
                else:
                    return False
            except Exception as e:
                if not conn.closed:
                    conn.rollback()
@@ -715,26 +708,6 @@ class DbConnector(object):
                if not conn.closed:
                    conn.rollback()

    def setSticky(self, nodeVOSPath, value):
        """Sets the 'sticky' flag for a VOSpace node."""
        with self.getConnection() as conn:
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("""
                    UPDATE node SET sticky = %s
                    WHERE path <@
                    (SELECT path
                     FROM node_path p
                     JOIN node n ON p.node_id = n.node_id
                     WHERE p.vos_path = %s);
                    """,
                    (value, nodeVOSPath,))
                conn.commit()
            except Exception as e:
                if not conn.closed:
                    conn.rollback()


    def setBusyState(self, nodeVOSPath, value):
        """Sets the 'busy_state' flag for a VOSpace node."""
        with self.getConnection() as conn:
+2 −2
Original line number Diff line number Diff line
@@ -18,8 +18,8 @@ class GetJobRPCServer(RedisRPCServer):
                                  self.params["host"],
                                  self.params.getint("port"),
                                  self.params["db"],
                                  8,
                                  16)
                                  4,
                                  8)
        super(GetJobRPCServer, self).__init__(host, port, db, rpcQueue)

    def callback(self, requestBody):
Loading