Commit a255dd88 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Added 'listActiveJobs()' and 'listJobsByPhase()' methods.

parent 69f3da72
Loading
Loading
Loading
Loading
+130 −71
Original line number Diff line number Diff line
@@ -36,7 +36,7 @@ class DbConnector(object):
    Getters
    """

    ### Node
    ##### Node #####

    def nodeExists(self, node):
        """Checks if a VOSpace node already exists. Returns a boolean."""
@@ -91,7 +91,7 @@ class DbConnector(object):
                vospacePathList.append(el["vos_path"])
            return vospacePathList

    ### Job
    ##### Job #####

    def getJob(self, jobId):
        """Returns a JSON object containing job information, according to the job id."""
@@ -117,7 +117,66 @@ class DbConnector(object):
                out.close()
                return job

    ### Users
    def listActiveJobs(self):
        """Returns some info about active jobs."""
        if self.conn:
            self.cursor.execute("""
                SELECT job_id,
                       job_type,
                       phase,
                       creation_time,
                       start_time,
                       owner_id
                       FROM job
                       WHERE phase
                       NOT IN ('ABORTED',
                               'COMPLETED',
                               'ERROR');
                """)
            result = self.cursor.fetchall()
            for row in result:
                for idx in row:
                    el = row[idx]
                    if isinstance(el, datetime.datetime):
                        row[idx] = el.isoformat()
            return result

    def listJobsByPhase(self, phase):
        """Returns some info about jobs according to the phase."""
        if self.conn:
            if phase in [ "PENDING", "QUEUED", "EXECUTING" ]:
                self.cursor.execute("""
                    SELECT job_id,
                           job_type,
                           phase,
                           creation_time,
                           start_time,
                           owner_id
                           FROM job
                           WHERE phase = %s;
                    """,
                    (phase,))
            else:
                self.cursor.execute("""
                    SELECT job_id,
                           job_type,
                           phase,
                           start_time,
                           end_time,
                           owner_id
                           FROM job
                           WHERE phase = %s;
                    """,
                    (phase,))
            result = self.cursor.fetchall()
            for row in result:
                for idx in row:
                    el = row[idx]
                    if isinstance(el, datetime.datetime):
                        row[idx] = el.isoformat()
            return result

    ##### User #####

    def userExists(self, username):
        """Checks if a user already exists. Returns a boolean."""
@@ -141,7 +200,7 @@ class DbConnector(object):
            self.cursor.execute("SELECT user_name FROM users WHERE rap_id = %s;", (rapId,))
            return self.cursor.fetchall()[0]["user_name"]

    ### Storage
    ##### Storage #####

    def storageBasePathIsValid(self, path):
        """Checks if the base path of a physical path is valid. If true, returns the base path, else returns 'False'."""
@@ -197,7 +256,7 @@ class DbConnector(object):
            else:
                return False

    ### Location
    ##### Location #####

    def getLocationId(self, destStorageId):
        """Returns the location id according to the storage id of the destination."""
@@ -210,7 +269,7 @@ class DbConnector(object):
    Setters
    """

    ### Job
    ##### Job #####

    def insertJob(self, jobObj):
        """Inserts/updates a job object."""
@@ -272,7 +331,7 @@ class DbConnector(object):
                 jobId,))
            self.conn.commit()

    ### Node
    ##### Node #####

    def insertNode(self, node):
        """Inserts a VOSpace node."""
@@ -386,7 +445,7 @@ class DbConnector(object):
                (value, nodeVOSPath,))
            self.conn.commit()

    ### Storage
    ##### Storage #####

    def insertStorage(self, storageType, basePath, hostname):
        """Inserts a storage point."""