KS blog

killins.egloos.com

포토로그



RabbitMQ with Python #2/6 by KillinS

일반적으로 MQ를 사용하는 이유는 즉각적인 응답이 불가능한, 또는 불필요한 작업들에 대하여 (async) 각각의 작업들을 처리하는 프로세스들에 균등하게 작업을 분배하는, 말그대로 큐잉을 위함이 대부분일 것이다. 예제 #1은 요청에 대한 즉시 응답을 구현해보았으니 이런 MQ의 기본 기능을 덧붙여 보도록 한다.

우선 테스트를 위해 처리하는 작업에 다소 시간이 걸리도록 프로그램을 변경한다. producer는 커맨드 라인으로 입력받은 "."을 포함하는 문자열을 메시지로 보내고, consumer는 이것을 받아 "."의 개수 만큼의 초를 대기한 뒤 수신한 문자열을 표시하는 방식으로 작업을 수행하도록 한다. 이와 같이 구현하기 위해 send.py와 receive.py를 각각 아래와 같이 변경한다.

    #send.py
    import sys
    message = ' '.join(sys.argv[1:]) or "Hello World..."
    channel.basic_publish(exchange='', routing_key='hello', body=message)
    print "[x] Send %r" % message

    #receive.py
    import time
    def callback(ch, method, properties, body):
        print "[x] Received %r" % (body,)
        time.sleep(body.count('.'))
        print "[x] Done"



1. 복수개의 consumer

하나의 큐를 여러개의 consumer에서 처리하도록 한다. 사실 이것을 위해 앞서 작성한 소스코드를 변경할 필요는 없고, 단지 consumer 프로세스를 복수개 기동하기만 하면 브로커에서 알아서 메시지가 들어오는 순서대로 round-robin 방식으로 적절하게 프로세스별로 분배하여 메시지를 전달해준다. 터미널을 3개 이상 띄우고 receive.py를 2개 이상 기동시킨 뒤 send.py를 몇번 실행해보면 차례대로 여러개의 receive 프로세스에 메시지가 전달됨을 확인할 수 있다.



2. 메시지 수신 확인

브로커가 메시지를 consumer에게 전달한 뒤 메시지가 정상적으로 처리되기 전에 consumer가 어떤 문제로 인해 crash 되었다면, 해당 메시지는 영영 처리되지 않은 상태로 잃어버리게 된다. RabbitMQ에서는 이러한 상황을 방지하기 위해 consumer가 메시지를 정상적으로 수신하여 처리까지 완료되었는지를 확인하고, 처리되지 못했을 경우 동일한 메시지를 다른 consumer에게 재송신하는 기능을 갖고 있다. consumer에서 메시지를 처리할 callback 함수를 브로커에 알려줄 때 이 기능의 사용 여부도 no_ack 파라미터로 함께 알려주면 된다. 브로커에서 메시지가 정상처리되었다는것을 알아야 하므로, 콜백함수에서는 정상 처리 여부를 전달하는 부분이 추가되어야 한다. 예제의 receive.py 코드는 아래와 같이 변경된다.

    def callback(ch, method, properties, body):
        print "[x] Received %r" % (body,)
        time.sleep(body.count('.'))
        print "[x] Done"
        ch.basic_ack(delivery_tag=method.delivery_tag)
    channel.basic_consume(callback, queue='hello')

한가지 주의할것은, acknowledge 기능에 timeout이 없다는 것이다. 따라서 브로커는 consumer가 특정 메시지를 수신한 후 정상 처리했다는 응답이 올때까지 무한정 기다리게되고, 이것은 큰 문제를 불러일으킬 소지가 있다. 더군다나 위 소스코드를 보면 알겠지만 기본적으로 no_ack 파라미터를 주지 않으면 RabbitMQ는 acknowledge 기능을 enable 하도록 되어있다. 따라서 RabbitMQ를 사용할 때는 항상 이 기능에 유의하도록 한다.



3. 메시지 Durability

