Loading transfer_service/db_connector.py +57 −54 Original line number Diff line number Diff line Loading @@ -77,8 +77,8 @@ class DbConnector(object): def nodeExists(self, vospacePath): """Checks if a VOSpace node already exists. Returns a boolean.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM node_vos_path WHERE vos_path = %s;", (vospacePath,)) result = cursor.fetchall() Loading @@ -97,8 +97,8 @@ class DbConnector(object): def getCreatorId(self, vospacePath): """Returns the creator ID for a given vospace path representing a node.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT creator_id Loading @@ -120,8 +120,8 @@ class DbConnector(object): def getGroupRead(self, vospacePath): """Returns the 'group_read' for a given VOSpace path representing a node.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT unnest(group_read) as group_read Loading @@ -145,8 +145,8 @@ class DbConnector(object): def getGroupWrite(self, vospacePath): """Returns the 'group_write' for a given VOSpace path representing a node.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT unnest(group_write) as group_write Loading @@ -170,8 +170,8 @@ class DbConnector(object): def getOSPath(self, vospacePath): """Returns a list containing full path, storage type and username for a VOSpace path.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, get_os_path(n.node_id) AS os_path, content_length Loading Loading @@ -214,8 +214,8 @@ class DbConnector(object): def getVOSpacePathList(self, vospacePath): """Returns the list of VOSpace paths carried by a VOSpace node, according to the node VOSpace path.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT get_vos_path(n.node_id) Loading @@ -240,8 +240,8 @@ class DbConnector(object): def getNodesToBeDeleted(self): "Returns a path list of files to be deleted with also the corresponding deletion timestamp." conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" WITH RECURSIVE all_nodes AS ( Loading Loading @@ -281,8 +281,8 @@ class DbConnector(object): def nodeIsBusy(self, vospacePath): """Returns 'True' if the VOSpace node is busy, 'False' otherwise.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT job_id Loading @@ -308,8 +308,8 @@ class DbConnector(object): def jobExists(self, jobId): """Checks if a job already exists. Returns a boolean.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() Loading @@ -328,8 +328,8 @@ class DbConnector(object): def getJob(self, jobId): """Returns a JSON object containing job information, according to the job id.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() Loading Loading @@ -357,8 +357,8 @@ class DbConnector(object): def getJobPhase(self, jobId): """Returns the 'phase' field, according to the UWS specification.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT phase FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() Loading @@ -374,8 +374,8 @@ class DbConnector(object): def getJobInfo(self, jobId): """Returns the 'job_info' field, according to the UWS specification.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT job_info FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() Loading @@ -391,8 +391,8 @@ class DbConnector(object): def getJobResults(self, jobId): """Returns the 'results' field, according to the UWS specification.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT results FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() Loading @@ -408,8 +408,8 @@ class DbConnector(object): def getActiveJobs(self): """Returns some info about active jobs.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT job_id, Loading Loading @@ -450,8 +450,8 @@ class DbConnector(object): def getJobsByPhase(self, phase): """Returns some info about jobs according to the phase.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) if phase in [ "PENDING", "QUEUED", "EXECUTING", "HELD", "SUSPENDED" ]: cursor.execute(""" Loading Loading @@ -511,8 +511,8 @@ class DbConnector(object): def searchJobs(self, searchStr): "Performs a search on jobs." conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT job_id, Loading Loading @@ -557,8 +557,8 @@ class DbConnector(object): def userExists(self, username): """Checks if a user already exists. Returns a boolean.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM users WHERE user_name = %s;", (username,)) result = cursor.fetchall() Loading @@ -577,8 +577,8 @@ class DbConnector(object): def getUserId(self, username): """Returns the user id for a given user name (if any), 'False' otherwise .""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT user_id FROM users WHERE user_name = %s;", (username,)) result = cursor.fetchall() Loading @@ -596,9 +596,9 @@ class DbConnector(object): self.connPool.putconn(conn, close = False) def getUserName(self, userId): """Returns the user name for a given user id.""" conn = self.getConnection() """Returns the user name for a given user id, if any, 'False' otherwise.""" try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT user_name FROM users WHERE user_id = %s;", (userId,)) result = cursor.fetchall() Loading @@ -608,14 +608,17 @@ class DbConnector(object): conn.rollback() raise else: if result: return result[0]["user_name"] else: return False finally: self.connPool.putconn(conn, close = False) def getUserEmail(self, userId): """Returns the user email address for a given user id.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT e_mail FROM users WHERE user_id = %s;", (userId,)) result = cursor.fetchall() Loading @@ -631,8 +634,8 @@ class DbConnector(object): def getUserList(self): """Returns the full user list.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM users;") result = cursor.fetchall() Loading @@ -648,8 +651,8 @@ class DbConnector(object): def searchUsers(self, searchStr): "Performs a search on users." conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT user_id, Loading @@ -676,8 +679,8 @@ class DbConnector(object): def storageBasePathIsValid(self, path): """Checks if the base path of a physical path is valid. If true, returns the base path, else returns 'False'.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT base_path Loading @@ -701,8 +704,8 @@ class DbConnector(object): def getStorageBasePath(self, storageId): """Returns the storage base path for a give storage id.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT base_path FROM storage WHERE storage_id = %s;", (storageId,)) result = cursor.fetchall() Loading @@ -718,8 +721,8 @@ class DbConnector(object): def getStorageList(self): """Returns the full storage base list. Local storage points are excluded by default.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM storage WHERE storage_type <> 'local';") result = cursor.fetchall() Loading @@ -735,8 +738,8 @@ class DbConnector(object): def getStorageListByType(self, storageType): """Returns a list of storage locations for a given storage type.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM storage WHERE storage_type = %s;", (storageType,)) result = cursor.fetchall() Loading @@ -752,8 +755,8 @@ class DbConnector(object): def getStorageType(self, basePath): """Returns the storage type for a given storage base path, if any. Otherwise it returns 'False'.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT storage_type FROM storage WHERE base_path = %s;", (basePath,)) result = cursor.fetchall() Loading @@ -772,8 +775,8 @@ class DbConnector(object): def getStorageId(self, basePath): """Returns the storage id for a given storage base path, if any. Otherwise it returns 'False'.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT storage_id FROM storage WHERE base_path = %s;", (basePath,)) result = cursor.fetchall() Loading @@ -792,8 +795,8 @@ class DbConnector(object): def getStorageHostname(self, storageId): """Returns the storage hostname for a given storage id, if any. Otherwise it returns 'False'.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT hostname FROM storage WHERE storage_id = %s;", (storageId,)) result = cursor.fetchall() Loading @@ -814,8 +817,8 @@ class DbConnector(object): def getLocationId(self, destStorageId): """Returns the location id according to the storage id of the destination.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT location_id FROM location WHERE storage_src_id = %s;", (destStorageId,)) result = cursor.fetchall() Loading @@ -838,8 +841,8 @@ class DbConnector(object): def insertJob(self, jobObj): """Inserts/updates a job object.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" INSERT INTO job(job_id, Loading Loading @@ -899,8 +902,8 @@ class DbConnector(object): def setStartTime(self, jobId): """Sets the job 'start_time' parameter.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) startTime = datetime.datetime.today().isoformat() cursor.execute(""" Loading @@ -919,8 +922,8 @@ class DbConnector(object): def setEndTime(self, jobId): """Sets the job 'end_time' parameter.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) endTime = datetime.datetime.today().isoformat() cursor.execute(""" Loading @@ -939,8 +942,8 @@ class DbConnector(object): def setPhase(self, jobId, phase): """Sets the job 'phase' parameter.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE job SET phase = %s Loading @@ -961,8 +964,8 @@ class DbConnector(object): Sets the job 'total_blocks' parameter for a data retrieve operation. """ conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE job SET total_blocks = %s Loading @@ -983,8 +986,8 @@ class DbConnector(object): Updates the job 'processed_blocks' parameter for a data retrieve operation. """ conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE job SET processed_blocks = %s Loading @@ -1002,8 +1005,8 @@ class DbConnector(object): def setResults(self, jobId, results): """Sets the job 'results' parameter.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE job SET results = %s Loading @@ -1024,8 +1027,8 @@ class DbConnector(object): def insertNode(self, node): """Inserts a VOSpace node. Returns 'True' on success, 'False' otherwise.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT path FROM node WHERE node_id = id_from_vos_path(%s); Loading Loading @@ -1089,8 +1092,8 @@ class DbConnector(object): def deleteNodesByJobId(self, jobId): """Deletes all VOSpace nodes having a certain 'job_id'.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" DELETE FROM node WHERE job_id = %s; Loading @@ -1107,8 +1110,8 @@ class DbConnector(object): def setAsyncTrans(self, nodeVOSPath, value): """Sets the 'async_trans' flag for a VOSpace node.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE node c SET async_trans = %s Loading @@ -1127,8 +1130,8 @@ class DbConnector(object): def setJobId(self, nodeVOSPath, value): """Sets the 'job_id' flag for a VOSpace node.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE node c SET job_id = %s Loading @@ -1147,8 +1150,8 @@ class DbConnector(object): def setPhyDeletedOn(self, nodeId): """Sets the 'phy_deleted_on' flag for a VOSpace deleted node.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) phyDeletedOn = datetime.datetime.now().isoformat() cursor.execute(""" Loading @@ -1166,8 +1169,8 @@ class DbConnector(object): self.connPool.putconn(conn, close = False) def updateGroupRead(self, groupToAdd, groupToRemove, nodeVOSPath): conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE node c Loading @@ -1188,8 +1191,8 @@ class DbConnector(object): self.connPool.putconn(conn, close = False) def updateGroupWrite(self, groupToAdd, groupToRemove, nodeVOSPath): conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE node c Loading @@ -1213,9 +1216,9 @@ class DbConnector(object): def insertStorage(self, storageType, basePath, baseUrl, hostname): """Inserts a storage point.""" conn = self.getConnection() if not self.getStorageId(basePath): try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" INSERT INTO storage(storage_type, Loading Loading @@ -1279,8 +1282,8 @@ class DbConnector(object): def deleteStorage(self, storageId): """Deletes a storage point.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT count(*) > 0 AS res Loading Loading @@ -1326,9 +1329,9 @@ class DbConnector(object): Inserts a user in the database. Returns 'True' on success, 'False' otherwise. """ conn = self.getConnection() if not self.getUserId(username): if not self.getUserName(userId): try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" INSERT INTO users(user_id, Loading Loading @@ -1356,8 +1359,8 @@ class DbConnector(object): def deleteUser(self, userId): """Deletes a user from the database.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" DELETE FROM users Loading Loading
transfer_service/db_connector.py +57 −54 Original line number Diff line number Diff line Loading @@ -77,8 +77,8 @@ class DbConnector(object): def nodeExists(self, vospacePath): """Checks if a VOSpace node already exists. Returns a boolean.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM node_vos_path WHERE vos_path = %s;", (vospacePath,)) result = cursor.fetchall() Loading @@ -97,8 +97,8 @@ class DbConnector(object): def getCreatorId(self, vospacePath): """Returns the creator ID for a given vospace path representing a node.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT creator_id Loading @@ -120,8 +120,8 @@ class DbConnector(object): def getGroupRead(self, vospacePath): """Returns the 'group_read' for a given VOSpace path representing a node.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT unnest(group_read) as group_read Loading @@ -145,8 +145,8 @@ class DbConnector(object): def getGroupWrite(self, vospacePath): """Returns the 'group_write' for a given VOSpace path representing a node.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT unnest(group_write) as group_write Loading @@ -170,8 +170,8 @@ class DbConnector(object): def getOSPath(self, vospacePath): """Returns a list containing full path, storage type and username for a VOSpace path.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, get_os_path(n.node_id) AS os_path, content_length Loading Loading @@ -214,8 +214,8 @@ class DbConnector(object): def getVOSpacePathList(self, vospacePath): """Returns the list of VOSpace paths carried by a VOSpace node, according to the node VOSpace path.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT get_vos_path(n.node_id) Loading @@ -240,8 +240,8 @@ class DbConnector(object): def getNodesToBeDeleted(self): "Returns a path list of files to be deleted with also the corresponding deletion timestamp." conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" WITH RECURSIVE all_nodes AS ( Loading Loading @@ -281,8 +281,8 @@ class DbConnector(object): def nodeIsBusy(self, vospacePath): """Returns 'True' if the VOSpace node is busy, 'False' otherwise.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT job_id Loading @@ -308,8 +308,8 @@ class DbConnector(object): def jobExists(self, jobId): """Checks if a job already exists. Returns a boolean.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() Loading @@ -328,8 +328,8 @@ class DbConnector(object): def getJob(self, jobId): """Returns a JSON object containing job information, according to the job id.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() Loading Loading @@ -357,8 +357,8 @@ class DbConnector(object): def getJobPhase(self, jobId): """Returns the 'phase' field, according to the UWS specification.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT phase FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() Loading @@ -374,8 +374,8 @@ class DbConnector(object): def getJobInfo(self, jobId): """Returns the 'job_info' field, according to the UWS specification.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT job_info FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() Loading @@ -391,8 +391,8 @@ class DbConnector(object): def getJobResults(self, jobId): """Returns the 'results' field, according to the UWS specification.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT results FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() Loading @@ -408,8 +408,8 @@ class DbConnector(object): def getActiveJobs(self): """Returns some info about active jobs.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT job_id, Loading Loading @@ -450,8 +450,8 @@ class DbConnector(object): def getJobsByPhase(self, phase): """Returns some info about jobs according to the phase.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) if phase in [ "PENDING", "QUEUED", "EXECUTING", "HELD", "SUSPENDED" ]: cursor.execute(""" Loading Loading @@ -511,8 +511,8 @@ class DbConnector(object): def searchJobs(self, searchStr): "Performs a search on jobs." conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT job_id, Loading Loading @@ -557,8 +557,8 @@ class DbConnector(object): def userExists(self, username): """Checks if a user already exists. Returns a boolean.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM users WHERE user_name = %s;", (username,)) result = cursor.fetchall() Loading @@ -577,8 +577,8 @@ class DbConnector(object): def getUserId(self, username): """Returns the user id for a given user name (if any), 'False' otherwise .""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT user_id FROM users WHERE user_name = %s;", (username,)) result = cursor.fetchall() Loading @@ -596,9 +596,9 @@ class DbConnector(object): self.connPool.putconn(conn, close = False) def getUserName(self, userId): """Returns the user name for a given user id.""" conn = self.getConnection() """Returns the user name for a given user id, if any, 'False' otherwise.""" try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT user_name FROM users WHERE user_id = %s;", (userId,)) result = cursor.fetchall() Loading @@ -608,14 +608,17 @@ class DbConnector(object): conn.rollback() raise else: if result: return result[0]["user_name"] else: return False finally: self.connPool.putconn(conn, close = False) def getUserEmail(self, userId): """Returns the user email address for a given user id.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT e_mail FROM users WHERE user_id = %s;", (userId,)) result = cursor.fetchall() Loading @@ -631,8 +634,8 @@ class DbConnector(object): def getUserList(self): """Returns the full user list.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM users;") result = cursor.fetchall() Loading @@ -648,8 +651,8 @@ class DbConnector(object): def searchUsers(self, searchStr): "Performs a search on users." conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT user_id, Loading @@ -676,8 +679,8 @@ class DbConnector(object): def storageBasePathIsValid(self, path): """Checks if the base path of a physical path is valid. If true, returns the base path, else returns 'False'.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT base_path Loading @@ -701,8 +704,8 @@ class DbConnector(object): def getStorageBasePath(self, storageId): """Returns the storage base path for a give storage id.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT base_path FROM storage WHERE storage_id = %s;", (storageId,)) result = cursor.fetchall() Loading @@ -718,8 +721,8 @@ class DbConnector(object): def getStorageList(self): """Returns the full storage base list. Local storage points are excluded by default.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM storage WHERE storage_type <> 'local';") result = cursor.fetchall() Loading @@ -735,8 +738,8 @@ class DbConnector(object): def getStorageListByType(self, storageType): """Returns a list of storage locations for a given storage type.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM storage WHERE storage_type = %s;", (storageType,)) result = cursor.fetchall() Loading @@ -752,8 +755,8 @@ class DbConnector(object): def getStorageType(self, basePath): """Returns the storage type for a given storage base path, if any. Otherwise it returns 'False'.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT storage_type FROM storage WHERE base_path = %s;", (basePath,)) result = cursor.fetchall() Loading @@ -772,8 +775,8 @@ class DbConnector(object): def getStorageId(self, basePath): """Returns the storage id for a given storage base path, if any. Otherwise it returns 'False'.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT storage_id FROM storage WHERE base_path = %s;", (basePath,)) result = cursor.fetchall() Loading @@ -792,8 +795,8 @@ class DbConnector(object): def getStorageHostname(self, storageId): """Returns the storage hostname for a given storage id, if any. Otherwise it returns 'False'.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT hostname FROM storage WHERE storage_id = %s;", (storageId,)) result = cursor.fetchall() Loading @@ -814,8 +817,8 @@ class DbConnector(object): def getLocationId(self, destStorageId): """Returns the location id according to the storage id of the destination.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT location_id FROM location WHERE storage_src_id = %s;", (destStorageId,)) result = cursor.fetchall() Loading @@ -838,8 +841,8 @@ class DbConnector(object): def insertJob(self, jobObj): """Inserts/updates a job object.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" INSERT INTO job(job_id, Loading Loading @@ -899,8 +902,8 @@ class DbConnector(object): def setStartTime(self, jobId): """Sets the job 'start_time' parameter.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) startTime = datetime.datetime.today().isoformat() cursor.execute(""" Loading @@ -919,8 +922,8 @@ class DbConnector(object): def setEndTime(self, jobId): """Sets the job 'end_time' parameter.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) endTime = datetime.datetime.today().isoformat() cursor.execute(""" Loading @@ -939,8 +942,8 @@ class DbConnector(object): def setPhase(self, jobId, phase): """Sets the job 'phase' parameter.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE job SET phase = %s Loading @@ -961,8 +964,8 @@ class DbConnector(object): Sets the job 'total_blocks' parameter for a data retrieve operation. """ conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE job SET total_blocks = %s Loading @@ -983,8 +986,8 @@ class DbConnector(object): Updates the job 'processed_blocks' parameter for a data retrieve operation. """ conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE job SET processed_blocks = %s Loading @@ -1002,8 +1005,8 @@ class DbConnector(object): def setResults(self, jobId, results): """Sets the job 'results' parameter.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE job SET results = %s Loading @@ -1024,8 +1027,8 @@ class DbConnector(object): def insertNode(self, node): """Inserts a VOSpace node. Returns 'True' on success, 'False' otherwise.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT path FROM node WHERE node_id = id_from_vos_path(%s); Loading Loading @@ -1089,8 +1092,8 @@ class DbConnector(object): def deleteNodesByJobId(self, jobId): """Deletes all VOSpace nodes having a certain 'job_id'.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" DELETE FROM node WHERE job_id = %s; Loading @@ -1107,8 +1110,8 @@ class DbConnector(object): def setAsyncTrans(self, nodeVOSPath, value): """Sets the 'async_trans' flag for a VOSpace node.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE node c SET async_trans = %s Loading @@ -1127,8 +1130,8 @@ class DbConnector(object): def setJobId(self, nodeVOSPath, value): """Sets the 'job_id' flag for a VOSpace node.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE node c SET job_id = %s Loading @@ -1147,8 +1150,8 @@ class DbConnector(object): def setPhyDeletedOn(self, nodeId): """Sets the 'phy_deleted_on' flag for a VOSpace deleted node.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) phyDeletedOn = datetime.datetime.now().isoformat() cursor.execute(""" Loading @@ -1166,8 +1169,8 @@ class DbConnector(object): self.connPool.putconn(conn, close = False) def updateGroupRead(self, groupToAdd, groupToRemove, nodeVOSPath): conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE node c Loading @@ -1188,8 +1191,8 @@ class DbConnector(object): self.connPool.putconn(conn, close = False) def updateGroupWrite(self, groupToAdd, groupToRemove, nodeVOSPath): conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE node c Loading @@ -1213,9 +1216,9 @@ class DbConnector(object): def insertStorage(self, storageType, basePath, baseUrl, hostname): """Inserts a storage point.""" conn = self.getConnection() if not self.getStorageId(basePath): try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" INSERT INTO storage(storage_type, Loading Loading @@ -1279,8 +1282,8 @@ class DbConnector(object): def deleteStorage(self, storageId): """Deletes a storage point.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT count(*) > 0 AS res Loading Loading @@ -1326,9 +1329,9 @@ class DbConnector(object): Inserts a user in the database. Returns 'True' on success, 'False' otherwise. """ conn = self.getConnection() if not self.getUserId(username): if not self.getUserName(userId): try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" INSERT INTO users(user_id, Loading Loading @@ -1356,8 +1359,8 @@ class DbConnector(object): def deleteUser(self, userId): """Deletes a user from the database.""" conn = self.getConnection() try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" DELETE FROM users Loading