KS blog

killins.egloos.com

포토로그



RabbitMQ with Python #4/6 by KillinS

MQ의 핵심 기능은 여러가지 종류의 메시지를 미리 정의된 룰에 따라 적절한 큐에 보내주는 것이다. AMQP는 이런 역할을 하기 위해 exchange와 type이라는 개념을 정의하고 있고, RabbitMQ도 당연히 해당 기능이 구현되어 있다. 이번에는 앞의 로그 예제를 로그 유형에 따라 각각 다른 큐에 전송하여 처리하도록 변경해본다.


1. Direct Exchange 와 binding

AMQP에서는 메시지를 라우팅하는 규칙을 총 4가지 타입으로 정의한다. Direct, topic, header, fanout인데, 앞서 사용한 fanout은 exchange에 연결된 모든 큐에 메시지를 송신하는 규칙이었다. 작성할 예제는 로그 유형에 따라 특정 큐에 메시지를 송신할 것이므로, 특정 메시지를 특정 큐에 전달하는 direct type을 사용하도록 한다.
direct type은 exchange에서 수신한 메시지의 routing key를 보고 어떤 큐에 송신해야할지를 판단하는 규칙이다. 이 때 각 큐들은 어떤 routing key를 갖는 메시지를 수신할 지 정의할 수 있는데, 이것을 binding이라고 한다. producer가 메시지를 송신할 때 routing key를 특정 값으로 set하면, exchange에서는 해당 routing key에 따라 binding된 적절한 큐로 메시지를 송신하게 된다.
당연히 하나의 메시지는 여러개의 큐에 라우팅될 수 있고, 하나의 큐는 여러개의 routing key 메시지를 바인딩할 수 있다.

우선 direct type의 exchange를 아래와 같이 선언한다.

    channel.exchange_declare(exchange='logs_direct', type='direct')

그리고 consumer는 특정 routing key에 대한 메시지만 수신하도록 큐를 설정한다. 예제에서 수신할 로그 유형은 커맨드라인의 인자로 입력받도록 한다.

    severities = sys.argv[1:]
    if not severities:
        print >> sys.stderr, "Error : input severities"
        sys.exit(1)
    channel.exchange_declare(exchange='logs_direct', type='direct')
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    for severity in severities:
        channel.queue_bind(exchange='logs_direct', queue=queue_name, routing_key=severity)



2. 메시지 송신

특정 메시지를 특정 큐에 송신하고자 하면 exchange에 메시지를 송신할 때 routing key를 set하여 송신하면 된다. 예제에서는 로그 유형이 routing key가 되고, 커맨드라인 인자로 입력받도록 한다. 입력이 없을 경우 기본 유형은 "info"를 사용한다.

    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    ....
    channel.basic_publish(exchange='logs_direct', routing_key=severity, body=message)


3. 소스코드

Consumer인 consumer_direct.py는 아래와 같다.

    import pika
    import sys

    def callback(ch, method, properties, body):
        print "[x] %r" % (body,)

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    severities = sys.argv[1:]
    if not severities:
        print >> sys.stderr, "Error : input severities"
        sys.exit(1)

    channel.exchange_declare(exchange='logs_direct', type='direct')
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    for severity in severities:
        channel.queue_bind(exchange='logs_direct', queue=queue_name, routing_key=severity)
    channel.basic_consume(callback, queue=queue_name, no_ack=True)

    print "[*] Waiting for logs. To exit press CTRL+C"
    channel.start_consuming()

Producer인 producer_direct.py는 아래와 같다.

    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='logs_direct', type='direct')

    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    message = ' '.join(sys.argv[2:]) or "Logs..."
    channel.basic_publish(exchange='logs_direct', routing_key=severity, body=message)
    print "[x] Send %r" % (message,)
    connection.close()


4. 테스트

먼저 두개의 consumer를 기동한다. 하나는 로그유형 info와 error를 모두 수신할 consumer이다

    $ python consumer_direct.py info error

나머지 하나는 로그유형 error만 수신할 consumer이다.

    $ python consumer_direct.py error


이제 로그 유형이 info인 메시지를 송신하면 첫번째 프로세스에서만 로그 메시지를 출력하는것을 볼 수 있다.

    $ python producer_direct.py info InfoMessage

로그 유형이 error인 메시지를 송신하면 두개 프로세스 모두에서 로그 메시지를 출력하는것을 확인할 수 있다.

    $ python producer_direct.py error ErrorMessage



* 출처 : www.rabbitmq.com