KS blog

killins.egloos.com

포토로그



RabbitMQ with Python #3/6 by KillinS

RabbitMQ는 AMQP를 구현한것이기 때문에 AMQP에서 정의된 exchange type을 지원한다. 그 중 하나인 fanout을 이용해서 exchang를 생성하고, publish/subscribe 모델을 간단하게 작성해본다. (AMQP 관련 내용은 여기를 참고)
예제로 작성할 pub/sub 모델은 로그이다. publisher가 로그 메시지를 브로커에 보내면, 브로커는 해당 메시지를 모든 subscriber에 송신하고 각 subscriber들은 이 로그를 화면에 뿌려주거나 파일에 기록하는 일을 수행하도록 한다.



1. Exchange 생성

지금까지는 producer에서 메시지를 보내는 메소드로 channel.basic_publish를 사용할 때, exchange 인자를 ''로 주었다. 이것은 별도의 exchange를 생성하지 않고 기본 exchange를 사용하겠다는 것이고, 여기에 별도의 exchange 명을 써준다면 해당 exchange를 사용할 수 있다. 물론 그 이전에 사용할 exchange를 선언해 주어야 한다.
exchange를 선언할 때는 exchange의 이름과 type을 인자로 사용하여 생성하면 된다. exchange 선언 및 해당 exchange에 메시지를 송신하기 위한 코드는 다음과 같다.

    channel.exchange_declare(exchange='logs', type='fanout')
    ...
    channel.basic_publish(exchange='logs', routing_key='log', body=message)

여기서 routing_key를 log로 지정했는데, exchange의 type이 fanout이기 때문에 사실상 routing_key는 무시된다.
예제에서는 exchange를 생성하는 코드도 produer와 consumer 양쪽에 모두 넣어줄것인데, 이렇게 하면 queue를 생성할때와 마찬가지로 기존에 같은 이름의 exchange가 존재하면 그것을 사용하고 존재하지 않는다면 신규 exchange를 생성하게 된다.



2. 임시 큐 생성 및 바인딩

새로 생성한 exchange의 type이 fanout이기 때문에, 이 exchange와 연결되는 모든 큐에는 exchange가 수신한 메시지가 전달된다. 로그 메시지를 수신하여 그것을 여러가지 형태로 기록하는 subscriber 프로세스 여러개를 기동할 것이므로, 이번에는 프로세스가 시작되면 자동으로 큐를 생성하여 exchange에 바인딩하고, 프로세스가 종료되면 큐를 제거하는 방식을 사용해본다.
먼저 큐를 생성하는 코드는 아래와 같다.

    result = channel.queue_declare()

위와 같이 큐 이름을 주지 않고 큐를 생성하면 브로커가 임의로 이름을 생성하게 되고, 이것은 result.method.queue로 접근하여 알 수 있다. 여기서 프로세스가 종료되면, 즉 subscriber가 연결을 끊으면 큐를 자동으로 제거하려면 큐를 생성할 때 아래와 같이 exclusive 인자를 주면 된다.

    result = channel.queue_declare(exclusive=True)

이렇게 생성한 큐를 exchange에 아래와 같이 binding 해주면 된다.

    channel.queue_bind(exchange='logs', queue=result.method.queue)



3. 소스코드

전체 publisher.py 소스코드는 아래와 같다.

    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='logs', type='fanout')

    message = ' '.join(sys.argv[1:]) or "Logs..."
    channel.basic_publish(exchange='logs', routing_key='key', body=message)
    print "[x] Send %r" % (message,)
    connection.close()


전체 subscriber.py 소스코드는 아래와 같다.

    import pika

    def callback(ch, method, properties, body):
        print "[x] %r" % (body,)

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='logs', type='fanout')
    result = channel.queue_declare(exclusive=True)
    channel.queue_bind(exchange='logs', queue=result.method.queue)
    channel.basic_consume(callback, queue=result.method.queue, no_ack=True)

    print "[*] Waiting for logs. To exit press CTRL+C"
    channel.start_consuming()


4. 테스트

이제 복수개의 subscriber를 기동시키고, publisher로 로그 메시지를 보내보면 모든 subscriber에서 해당 메시지를 출력함을 알 수 있다. 아래와 같이 파일에 기록하도록 하여 테스트해보는것도 좋다.

    python subscriber.py > test.log



* 출처 : www.rabbitmq.com