KS blog

killins.egloos.com

포토로그



RabbitMQ with Python #6/6 by KillinS

MQ는 동일 시스템에 있는 프로세스들간의 IPC를 위한 수단으로써의 효용가치도 높지만, remote 시스템간의 메시지 교환에 사용될 때 효율성이 더 극대화된다고 할 수 있다. 따라서 Remote 서버에 있는 서비스를 이용하는 형태인 RPC에도 MQ를 사용할 수 있다. RPC에서 MQ가 일반적인 메시지 전달과 다른것은, RPC는 일반적인 메시지 전달이 아닌 프로시저 호출에 따른 결과값이 응답으로 전송되어야 한다는 것이다. 이것은 클라이언트가 응답을 위한 수신용 큐를 생성하고 이 큐에 대한 정보를 프로시즈 요청과 함께 서버에 송신하면, 서버에서 프로시저 호출 결과 응답을 클라이언트에서 생성한 수신용 큐로 송신함으로써 구현할 수 있다. 

RabbitMQ로 구현한 RPC 예제로 클라이언트가 특정 숫자에 대한 피보나치 값을 요청하면, 서버에서 그 응답을 돌려주는 모델을 구현해보도록 한다.

우선 서버 프로세스는 상대적으로 간단하다. 큐를 생성하고 요청 메시지를 수신하여 피보나치 값을 계산한 뒤, 그 응답을 클라이언트가 생성한 큐에 메시지로 송신하면 된다. 클라이언트가 생성한 큐 정보는 클라이언트의 요청시 property 값으로 전달된다.


    def on_request(ch, method, props, body):
        n=int(body)
        print "[.] fib(%s)" % (n,)
        response = fib(n)
        ch.basic_publish(exchange='', routing_key=props.reply_to, body=str(response))
        ch.basic_ack(delivery_tag=method.delivery_tag)

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='rpc_queue')
    channel.basic_consume(on_request, queue='rpc_queue')

    print "[x] Awaiting RPC requests"
    channel.start_consuming()


클라이언트는 다소 복잡해지는데, 요청 메시지를 송신하기 전에 우선 응답을 수신하기 위한 임시 큐를 생성하고 이 큐의 정보를 프로시저 요청 시에 서버에 송신해주어야 한다. 피보나치 수열을 얻는 프로시저를  call로 선언하고, 이 함수를 실행하면 응답 수신큐를 설정한 뒤 요청을 서버로 송신하고 결과를 응답받아 리턴해주도록 한다.


    def call(n):
        connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        channel = connection.channel()
        result = channel.queue_declare(exclusive=True)
        callback_queue = result.method.queue
        channel.basic_publish(exchange='', routing_key='rpc_queue',
                            properties=pika.BasicProperties(reply_to = callback_queue,), body=str(n))
        response = None
        def on_response(ch, method, props, body):
            nonlocal response
            response = body
        channel.basic_consume(on_response, no_ack=True, queue=callback_queue)
        while response is None:
            connection.process_data_events()
        return int(response)

    print "[x] Requesting fib(30)"
    response = call(30)
    print "[.] Got %r " % (response,)


하지만 위와 같이 구현하면 클라이언트가 매 RPC때마다 응답을 수신할 임시 큐를 생성하므로 매우 비효율적이다. 하나의 큐를 생성하여 모든 피보나치 프로시저 콜은 이 큐를 이용하도록 하는 방법이 보다 효율적인데, 이 경우 자신이 요청한 요청에 대한 응답이 왔는지 확인하는 절차가 필요하다. 요청과 응답과의 매칭을 위한 id를 correlation_id라 하고, 이 id 정보 역시 요청 시 property에 set하여 송신하고 응답 수신시에 확인하는 절차를 거치면 된다. 이와 같이 수정한 소스코드는 아래와 같다.


    # 서버
    def on_request(ch, method, props, body):
        n=int(body)
        print "[.] fib(%s)" % (n,)
        response = fib(n)
        ch.basic_publish(exchange='', routing_key=props.reply_to,
                               properties=pika.BasicProperties(correlation_id=props.correlation_id),
                               body=str(response))
        ch.basic_ack(delivery_tag=method.delivery_tag)

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='rpc_queue')
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(on_request, queue='rpc_queue')

    print "[x] Awaiting RPC requests"
    channel.start_consuming()


    # 클라이언트
    class FibonacciRpcClient(object):
        def __init__(self):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
            self.channel = self.connection.channel()
            result = self.channel.queue_declare(exclusive=True)
            self.callback_queue = result.method.queue
            self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue)
            def on_response(self, ch, method, props, body):
                if self.corr_id == props.correlation_id:
                    self.response = body

        def call(self, n):
            self.response = None
            self.corr_id = str(uuid.uuid4())
            self.channel.basic_publish(exchange='', routing_key='rpc_queue',
                                    properties=pika.BasicProperties(reply_to = self.callback_queue, correlation_id = self.corr_id,),
                                    body=str(n))
            while self.response is None:
                self.connection.process_data_events()
            return int(self.response)

    fibonacci_rpc = FibonacciRpcClient()

    print "[x] Requesting fib(30)"
    response = fibonacci_rpc.call(30)
    print "[.] Got %r " % (response,)


* 출처 : www.rabbitmq.com