Loading transfer_service/cleaner.py +1 −2 Original line number Diff line number Diff line Loading @@ -36,10 +36,9 @@ for row in fileList: filePath = basePath + relPath dTime = row["deleted_on"] cTime = datetime.datetime.now() phyDeletedTstamp = row["phy_deleted_on"] nodeId = row["node_id"] delta = cTime - dTime if delta.days >= days and delta.seconds > seconds and phyDeletedTstamp is None: if delta.days >= days and delta.seconds > seconds: os.remove(filePath) print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ' ' + filePath) dbConn.setPhyDeletedOn(nodeId) Loading transfer_service/db_connector.py +28 −42 Original line number Diff line number Diff line Loading @@ -57,13 +57,12 @@ class DbConnector(object): try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, os_path, content_length FROM node_path p JOIN node n ON p.node_id = n.node_id SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, get_os_path(n.node_id) AS os_path, content_length FROM node n JOIN location l ON n.location_id = l.location_id JOIN storage s ON s.storage_id = l.storage_src_id JOIN users u ON u.user_id = n.creator_id WHERE p.vos_path = %s; WHERE n.node_id = id_from_vos_path(%s); """, (vospacePath,)) result = cursor.fetchall() Loading Loading @@ -98,11 +97,10 @@ class DbConnector(object): try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT op.vos_path FROM node_vos_path vp JOIN list_of_files l ON l.list_node_id = vp.node_id JOIN node_path op ON op.node_id = l.node_id WHERE vp.vos_path = %s; SELECT get_vos_path(n.node_id) FROM node n JOIN list_of_files l ON l.node_id = n.node_id WHERE l.list_node_id = id_from_vos_path(%s); """, (vospacePath,)) results = cursor.fetchall() Loading @@ -121,29 +119,23 @@ class DbConnector(object): try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" WITH all_nodes AS ( SELECT name, os_name, node_id, parent_path, path, relative_path, creator_id, location_id, null as deleted_on, null as phy_deleted_on FROM node UNION SELECT name, os_name, node_id, parent_path, path(parent_path, node_id) AS path, path(parent_relative_path, node_id) AS relative_path, creator_id, location_id, deleted_on, phy_deleted_on FROM deleted_node ) SELECT an.node_id, base_path || '/' || creator_id as os_base_path, os_path AS os_rel_path, an.deleted_on, an.phy_deleted_on FROM ( SELECT node_id, '/' || string_agg(name, '/') AS os_path FROM ( SELECT (CASE WHEN os_name IS NOT NULL THEN os_name ELSE name END) AS name, p.node_id FROM all_nodes n JOIN ( SELECT UNNEST(string_to_array(relative_path::varchar, '.')) AS rel_id, node_id FROM all_nodes ) AS p ON n.node_id::varchar = p.rel_id ORDER BY p.node_id, nlevel(n.path) ) AS j GROUP BY node_id ORDER BY os_path ) AS all_paths JOIN all_nodes an ON all_paths.node_id = an.node_id JOIN location l ON an.location_id = l.location_id JOIN storage s ON s.storage_id = l.storage_src_id WHERE all_paths.os_path NOT IN (SELECT os_path FROM node_os_path); WITH RECURSIVE del AS ( SELECT COALESCE(os_name, name) AS os_name, 1 AS level, node_id AS deleted_node_id, path(parent_relative_path, node_id) AS relative_path, parent_relative_path FROM deleted_node WHERE phy_deleted_on IS NULL UNION ALL SELECT COALESCE(n.os_name, n.name), d.level + 1, d.deleted_node_id, n.relative_path, n.parent_relative_path FROM node n JOIN del d ON n.relative_path = d.parent_relative_path WHERE n.parent_relative_path IS NOT NULL ), paths_to_delete AS (SELECT deleted_node_id, '/' || STRING_AGG(os_name, '/' ORDER BY LEVEL DESC) AS os_path FROM del GROUP BY deleted_node_id) SELECT base_path || '/' || creator_id as os_base_path, os_path AS os_rel_path, deleted_on, d.node_id FROM paths_to_delete p JOIN deleted_node d ON d.node_id = p.deleted_node_id JOIN location l ON d.location_id = l.location_id JOIN storage s ON s.storage_id = l.storage_src_id; """) result = cursor.fetchall() except Exception as e: Loading Loading @@ -605,9 +597,7 @@ class DbConnector(object): out.write(f"name: {node.name}\n") cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT path FROM node n JOIN node_vos_path o ON n.node_id = o.node_id WHERE vos_path = %s; SELECT path FROM node WHERE node_id = id_from_vos_path(%s); """, (node.parentPath,)) result = cursor.fetchall() Loading Loading @@ -676,9 +666,7 @@ class DbConnector(object): cursor.execute(""" WITH deleted AS ( DELETE FROM list_of_files WHERE list_node_id = (SELECT node_id FROM node_vos_path WHERE vos_path = %s) WHERE list_node_id = id_from_vos_path(%s) RETURNING list_node_id ) DELETE FROM node WHERE node_id = Loading @@ -699,8 +687,7 @@ class DbConnector(object): cursor.execute(""" UPDATE node c SET async_trans = %s FROM node n JOIN node_vos_path p ON n.node_id = p.node_id WHERE c.path <@ n.path AND p.vos_path = %s; WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s); """, (value, nodeVOSPath,)) conn.commit() Loading @@ -716,8 +703,7 @@ class DbConnector(object): cursor.execute(""" UPDATE node c SET busy_state = %s FROM node n JOIN node_vos_path p ON n.node_id = p.node_id WHERE c.path <@ n.path AND p.vos_path = %s; WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s); """, (value, nodeVOSPath,)) conn.commit() Loading Loading
transfer_service/cleaner.py +1 −2 Original line number Diff line number Diff line Loading @@ -36,10 +36,9 @@ for row in fileList: filePath = basePath + relPath dTime = row["deleted_on"] cTime = datetime.datetime.now() phyDeletedTstamp = row["phy_deleted_on"] nodeId = row["node_id"] delta = cTime - dTime if delta.days >= days and delta.seconds > seconds and phyDeletedTstamp is None: if delta.days >= days and delta.seconds > seconds: os.remove(filePath) print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ' ' + filePath) dbConn.setPhyDeletedOn(nodeId) Loading
transfer_service/db_connector.py +28 −42 Original line number Diff line number Diff line Loading @@ -57,13 +57,12 @@ class DbConnector(object): try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, os_path, content_length FROM node_path p JOIN node n ON p.node_id = n.node_id SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, get_os_path(n.node_id) AS os_path, content_length FROM node n JOIN location l ON n.location_id = l.location_id JOIN storage s ON s.storage_id = l.storage_src_id JOIN users u ON u.user_id = n.creator_id WHERE p.vos_path = %s; WHERE n.node_id = id_from_vos_path(%s); """, (vospacePath,)) result = cursor.fetchall() Loading Loading @@ -98,11 +97,10 @@ class DbConnector(object): try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT op.vos_path FROM node_vos_path vp JOIN list_of_files l ON l.list_node_id = vp.node_id JOIN node_path op ON op.node_id = l.node_id WHERE vp.vos_path = %s; SELECT get_vos_path(n.node_id) FROM node n JOIN list_of_files l ON l.node_id = n.node_id WHERE l.list_node_id = id_from_vos_path(%s); """, (vospacePath,)) results = cursor.fetchall() Loading @@ -121,29 +119,23 @@ class DbConnector(object): try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" WITH all_nodes AS ( SELECT name, os_name, node_id, parent_path, path, relative_path, creator_id, location_id, null as deleted_on, null as phy_deleted_on FROM node UNION SELECT name, os_name, node_id, parent_path, path(parent_path, node_id) AS path, path(parent_relative_path, node_id) AS relative_path, creator_id, location_id, deleted_on, phy_deleted_on FROM deleted_node ) SELECT an.node_id, base_path || '/' || creator_id as os_base_path, os_path AS os_rel_path, an.deleted_on, an.phy_deleted_on FROM ( SELECT node_id, '/' || string_agg(name, '/') AS os_path FROM ( SELECT (CASE WHEN os_name IS NOT NULL THEN os_name ELSE name END) AS name, p.node_id FROM all_nodes n JOIN ( SELECT UNNEST(string_to_array(relative_path::varchar, '.')) AS rel_id, node_id FROM all_nodes ) AS p ON n.node_id::varchar = p.rel_id ORDER BY p.node_id, nlevel(n.path) ) AS j GROUP BY node_id ORDER BY os_path ) AS all_paths JOIN all_nodes an ON all_paths.node_id = an.node_id JOIN location l ON an.location_id = l.location_id JOIN storage s ON s.storage_id = l.storage_src_id WHERE all_paths.os_path NOT IN (SELECT os_path FROM node_os_path); WITH RECURSIVE del AS ( SELECT COALESCE(os_name, name) AS os_name, 1 AS level, node_id AS deleted_node_id, path(parent_relative_path, node_id) AS relative_path, parent_relative_path FROM deleted_node WHERE phy_deleted_on IS NULL UNION ALL SELECT COALESCE(n.os_name, n.name), d.level + 1, d.deleted_node_id, n.relative_path, n.parent_relative_path FROM node n JOIN del d ON n.relative_path = d.parent_relative_path WHERE n.parent_relative_path IS NOT NULL ), paths_to_delete AS (SELECT deleted_node_id, '/' || STRING_AGG(os_name, '/' ORDER BY LEVEL DESC) AS os_path FROM del GROUP BY deleted_node_id) SELECT base_path || '/' || creator_id as os_base_path, os_path AS os_rel_path, deleted_on, d.node_id FROM paths_to_delete p JOIN deleted_node d ON d.node_id = p.deleted_node_id JOIN location l ON d.location_id = l.location_id JOIN storage s ON s.storage_id = l.storage_src_id; """) result = cursor.fetchall() except Exception as e: Loading Loading @@ -605,9 +597,7 @@ class DbConnector(object): out.write(f"name: {node.name}\n") cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT path FROM node n JOIN node_vos_path o ON n.node_id = o.node_id WHERE vos_path = %s; SELECT path FROM node WHERE node_id = id_from_vos_path(%s); """, (node.parentPath,)) result = cursor.fetchall() Loading Loading @@ -676,9 +666,7 @@ class DbConnector(object): cursor.execute(""" WITH deleted AS ( DELETE FROM list_of_files WHERE list_node_id = (SELECT node_id FROM node_vos_path WHERE vos_path = %s) WHERE list_node_id = id_from_vos_path(%s) RETURNING list_node_id ) DELETE FROM node WHERE node_id = Loading @@ -699,8 +687,7 @@ class DbConnector(object): cursor.execute(""" UPDATE node c SET async_trans = %s FROM node n JOIN node_vos_path p ON n.node_id = p.node_id WHERE c.path <@ n.path AND p.vos_path = %s; WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s); """, (value, nodeVOSPath,)) conn.commit() Loading @@ -716,8 +703,7 @@ class DbConnector(object): cursor.execute(""" UPDATE node c SET busy_state = %s FROM node n JOIN node_vos_path p ON n.node_id = p.node_id WHERE c.path <@ n.path AND p.vos_path = %s; WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s); """, (value, nodeVOSPath,)) conn.commit() Loading