Loading transfer_service/db_connector.py +101 −42 Original line number Diff line number Diff line Loading @@ -31,7 +31,87 @@ class DbConnector(object): self.cursor.close() self.conn.close() """ Getters """ def nodeExists(self, node): """Checks if a VOSpace node already exists. Returns a boolean.""" if self.conn: nodeOSPath = node.parentPath + '/' + node.name self.cursor.execute("SELECT * FROM node_os_path WHERE os_path = %s;", (nodeOSPath,)) result = self.cursor.fetchall() if result: return True else: return False def getOSPath(self, vospacePath): """Returns the os relative path according to the VOSpace path.""" if self.conn: self.cursor.execute(""" SELECT os_path, tstamp_wrapper_dir FROM node_path np JOIN node n ON np.node_id = n.node_id WHERE vos_path = %s; """, (vospacePath,)) result = self.cursor.fetchall() nodeOSPath = result[0]["os_path"] tstampWrappedDir = result[0]["tstamp_wrapper_dir"] pathElementList = nodeOSPath.split('/') pathElementList.insert(2, tstampWrappedDir) osPath = '/' + '/'.join(filter(None, pathElementList)) return osPath def getVOSpacePathList(self, vospacePath): """Returns the list of VOSpace paths carried by a VOSpace node, according to the node VOSpace path.""" if self.conn: self.cursor.execute(""" SELECT op.vos_path FROM node_vos_path vp JOIN list_of_files l ON l.list_node_id = vp.node_id JOIN node_path op ON op.node_id = l.node_id WHERE vp.vos_path = %s; """, (vospacePath,)) results = self.cursor.fetchall() vospacePathList = [] for el in results: vospacePathList.append(el["vos_path"]) return vospacePathList def getJob(self, jobId): """Returns a JSON object containing job information, according to the job id.""" if self.conn: self.cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,)) out = open("db_connector_log.txt", "a") result = self.cursor.fetchall() out.write(f"result: {result}\n\n") out.close() if not result: return json.loads('{ "error": "JOB_NOT_FOUND" }') else: return result[0] def getRapId(self, username): """Returns the RAP id for a given user name.""" if self.conn: self.cursor.execute("SELECT rap_id FROM Users WHERE user_name = %s;", (username,)) return self.cursor.fetchall()[0]["rap_id"] def getUserName(self, rapId): """Returns the user name for a given RAP id.""" if self.conn: self.cursor.execute("SELECT user_name FROM Users WHERE rap_id = %s;", (rapId,)) return self.cursor.fetchall()[0]["user_name"] """ Setters """ def insertJob(self, jobObj): """Inserts/updates a job object.""" if self.conn: self.cursor.execute(""" INSERT INTO job(job_id, owner_id, job_type, phase, start_time, end_time, job_info, results) Loading @@ -51,17 +131,29 @@ class DbConnector(object): json.dumps(jobObj.results),)) self.conn.commit() def nodeExists(self, node): def setPhase(self, jobId, phase): """Sets the job 'phase' parameter.""" if self.conn: nodeOSPath = node.parentPath + '/' + node.name self.cursor.execute("SELECT * FROM node_os_path WHERE os_path = %s;", (nodeOSPath,)) result = self.cursor.fetchall() if result: return True else: return False self.cursor.execute(""" UPDATE job SET phase = %s WHERE job_id = %s; """, (phase, jobId,)) self.conn.commit() def setResults(self, jobId, results): """Sets the job 'results' parameter.""" if self.conn: self.cursor.execute(""" UPDATE job SET results = %s WHERE job_id = %s; """, (json.dumps(results), jobId,)) self.conn.commit() def insertNode(self, node): """Inserts a VOSpace node.""" if self.conn: out = open("db_connector_log.txt", "a") #print(f"parentOSPath: {node.parentPath}") Loading Loading @@ -95,6 +187,7 @@ class DbConnector(object): self.conn.commit() def setAsyncTrans(self, nodeOSPath, value): """Sets the 'asyncTrans' flag for a VOSpace node.""" if self.conn: self.cursor.execute(""" UPDATE node SET async_trans = %s Loading @@ -104,37 +197,3 @@ class DbConnector(object): """, (value, nodeOSPath,)) self.conn.commit() def getOSPath(self, vospacePath): if self.conn: self.cursor.execute(""" SELECT os_path, tstamp_wrapper_dir FROM node_path np JOIN node n ON np.node_id = n.node_id WHERE vos_path = %s; """, (vospacePath,)) result = self.cursor.fetchall() nodeOSPath = result[0]["os_path"] tstampWrappedDir = result[0]["tstamp_wrapper_dir"] pathElementList = nodeOSPath.split('/') pathElementList.insert(2, tstampWrappedDir) osPath = '/' + '/'.join(filter(None, pathElementList)) return osPath def getJob(self, jobId): if self.conn: self.cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,)) out = open("db_connector_log.txt", "a") result = self.cursor.fetchall() out.write(f"result: {result}\n\n") out.close() if not result: return json.loads('{ "error": "JOB_NOT_FOUND" }') else: return result[0] def getRapId(self, username): if self.conn: self.cursor.execute("SELECT rap_id FROM Users WHERE user_name = %s;", (username,)) return self.cursor.fetchall()[0]["rap_id"] Loading
transfer_service/db_connector.py +101 −42 Original line number Diff line number Diff line Loading @@ -31,7 +31,87 @@ class DbConnector(object): self.cursor.close() self.conn.close() """ Getters """ def nodeExists(self, node): """Checks if a VOSpace node already exists. Returns a boolean.""" if self.conn: nodeOSPath = node.parentPath + '/' + node.name self.cursor.execute("SELECT * FROM node_os_path WHERE os_path = %s;", (nodeOSPath,)) result = self.cursor.fetchall() if result: return True else: return False def getOSPath(self, vospacePath): """Returns the os relative path according to the VOSpace path.""" if self.conn: self.cursor.execute(""" SELECT os_path, tstamp_wrapper_dir FROM node_path np JOIN node n ON np.node_id = n.node_id WHERE vos_path = %s; """, (vospacePath,)) result = self.cursor.fetchall() nodeOSPath = result[0]["os_path"] tstampWrappedDir = result[0]["tstamp_wrapper_dir"] pathElementList = nodeOSPath.split('/') pathElementList.insert(2, tstampWrappedDir) osPath = '/' + '/'.join(filter(None, pathElementList)) return osPath def getVOSpacePathList(self, vospacePath): """Returns the list of VOSpace paths carried by a VOSpace node, according to the node VOSpace path.""" if self.conn: self.cursor.execute(""" SELECT op.vos_path FROM node_vos_path vp JOIN list_of_files l ON l.list_node_id = vp.node_id JOIN node_path op ON op.node_id = l.node_id WHERE vp.vos_path = %s; """, (vospacePath,)) results = self.cursor.fetchall() vospacePathList = [] for el in results: vospacePathList.append(el["vos_path"]) return vospacePathList def getJob(self, jobId): """Returns a JSON object containing job information, according to the job id.""" if self.conn: self.cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,)) out = open("db_connector_log.txt", "a") result = self.cursor.fetchall() out.write(f"result: {result}\n\n") out.close() if not result: return json.loads('{ "error": "JOB_NOT_FOUND" }') else: return result[0] def getRapId(self, username): """Returns the RAP id for a given user name.""" if self.conn: self.cursor.execute("SELECT rap_id FROM Users WHERE user_name = %s;", (username,)) return self.cursor.fetchall()[0]["rap_id"] def getUserName(self, rapId): """Returns the user name for a given RAP id.""" if self.conn: self.cursor.execute("SELECT user_name FROM Users WHERE rap_id = %s;", (rapId,)) return self.cursor.fetchall()[0]["user_name"] """ Setters """ def insertJob(self, jobObj): """Inserts/updates a job object.""" if self.conn: self.cursor.execute(""" INSERT INTO job(job_id, owner_id, job_type, phase, start_time, end_time, job_info, results) Loading @@ -51,17 +131,29 @@ class DbConnector(object): json.dumps(jobObj.results),)) self.conn.commit() def nodeExists(self, node): def setPhase(self, jobId, phase): """Sets the job 'phase' parameter.""" if self.conn: nodeOSPath = node.parentPath + '/' + node.name self.cursor.execute("SELECT * FROM node_os_path WHERE os_path = %s;", (nodeOSPath,)) result = self.cursor.fetchall() if result: return True else: return False self.cursor.execute(""" UPDATE job SET phase = %s WHERE job_id = %s; """, (phase, jobId,)) self.conn.commit() def setResults(self, jobId, results): """Sets the job 'results' parameter.""" if self.conn: self.cursor.execute(""" UPDATE job SET results = %s WHERE job_id = %s; """, (json.dumps(results), jobId,)) self.conn.commit() def insertNode(self, node): """Inserts a VOSpace node.""" if self.conn: out = open("db_connector_log.txt", "a") #print(f"parentOSPath: {node.parentPath}") Loading Loading @@ -95,6 +187,7 @@ class DbConnector(object): self.conn.commit() def setAsyncTrans(self, nodeOSPath, value): """Sets the 'asyncTrans' flag for a VOSpace node.""" if self.conn: self.cursor.execute(""" UPDATE node SET async_trans = %s Loading @@ -104,37 +197,3 @@ class DbConnector(object): """, (value, nodeOSPath,)) self.conn.commit() def getOSPath(self, vospacePath): if self.conn: self.cursor.execute(""" SELECT os_path, tstamp_wrapper_dir FROM node_path np JOIN node n ON np.node_id = n.node_id WHERE vos_path = %s; """, (vospacePath,)) result = self.cursor.fetchall() nodeOSPath = result[0]["os_path"] tstampWrappedDir = result[0]["tstamp_wrapper_dir"] pathElementList = nodeOSPath.split('/') pathElementList.insert(2, tstampWrappedDir) osPath = '/' + '/'.join(filter(None, pathElementList)) return osPath def getJob(self, jobId): if self.conn: self.cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,)) out = open("db_connector_log.txt", "a") result = self.cursor.fetchall() out.write(f"result: {result}\n\n") out.close() if not result: return json.loads('{ "error": "JOB_NOT_FOUND" }') else: return result[0] def getRapId(self, username): if self.conn: self.cursor.execute("SELECT rap_id FROM Users WHERE user_name = %s;", (username,)) return self.cursor.fetchall()[0]["rap_id"]