Loading transfer_service/import_amqp_server.py +46 −20 Original line number Diff line number Diff line import os import re from amqp_server import AMQPServer from config import Config from checksum import Checksum from db_connector import DbConnector from node import Node from system_utils import SystemUtils from config import Config from tape_client import TapeClient class ImportAMQPServer(AMQPServer): Loading @@ -19,6 +22,11 @@ class ImportAMQPServer(AMQPServer): self.params["host"], self.params.getint("port"), self.params["db"]) self.params = config.loadSection("spectrum_archive") self.tapeClient = TapeClient(self.params["host"], self.params.getint("port"), self.params["user"], self.params["password"]) self.systemUtils = SystemUtils() self.path = None self.username = None Loading Loading @@ -48,6 +56,7 @@ class ImportAMQPServer(AMQPServer): if pathPrefix: storageId = self.dbConn.getStorageId(pathPrefix) storageType = self.dbConn.getStorageType(pathPrefix) else: response = { "responseType": "ERROR", "errorCode": 3, Loading Loading @@ -80,8 +89,12 @@ class ImportAMQPServer(AMQPServer): self.dbConn.disconnect() return response else: if storageType == "cold": self.tapeClient.recallCheksumFiles(self.path) [ dirs, files ] = self.systemUtils.scanRecursive(os.path.dirname(self.path)) tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper") for dir in dirs: out.write(f"DIR dir: {dir}\n") out.write(f"DIR pathPrefix: {pathPrefix}\n\n") Loading @@ -90,12 +103,19 @@ class ImportAMQPServer(AMQPServer): parentPath = os.path.dirname(dir).split(pathPrefix)[1] nodeName = os.path.basename(dir) cnode = Node(nodeName, "container") if not tstampWrapperDirPattern.match("/" + nodeName): if tstampWrapperDirPattern.search(parentPath): tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/') parentPath = tstampWrapperDirPattern.sub("", parentPath) cnode.setWrapperDir(tstampWrapperDir) if parentPath == '/': vospacePath = parentPath + nodeName else: vospacePath = parentPath + '/' + nodeName cnode = Node(nodeName, "container") cnode.setParentPath(parentPath) locationId = self.dbConn.getLocationId(storageId) cnode.setLocationId(locationId) Loading @@ -118,8 +138,14 @@ class ImportAMQPServer(AMQPServer): out.write(f"FILE parentPath: {parentPath}\n") nodeName = os.path.basename(file) out.write(f"FILE nodeName: {nodeName}\n") vospacePath = parentPath + '/' + nodeName dnode = Node(nodeName, "data") if tstampWrapperDirPattern.search(parentPath): tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/') parentPath = tstampWrapperDirPattern.sub("", parentPath) dnode.setWrapperDir(tstampWrapperDir) vospacePath = parentPath + '/' + nodeName out.write(f"FILE vospacePath: {vospacePath}\n") dnode.setParentPath(parentPath) storageId = self.dbConn.getStorageId(pathPrefix) Loading Loading
transfer_service/import_amqp_server.py +46 −20 Original line number Diff line number Diff line import os import re from amqp_server import AMQPServer from config import Config from checksum import Checksum from db_connector import DbConnector from node import Node from system_utils import SystemUtils from config import Config from tape_client import TapeClient class ImportAMQPServer(AMQPServer): Loading @@ -19,6 +22,11 @@ class ImportAMQPServer(AMQPServer): self.params["host"], self.params.getint("port"), self.params["db"]) self.params = config.loadSection("spectrum_archive") self.tapeClient = TapeClient(self.params["host"], self.params.getint("port"), self.params["user"], self.params["password"]) self.systemUtils = SystemUtils() self.path = None self.username = None Loading Loading @@ -48,6 +56,7 @@ class ImportAMQPServer(AMQPServer): if pathPrefix: storageId = self.dbConn.getStorageId(pathPrefix) storageType = self.dbConn.getStorageType(pathPrefix) else: response = { "responseType": "ERROR", "errorCode": 3, Loading Loading @@ -80,8 +89,12 @@ class ImportAMQPServer(AMQPServer): self.dbConn.disconnect() return response else: if storageType == "cold": self.tapeClient.recallCheksumFiles(self.path) [ dirs, files ] = self.systemUtils.scanRecursive(os.path.dirname(self.path)) tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper") for dir in dirs: out.write(f"DIR dir: {dir}\n") out.write(f"DIR pathPrefix: {pathPrefix}\n\n") Loading @@ -90,12 +103,19 @@ class ImportAMQPServer(AMQPServer): parentPath = os.path.dirname(dir).split(pathPrefix)[1] nodeName = os.path.basename(dir) cnode = Node(nodeName, "container") if not tstampWrapperDirPattern.match("/" + nodeName): if tstampWrapperDirPattern.search(parentPath): tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/') parentPath = tstampWrapperDirPattern.sub("", parentPath) cnode.setWrapperDir(tstampWrapperDir) if parentPath == '/': vospacePath = parentPath + nodeName else: vospacePath = parentPath + '/' + nodeName cnode = Node(nodeName, "container") cnode.setParentPath(parentPath) locationId = self.dbConn.getLocationId(storageId) cnode.setLocationId(locationId) Loading @@ -118,8 +138,14 @@ class ImportAMQPServer(AMQPServer): out.write(f"FILE parentPath: {parentPath}\n") nodeName = os.path.basename(file) out.write(f"FILE nodeName: {nodeName}\n") vospacePath = parentPath + '/' + nodeName dnode = Node(nodeName, "data") if tstampWrapperDirPattern.search(parentPath): tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/') parentPath = tstampWrapperDirPattern.sub("", parentPath) dnode.setWrapperDir(tstampWrapperDir) vospacePath = parentPath + '/' + nodeName out.write(f"FILE vospacePath: {vospacePath}\n") dnode.setParentPath(parentPath) storageId = self.dbConn.getStorageId(pathPrefix) Loading