Commit ba6b75b2 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Fixed race conditions on concurrent requests.

parent 9711d1f4
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -13,7 +13,7 @@ class AMQPServer(threading.Thread):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host = self.host, port = self.port))
        self.channel = self.connection.channel();
        self.channel.queue_declare(queue = self.queue)
        self.channel.basic_qos(prefetch_count = 1)
        self.channel.basic_qos(prefetch_count = 32)
        threading.Thread(target = self.channel.basic_consume(queue = self.queue, on_message_callback = self.on_request))

    def on_request(self, ch, method, props, body):
+28 −34
Original line number Diff line number Diff line
@@ -34,9 +34,9 @@ class StoreAMQPServer(AMQPServer):
        self.storageStorePath = self.params["store_path"]
        self.pendingQueueWrite = JobQueue("write_pending")
        self.systemUtils = SystemUtils()
        self.job = None
        self.username = None
        self.path = None
        #self.job = None
        #self.username = None
        #self.path = None
        super(StoreAMQPServer, self).__init__(host, port, queue)

    def execute_callback(self, requestBody):
@@ -45,10 +45,6 @@ class StoreAMQPServer(AMQPServer):
            response = { "errorCode": 1, "errorMsg": "Malformed request, missing parameters." }
        elif requestBody["requestType"] == "CSTORE" or requestBody["requestType"] == "HSTORE":
            user = requestBody["userName"]            
            self.job = Job()
            self.job.setType("other")
            self.job.setInfo(requestBody)
            self.job.setPhase("PENDING")
            #self.dbConn.connect()
            userInDb = self.dbConn.userExists(user)
            if requestBody["requestType"] == "CSTORE":
@@ -76,7 +72,6 @@ class StoreAMQPServer(AMQPServer):
                    if os.access(folderPath, os.W_OK) and os.listdir(folderPath):
                        response = { "responseType": "STORE_ACK",
                                     "storageList": storageList }
                        self.storeAck = True
                    elif os.access(folderPath, os.W_OK) and not os.listdir(folderPath):
                        response = { "responseType": "ERROR",
                                     "errorCode": 3,
@@ -90,31 +85,31 @@ class StoreAMQPServer(AMQPServer):
                                 "errorCode": 5,
                                 "errorMsg": "Permission denied." }
        elif requestBody["requestType"] == "STORE_CON":
            if self.storeAck:
                self.storeAck = False
                user = requestBody["userName"]
                self.prepare(user)
            #if self.storeAck:
                #self.storeAck = False
            username = requestBody["userName"]
            self.prepare(username)
            #self.dbConn.connect()
                self.job.setOwnerId(self.dbConn.getRapId(self.username))
                self.dbConn.insertJob(self.job)
                dbResponse = self.dbConn.getJob(self.job.jobId)
            job = Job()
            job.setType("other")
            job.setInfo(requestBody)
            job.setPhase("PENDING")
            job.setOwnerId(self.dbConn.getRapId(username))
            self.dbConn.insertJob(job)
            dbResponse = self.dbConn.getJob(job.jobId)
            #self.dbConn.disconnect()
                self.job.jobInfo["storageId"] = requestBody["storageId"]
                self.pendingQueueWrite.insertJob(self.job)
            job.jobInfo["storageId"] = requestBody["storageId"]
            self.pendingQueueWrite.insertJob(job)
            if "error" in dbResponse:
                response = { "responseType": "ERROR",
                             "errorCode": 6,
                             "errorMsg": "Job creation failed." }
            else:
                response = { "responseType": "STORE_RUN", 
                                 "jobId": self.job.jobId }
                             "jobId": job.jobId }
        else:
            response = { "responseType": "ERROR",
                         "errorCode": 7,
                             "errorMsg": "Store request not acknowledged." }
        else:
            response = { "responseType": "ERROR",
                         "errorCode": 8,
                         "errorMsg": "Unkown request type." }

        return response
@@ -122,10 +117,9 @@ class StoreAMQPServer(AMQPServer):
    # to be removed from store_preprocessor.py
    # or simply add a chmod -x here, to be faster?
    def prepare(self, username):
        self.username = username
        #self.path = "/home/" + username + "/store"
        self.path = self.storageStorePath.replace("{username}", self.username)
        for folder, subfolders, files in os.walk(self.path):
        path = self.storageStorePath.replace("{username}", username)
        for folder, subfolders, files in os.walk(path):
            os.chown(folder, 0, 0)
            os.chmod(folder, 0o555)
            for s in subfolders: