Loading transfer_service/import_amqp_server.py +92 −77 Original line number Diff line number Diff line import os import sys import json import re from amqp_server import AMQPServer from checksum import Checksum Loading @@ -16,7 +15,6 @@ class ImportAMQPServer(AMQPServer): def __init__(self, host, port, queue): self.type = "import" self.md5calc = Checksum() self.storeAck = False config = Config("vos_ts.conf") self.params = config.loadSection("file_catalog") self.dbConn = DbConnector(self.params["user"], Loading @@ -24,8 +22,7 @@ class ImportAMQPServer(AMQPServer): self.params["host"], self.params.getint("port"), self.params["db"]) self.params = config.loadSection("transfer_node") self.storageStorePath = self.params["store_path"] self.systemUtils = SystemUtils() self.path = None self.username = None super(ImportAMQPServer, self).__init__(host, port, queue) Loading @@ -35,30 +32,73 @@ class ImportAMQPServer(AMQPServer): if "requestType" not in requestBody or "path" not in requestBody: response = { "errorCode": 1, "errorMsg": "Malformed request, missing parameters." } elif requestBody["requestType"] == "NODE_IMPORT": self.path = requestBody["path"] self.path = os.path.abspath(requestBody["path"]) self.username = requestBody["userName"] out = open("import_amqp_server_log.txt", "a") self.dbConn.connect() #if os.path.isdir(self.path): userInDb = self.dbConn.userExists(self.username) userInfo = self.systemUtils.userInfo(self.username) out = open("import_amqp_server_log.txt", "a") if not userInfo or not userInDb: response = { "responseType": "ERROR", "errorCode": 2, "errorMsg": "The user does not exist or is not registered in the database." } self.dbConn.disconnect() return response userId = self.dbConn.getRapId(self.username) pathPrefix = self.dbConn.storageBasePathIsValid(self.path) if pathPrefix: storageId = self.dbConn.getStorageId(pathPrefix) [ dirs, files ] = self.scanRecursive() tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper") else: response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": "Invalid storage mount point." } self.dbConn.disconnect() return response if not os.path.exists(self.path): response = { "responseType": "ERROR", "errorCode": 4, "errorMsg": "Path not found." } self.dbConn.disconnect() return response elif not os.path.isdir(self.path): response = { "responseType": "ERROR", "errorCode": 5, "errorMsg": "Directory path expected." } self.dbConn.disconnect() return response elif self.username not in self.path: response = { "responseType": "ERROR", "errorCode": 6, "errorMsg": "Directory path does not contain the username." } self.dbConn.disconnect() return response elif os.path.dirname(self.path) != pathPrefix + '/' + self.username: response = { "responseType": "ERROR", "errorCode": 7, "errorMsg": "Invalid path, directory must be located in " + pathPrefix + '/' + self.username } self.dbConn.disconnect() return response else: [ dirs, files ] = self.systemUtils.scanRecursive(os.path.dirname(self.path)) for dir in dirs: #pathPrefix = self.dbConn.storageBasePathIsValid(dir) out.write(f"DIR dir: {dir}\n") out.write(f"DIR pathPrefix: {pathPrefix}\n\n") if pathPrefix + '/' + self.username in dir and len(pathPrefix + '/' + self.username) < len(dir): if pathPrefix + '/' + self.username in dir: parentPath = os.path.dirname(dir).split(pathPrefix)[1] nodeName = os.path.basename(dir) if parentPath == '/': vospacePath = parentPath + nodeName else: vospacePath = parentPath + '/' + nodeName cnode = Node(nodeName, "container") if not tstampWrapperDirPattern.match("/" + nodeName): if tstampWrapperDirPattern.search(vospacePath): tstampWrapperDir = tstampWrapperDirPattern.search(vospacePath).group(0).lstrip('/') vospacePath = tstampWrapperDirPattern.sub("", vospacePath) cnode.setWrapperDir(tstampWrapperDir) cnode.setParentPath(parentPath) locationId = self.dbConn.getLocationId(storageId) cnode.setLocationId(locationId) Loading @@ -74,7 +114,6 @@ class ImportAMQPServer(AMQPServer): out.write(f"FILE files: {files}\n") out.write(f"FILE flist: {flist}\n") out.write(f"FILE file: {file}\n") #pathPrefix = self.dbConn.storageBasePathIsValid(file) out.write(f"FILE pathPrefix: {pathPrefix}\n") parentPath = os.path.dirname(file).split(pathPrefix)[1] out.write(f"FILE parentPath: {parentPath}\n") Loading @@ -82,10 +121,6 @@ class ImportAMQPServer(AMQPServer): out.write(f"FILE nodeName: {nodeName}\n") vospacePath = parentPath + '/' + nodeName dnode = Node(nodeName, "data") if tstampWrapperDirPattern.search(vospacePath): tstampWrapperDir = tstampWrapperDirPattern.search(vospacePath).group(0).lstrip('/') vospacePath = tstampWrapperDirPattern.sub("", vospacePath) dnode.setWrapperDir(tstampWrapperDir) out.write(f"FILE vospacePath: {vospacePath}\n") dnode.setParentPath(parentPath) storageId = self.dbConn.getStorageId(pathPrefix) Loading @@ -95,39 +130,19 @@ class ImportAMQPServer(AMQPServer): dnode.setCreatorID(userId) dnode.setContentLength(os.path.getsize(file)) dnode.setContentMD5(self.md5calc.getMD5(file)) if not self.dbConn.nodeExists(dnode): self.dbConn.insertNode(dnode) self.dbConn.disconnect() # add a counter to track the number of nodes (files and dirs) + log file response = { "responseType": "IMPORT_DONE" } else: response = { "responseType": "ERROR", "errorCode": 2, "errorCode": 8, "errorMsg": "Unkown request type." } return response def scanRecursive(self): dirList = [] fileList = [] if os.path.isfile(self.path): p = self.path while p != '/': p = os.path.dirname(p) dirList.append(p) dirList.reverse() fileList.append([os.path.abspath(self.path)]) return [ dirList, fileList ] for folder, subfolders, files in os.walk(self.path, topdown = True): cwd = os.path.basename(folder) if folder != self.path: parent = os.path.dirname(folder) dirList.append(parent + '/' + cwd) i = 0 for f in files: files[i] = parent + '/' + cwd + '/' + f i += 1 fileList.append(files) return [ dirList, fileList ] return response def run(self): print(f"Starting AMQP server of type {self.type}...") Loading Loading
transfer_service/import_amqp_server.py +92 −77 Original line number Diff line number Diff line import os import sys import json import re from amqp_server import AMQPServer from checksum import Checksum Loading @@ -16,7 +15,6 @@ class ImportAMQPServer(AMQPServer): def __init__(self, host, port, queue): self.type = "import" self.md5calc = Checksum() self.storeAck = False config = Config("vos_ts.conf") self.params = config.loadSection("file_catalog") self.dbConn = DbConnector(self.params["user"], Loading @@ -24,8 +22,7 @@ class ImportAMQPServer(AMQPServer): self.params["host"], self.params.getint("port"), self.params["db"]) self.params = config.loadSection("transfer_node") self.storageStorePath = self.params["store_path"] self.systemUtils = SystemUtils() self.path = None self.username = None super(ImportAMQPServer, self).__init__(host, port, queue) Loading @@ -35,30 +32,73 @@ class ImportAMQPServer(AMQPServer): if "requestType" not in requestBody or "path" not in requestBody: response = { "errorCode": 1, "errorMsg": "Malformed request, missing parameters." } elif requestBody["requestType"] == "NODE_IMPORT": self.path = requestBody["path"] self.path = os.path.abspath(requestBody["path"]) self.username = requestBody["userName"] out = open("import_amqp_server_log.txt", "a") self.dbConn.connect() #if os.path.isdir(self.path): userInDb = self.dbConn.userExists(self.username) userInfo = self.systemUtils.userInfo(self.username) out = open("import_amqp_server_log.txt", "a") if not userInfo or not userInDb: response = { "responseType": "ERROR", "errorCode": 2, "errorMsg": "The user does not exist or is not registered in the database." } self.dbConn.disconnect() return response userId = self.dbConn.getRapId(self.username) pathPrefix = self.dbConn.storageBasePathIsValid(self.path) if pathPrefix: storageId = self.dbConn.getStorageId(pathPrefix) [ dirs, files ] = self.scanRecursive() tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper") else: response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": "Invalid storage mount point." } self.dbConn.disconnect() return response if not os.path.exists(self.path): response = { "responseType": "ERROR", "errorCode": 4, "errorMsg": "Path not found." } self.dbConn.disconnect() return response elif not os.path.isdir(self.path): response = { "responseType": "ERROR", "errorCode": 5, "errorMsg": "Directory path expected." } self.dbConn.disconnect() return response elif self.username not in self.path: response = { "responseType": "ERROR", "errorCode": 6, "errorMsg": "Directory path does not contain the username." } self.dbConn.disconnect() return response elif os.path.dirname(self.path) != pathPrefix + '/' + self.username: response = { "responseType": "ERROR", "errorCode": 7, "errorMsg": "Invalid path, directory must be located in " + pathPrefix + '/' + self.username } self.dbConn.disconnect() return response else: [ dirs, files ] = self.systemUtils.scanRecursive(os.path.dirname(self.path)) for dir in dirs: #pathPrefix = self.dbConn.storageBasePathIsValid(dir) out.write(f"DIR dir: {dir}\n") out.write(f"DIR pathPrefix: {pathPrefix}\n\n") if pathPrefix + '/' + self.username in dir and len(pathPrefix + '/' + self.username) < len(dir): if pathPrefix + '/' + self.username in dir: parentPath = os.path.dirname(dir).split(pathPrefix)[1] nodeName = os.path.basename(dir) if parentPath == '/': vospacePath = parentPath + nodeName else: vospacePath = parentPath + '/' + nodeName cnode = Node(nodeName, "container") if not tstampWrapperDirPattern.match("/" + nodeName): if tstampWrapperDirPattern.search(vospacePath): tstampWrapperDir = tstampWrapperDirPattern.search(vospacePath).group(0).lstrip('/') vospacePath = tstampWrapperDirPattern.sub("", vospacePath) cnode.setWrapperDir(tstampWrapperDir) cnode.setParentPath(parentPath) locationId = self.dbConn.getLocationId(storageId) cnode.setLocationId(locationId) Loading @@ -74,7 +114,6 @@ class ImportAMQPServer(AMQPServer): out.write(f"FILE files: {files}\n") out.write(f"FILE flist: {flist}\n") out.write(f"FILE file: {file}\n") #pathPrefix = self.dbConn.storageBasePathIsValid(file) out.write(f"FILE pathPrefix: {pathPrefix}\n") parentPath = os.path.dirname(file).split(pathPrefix)[1] out.write(f"FILE parentPath: {parentPath}\n") Loading @@ -82,10 +121,6 @@ class ImportAMQPServer(AMQPServer): out.write(f"FILE nodeName: {nodeName}\n") vospacePath = parentPath + '/' + nodeName dnode = Node(nodeName, "data") if tstampWrapperDirPattern.search(vospacePath): tstampWrapperDir = tstampWrapperDirPattern.search(vospacePath).group(0).lstrip('/') vospacePath = tstampWrapperDirPattern.sub("", vospacePath) dnode.setWrapperDir(tstampWrapperDir) out.write(f"FILE vospacePath: {vospacePath}\n") dnode.setParentPath(parentPath) storageId = self.dbConn.getStorageId(pathPrefix) Loading @@ -95,39 +130,19 @@ class ImportAMQPServer(AMQPServer): dnode.setCreatorID(userId) dnode.setContentLength(os.path.getsize(file)) dnode.setContentMD5(self.md5calc.getMD5(file)) if not self.dbConn.nodeExists(dnode): self.dbConn.insertNode(dnode) self.dbConn.disconnect() # add a counter to track the number of nodes (files and dirs) + log file response = { "responseType": "IMPORT_DONE" } else: response = { "responseType": "ERROR", "errorCode": 2, "errorCode": 8, "errorMsg": "Unkown request type." } return response def scanRecursive(self): dirList = [] fileList = [] if os.path.isfile(self.path): p = self.path while p != '/': p = os.path.dirname(p) dirList.append(p) dirList.reverse() fileList.append([os.path.abspath(self.path)]) return [ dirList, fileList ] for folder, subfolders, files in os.walk(self.path, topdown = True): cwd = os.path.basename(folder) if folder != self.path: parent = os.path.dirname(folder) dirList.append(parent + '/' + cwd) i = 0 for f in files: files[i] = parent + '/' + cwd + '/' + f i += 1 fileList.append(files) return [ dirList, fileList ] return response def run(self): print(f"Starting AMQP server of type {self.type}...") Loading