위에 언급된 방법을 사용하면 consumer가 crash 됨으로 인한 사고는 막을 수 있지만, 기본적으로 RabbitMQ가 메모리를 사용하기 때문에 RabbitMQ 자체의 crash로 인한 메시지 및 큐의 손실은 방지할 수 없다. RabbitMQ에서는 이런 사고를 막기 위해 큐와 메시지를 디스크에 저장하는 기능을 제공한다. 큐를 선언할 때 durable 파라미터를 True로 주면 해당 큐의 전문들을 메모리가 아닌 디스크에 쓰는것이 가능해지고, producer가 메시지를 송신할 때 delivery_mode를 2로 주면 해당 메시지는 디스크에 쓰여지게 된다. 당연히 메시지를 디스크에 쓰기 위해서는 우선 해당 큐가 durable 하게 선언되어 있어야 한다.

Durable 큐를 만들기 위해서는 큐를 선언하는 소스 코드를 아래와 같이 변경하면 된다.

    channel.queue_declare(queue='hello', durable=True)

하지만 위와 같이 코드를 변경하고 프로그램을 실행하면 오류가 발생하는데, 이것은 한번 생성한 큐의 속성은 변경 불가능하기 때문이다. 이미 앞의 예제에서 durable하지 않은 hello라는 이름의 큐를 생성했으므로, durable한 새로운 이름의 큐를 생성하거나 hello 큐를 제거하고 다시 생성해야 한다. 여기서는 새로운 큐를 생성하도록 한다.

    channel.queue_declare(queue='task_queue', durable=True)

그리고 durable 메시지를 보내기 위해서는 메시지 송신 시 아래와 같이 호출하면 된다.

    channel.basic_publish(exchange='', routing_key='task_queue', body=message'
                properties=pika.BasicProperties(delivery_mode=2,))

물론 위와 같은 방법이 100% 큐 및 메시지 손실을 방지할 수 있는것은 아니다. 브로커가 메시지를 수신하여 저장하기까지의 시간은 무방비 상태일 수 밖에 없고, 또한 RabbitMQ가 모든 메시지에 대해 fsync를 사용하지 않기 때문에 캐시에서 디스크에 쓰기까지의 시간도 위험하다. 좀 더 강력한 메시지 유실 방지를 위해서는 트랜잭션을 사용하는 방법도 검토할 수 있다.



4. 공평한 메시지 분배

앞에서 살펴본것처럼 RabbitMQ는 메시지를 복수의 consumer에 분배할때 기본적으로 라운드 로빈 방식을 사용하는데, 사실 이것은 효율적인 방법이 아니다. 처리 시간이 오래 걸리는 메시지들이 특정 큐에 몰릴 가능성도 존재하기 때문에, 이것을 사전에 방지하기 위해서는 consumer가 메시지를 처리 중인지를 판단하여 처리중일 경우 다른 consumer에서 메시지를 fetch하는 방법을 사용해야 한다. 이것을 위해서는 consumer에서 브로커와의 연결을 설정할 때 basic_qos 함수를 이용하여 prefetch_count 값을 설정해주면 된다. 만약 라운드 로빈에 의한 다음 consumer가 prefetch_count만큼의 메시지를 수신하여 처리중이라면, 브로커는 다음 메시지를 한가한 consumer에 송신하고, 모든 consumer가 busy한 상태라면 이용 가능한 consumer가 생길때까지 큐에 계속 메시지를 보관하게 된다. 이것은 메시지의 처리 여부를 브로커가 알아야 하므로 acknowledge 기능과 함께 사용해야한다. 소스코드는 아래와 같다.

    channel.basic_qos(prefetch_count=1)


위에서 나온 모든 기능을 추가한 소스코드는 아래와 같다.

    #send.py
    #!/usr/bin/env python
    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='task_queue', durable=True)

    message = ' '.join(sys.argv[1:]) or "Hello World!"
    channel.basic_publish(exchange='', routing_key='task_queue', body=message,
                      properties=pika.BasicProperties(delivery_mode = 2,))
    print " [x] Sent %r" % (message,)
    connection.close()

    #receive.py
    #!/usr/bin/env python
    import pika
    import time

    def callback(ch, method, properties, body):
        print " [x] Received %r" % (body,)
        time.sleep( body.count('.') )
        print " [x] Done"
        ch.basic_ack(delivery_tag = method.delivery_tag)

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='task_queue', durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback,queue='task_queue')

    print ' [*] Waiting for messages. To exit press CTRL+C'
    channel.start_consuming()



* 출처 : www.rabbitmq.com