Commit 0de22e29 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Included redis rpc client and server classes.

parent 0bc6dbce
Loading
Loading
Loading
Loading
+21 −0
Original line number Original line Diff line number Diff line
import json
import redis
import uuid

class RedisRpcClient(object):

    def __init__(self, host, port, db, rpcQueue):
        self.client = redis.Redis(host, port, db)
        self.rpcQueue = rpcQueue

    def call(self, request):
        requestId = uuid.uuid1().hex
        request["id"] = requestId
        self.client.lpush(self.rpcQueue, json.dumps(request))
        channel, response = self.client.brpop(requestId, timeout = 30)
        response = json.loads(response.decode("utf-8"))
        return response


#client = RedisRpcClient("job_cache", "testList")
#client.call()
+44 −0
Original line number Original line Diff line number Diff line
import json
import redis
import threading


class RedisRpcServer(threading.Thread):

    def __init__(self, host, port, db, rpcQueue):
        threading.Thread.__init__(self)
        self.redisUrl = "redis://" + host + ':' + str(port) + '/' + str(db)
        self.client = redis.from_url(self.redisUrl)
        self.rpcQueue = rpcQueue

    def run(self):
        #print("Starting RPC server for " + self.rpcQueue)
        while True:
            channel, request = self.client.brpop(self.rpcQueue)
            channel = channel.decode("utf-8")
            request = json.loads(request.decode("utf-8"))

            #print(channel)
            #print(request)

            #result = 42

            #response = {
            #  "jsonrpc": "2.0",
            #  "result": result,
            #  "id": request["id"]
            #}
            response = self.callback(request)

            self.client.rpush(request["id"], json.dumps(response))
            self.client.expire(request["id"], 30)

    def callback(self, request):
        """
        This method must be implemented by
        inherited classes
        """
        pass

#server = RedisRpcServer("redis://job_cache:6379", "testList")
#server.start()