Loading transfer_service/db_connector.py +32 −3 Original line number Diff line number Diff line import datetime import json import psycopg2 import psycopg2.extras Loading Loading @@ -99,6 +100,10 @@ class DbConnector(object): if not result: return json.loads('{ "error": "JOB_NOT_FOUND" }') else: for idx in result[0]: el = result[0][idx] if isinstance(el, datetime.datetime): result[0][idx] = str(el) return result[0] def getRapId(self, username): Loading Loading @@ -204,8 +209,17 @@ class DbConnector(object): out.close() #print(f"parentLtreePath: {parentLtreePath}, type: {type(parentLtreePath)}") self.cursor.execute(""" INSERT INTO node(parent_path, parent_relative_path, name, tstamp_wrapper_dir, type, location_id, owner_id, creator_id, content_md5) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s); INSERT INTO node(parent_path, parent_relative_path, name, tstamp_wrapper_dir, type, location_id, busy_state, owner_id, creator_id, content_md5) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s); """, (parentLtreePath, parentLtreeRelativePath, Loading @@ -213,13 +227,14 @@ class DbConnector(object): node.wrapperDir, node.type, node.locationId, node.busyState, node.ownerID, node.creatorID, node.contentMD5,)) self.conn.commit() def setAsyncTrans(self, nodeVOSPath, value): """Sets the 'asyncTrans' flag for a VOSpace node.""" """Sets the 'async_trans' flag for a VOSpace node.""" if self.conn: self.cursor.execute(""" UPDATE node SET async_trans = %s Loading @@ -231,3 +246,17 @@ class DbConnector(object): """, (value, nodeVOSPath,)) self.conn.commit() def setBusyState(self, nodeVOSPath, value): """Sets the 'busy_state' flag for a VOSpace node.""" if self.conn: self.cursor.execute(""" UPDATE node SET busy_state = %s WHERE path <@ (SELECT path FROM node_path p JOIN node n ON p.node_id = n.node_id WHERE p.vos_path = %s); """, (value, nodeVOSPath,)) self.conn.commit() transfer_service/store_executor.py +1 −4 Original line number Diff line number Diff line Loading @@ -20,10 +20,6 @@ class StoreExecutor(TaskExecutor): self.params["password"]) self.params = config.loadSection("transfer_node") self.storageStorePath = self.params["store_path"] #self.params = config.loadSection("tape_library") #self.tapeStorageBasePath = self.params["base_path"] #self.params = config.loadSection("ia2_server") #self.serverStorageBasePath = self.params["base_path"] self.params = config.loadSection("file_catalog") self.dbConn = DbConnector(self.params["user"], self.params["password"], Loading Loading @@ -82,6 +78,7 @@ class StoreExecutor(TaskExecutor): self.dbConn.connect() for nodeVOSPath in self.nodeList: self.dbConn.setAsyncTrans(nodeVOSPath, True); self.dbConn.setBusyState(nodeVOSPath, False); self.dbConn.disconnect() def run(self): Loading transfer_service/store_preprocessor.py +2 −0 Original line number Diff line number Diff line Loading @@ -160,6 +160,7 @@ class StorePreprocessor(TaskExecutor): cnode.setParentPath(basePath) locationId = self.dbConn.getLocationId(self.storageId) cnode.setLocationId(locationId) cnode.setBusyState(True) cnode.setOwnerID(self.userId) cnode.setCreatorID(self.userId) if not self.dbConn.nodeExists(cnode): Loading Loading @@ -187,6 +188,7 @@ class StorePreprocessor(TaskExecutor): dnode.setParentPath(basePath) locationId = self.dbConn.getLocationId(self.storageId) dnode.setLocationId(locationId) dnode.setBusyState(True) dnode.setOwnerID(self.userId) dnode.setCreatorID(self.userId) dnode.setContentMD5(self.md5calc.getMD5(file)) Loading Loading
transfer_service/db_connector.py +32 −3 Original line number Diff line number Diff line import datetime import json import psycopg2 import psycopg2.extras Loading Loading @@ -99,6 +100,10 @@ class DbConnector(object): if not result: return json.loads('{ "error": "JOB_NOT_FOUND" }') else: for idx in result[0]: el = result[0][idx] if isinstance(el, datetime.datetime): result[0][idx] = str(el) return result[0] def getRapId(self, username): Loading Loading @@ -204,8 +209,17 @@ class DbConnector(object): out.close() #print(f"parentLtreePath: {parentLtreePath}, type: {type(parentLtreePath)}") self.cursor.execute(""" INSERT INTO node(parent_path, parent_relative_path, name, tstamp_wrapper_dir, type, location_id, owner_id, creator_id, content_md5) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s); INSERT INTO node(parent_path, parent_relative_path, name, tstamp_wrapper_dir, type, location_id, busy_state, owner_id, creator_id, content_md5) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s); """, (parentLtreePath, parentLtreeRelativePath, Loading @@ -213,13 +227,14 @@ class DbConnector(object): node.wrapperDir, node.type, node.locationId, node.busyState, node.ownerID, node.creatorID, node.contentMD5,)) self.conn.commit() def setAsyncTrans(self, nodeVOSPath, value): """Sets the 'asyncTrans' flag for a VOSpace node.""" """Sets the 'async_trans' flag for a VOSpace node.""" if self.conn: self.cursor.execute(""" UPDATE node SET async_trans = %s Loading @@ -231,3 +246,17 @@ class DbConnector(object): """, (value, nodeVOSPath,)) self.conn.commit() def setBusyState(self, nodeVOSPath, value): """Sets the 'busy_state' flag for a VOSpace node.""" if self.conn: self.cursor.execute(""" UPDATE node SET busy_state = %s WHERE path <@ (SELECT path FROM node_path p JOIN node n ON p.node_id = n.node_id WHERE p.vos_path = %s); """, (value, nodeVOSPath,)) self.conn.commit()
transfer_service/store_executor.py +1 −4 Original line number Diff line number Diff line Loading @@ -20,10 +20,6 @@ class StoreExecutor(TaskExecutor): self.params["password"]) self.params = config.loadSection("transfer_node") self.storageStorePath = self.params["store_path"] #self.params = config.loadSection("tape_library") #self.tapeStorageBasePath = self.params["base_path"] #self.params = config.loadSection("ia2_server") #self.serverStorageBasePath = self.params["base_path"] self.params = config.loadSection("file_catalog") self.dbConn = DbConnector(self.params["user"], self.params["password"], Loading Loading @@ -82,6 +78,7 @@ class StoreExecutor(TaskExecutor): self.dbConn.connect() for nodeVOSPath in self.nodeList: self.dbConn.setAsyncTrans(nodeVOSPath, True); self.dbConn.setBusyState(nodeVOSPath, False); self.dbConn.disconnect() def run(self): Loading
transfer_service/store_preprocessor.py +2 −0 Original line number Diff line number Diff line Loading @@ -160,6 +160,7 @@ class StorePreprocessor(TaskExecutor): cnode.setParentPath(basePath) locationId = self.dbConn.getLocationId(self.storageId) cnode.setLocationId(locationId) cnode.setBusyState(True) cnode.setOwnerID(self.userId) cnode.setCreatorID(self.userId) if not self.dbConn.nodeExists(cnode): Loading Loading @@ -187,6 +188,7 @@ class StorePreprocessor(TaskExecutor): dnode.setParentPath(basePath) locationId = self.dbConn.getLocationId(self.storageId) dnode.setLocationId(locationId) dnode.setBusyState(True) dnode.setOwnerID(self.userId) dnode.setCreatorID(self.userId) dnode.setContentMD5(self.md5calc.getMD5(file)) Loading