20. kombu和消息队列总结


消息队列是OpenStack的重要组成部分,自己之前翻译过一篇 rabbitmq教程 , 但是看nova代码时,依然很多地方云里雾里,感觉不太清晰;并且该教程基于pika库,而OpenStack 默认 是使用kombu连接rabbitmq服务器,因此自己重新了解了下kombu库,并总结。

该文档是结合文档、代码示例再加上自己的理解整理而成,可能存在不准确的地方,欢迎指正!

20.1. 术语

Producers

Producers给exchanges发送消息。

Exchanges

消息发送给exchages。交换机可以被命名,可以通过路由算法进行配置(注:个人理解是, 可以在声明交换机时,指定交换机名字和交换机类型,如 topic)。

交换机通过匹配消息的 routing_key 和 binding_key来转发消息,binding_key 是consumer 声明队列时与交换机的绑定关系。

Consumers

消费者声明一个队列、并和某个交换机绑定,然后从队列中接收消息!

Queues

队列接收发往交换机的消息,它由消费者声明。

注:在pika库中,队列可以重复声明,重复声明队列时,只有一个会被创建, 但是,不能重复声明类型不同的队列!

Routing keys

每个消息都有一个routing_key,对 routing_key 的解释取决于交换机类型。 AMQP 标准定义了四种交换机类型(注:topicfanoutdirectheader)。 还可以自定义类型。

  • direct : 消息的 routing_key 属性和消费者的 routing_key 相同才递交消息;

  • fanout : 总是递交消息,即使队列和交换机banding没有 routing_key;

  • topic : 通过某种语义匹配模式匹配消息的 routing_key;消息的 routing_key 可以通过点号 . 分割,还可以包括两种特殊字符:*, #* 可以匹配一个单词,# 可以匹配0个或多个单词。

    注:这段对topic交换机的描述来源于 kombu 官网,和 rabbitmq官网描述有一些出入。 rabbitmq官网中,topic交换机,消息的 routing_key 是确切的,然后 交换机和 和队列的 banding_key 可以包含该两个特殊字符。到底哪种描述是正确的,目前还不清楚。

20.2. 通过amqp收发消息模型

通过amqp 收发消息,上面的描述已经很清晰了。这里再简单重复下:

  • 消息从来不直接发送给队列,甚至 Producers 都可能不知道队列的存在。 消息是发送给交换机,给交换机发送消息时,需要指定消息的 routing_key 属性!
  • 交换机收到消息后,根据 交换机的类型,或直接发送给队列 (fanout), 或匹配消息的 routing_key 和 队列与交换机之间的 banding_key ; 而topic类型 交换机匹配时,具有一些额外的特性,可以根据一些特殊字符进行匹配。 如果匹配,则递交消息给队列!
  • Consumers 从队列取得消息;

即:消息发布者 Publisher 将 Message 发送给 Exchange 并且说明 Routing Key。 Exchange 负责根据 Message 的 Routing Key 进行路由,将 Message 正确地 转发给相应的 Message Queue。监听在 Message Queue 上的 Consumer 将会从 Queue 中 读取消息。Routing Key 是 Exchange 转发信息的依据,因此每个消息都有一个 Routing Key 表明可以接受消息的目的地址,而每个 Message Queue 都可以通过将自己想要接收的 Routing Key 告诉 Exchange 进行 binding,这样 Exchange 就可以将消息正确地转发给相应的 Message Queue。

20.3. 其他要点

待补充…

20.4. 代码示例

20.4.1. 示例一

公共文件 kombu_entity.py

#!/usr/bin/env python
# coding:utf-8

from kombu import Exchange, Queue

#定义了一个exchange
#task_exchange = Exchange('tasks', type='direct')
task_exchange = Exchange('tasks_fanout', type='fanout')

#在这里进行了exchange和queue的绑定,并且指定了这个queue的routing_key
task_queue = Queue('piap', task_exchange, routing_key='suo_piao')
#task_queue = Queue('piap', task_exchange)

消息发送端 kombu_send.py

#!/usr/bin/env python
# coding:utf-8

from kombu import Exchange, Queue
from kombu import Connection
from kombu.messaging import Producer
from kombu.transport.base import Message

from kombu_entity import task_exchange
#task_queue = Queue('piap', task_exchange, routing_key='suo_piao')


connection = Connection('amqp://guest:httc123@10.10.10.10:5672//')
channel = connection.channel()

message=Message(channel, body='Hello Kombu')

producer = Producer(channel, exchange=task_exchange)
producer.publish(message.body, routing_key='suo_piao')
#producer.publish(message.body)

消息接收端 kombu_recv.py

#!/usr/bin/env python
# coding:utf-8

from kombu import Connection
from kombu.messaging import Consumer
from kombu_entity import task_queue

import logging
logging.basicConfig(level=logging.DEBUG)

#connection = Connection("amqp://guest:httc123@10.10.10.10:5672/")
connection = Connection("amqp://guest:httc123@localhost:5672/")
channel = connection.channel()

def process_media(body, message):#body是某种格式的数据,message是一个Message对象,这两个参数必须提供
    print "recv: %s"%body
    message.ack()

# 定义回调函数的两种方式。
# 定义消费者,并定义回调函数!
#consumer = Consumer(channel, task_queue)
#consumer.register_callback(process_media)

# 也可以在定义消费者对象时直接传递回调参数。
consumer = Consumer(channel, task_queue, callbacks=[process_media])
consumer.consume()

while True:
    try:
        connection.drain_events()
    except KeyboardInterrupt:
        #connection.release()
        print "stopped"
        break

Error

这个示例程序有一点问题:在 kombu_entity.py 中定义了 fanout 交换机,然后开启 两个 kombu_recv.py 进程,但是然后运行 kombu_send 发送消息。这里两个接收进程 并没有都收到消息,而是使用轮转分发的方式。待解决!

20.4.2. 示例二

该示例来源于 kombu 官网。可是,我在 ubuntu-14.04 Linux 下运行该示例代码,遇到一些小问题, 经过修改,可用正确运行。主要更改地方如下:

  • 取消更改相对导入;
  • client 端注释掉压缩参数;

公共文件 queues.py

from kombu import Exchange, Queue

task_exchange = Exchange('tasks', type='direct')
task_queues = [Queue('hipri', task_exchange, routing_key='hipri'),
               Queue('midpri', task_exchange, routing_key='midpri'),
               Queue('lopri', task_exchange, routing_key='lopri')]

消息接收端 worker.py

#!/usr/bin/env python
from kombu.mixins import ConsumerMixin
from kombu.log import get_logger
from kombu.utils import kwdict, reprcall

#from .queues import task_queues
from queues import task_queues

logger = get_logger(__name__)


class Worker(ConsumerMixin):

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=task_queues,
                         accept=['pickle', 'json'],
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        fun = body['fun']
        args = body['args']
        kwargs = body['kwargs']
        logger.info('Got task: %s', reprcall(fun.__name__, args, kwargs))
        try:
            fun(*args, **kwdict(kwargs))
        except Exception as exc:
            logger.error('task raised exception: %r', exc)
        message.ack()

if __name__ == '__main__':
    from kombu import Connection
    from kombu.utils.debug import setup_logging
    # setup root logger
    setup_logging(loglevel='INFO', loggers=[''])

    with Connection('amqp://guest:httc123@localhost:5672//') as conn:
        try:
            worker = Worker(conn)
            worker.run()
        except KeyboardInterrupt:
            print('bye bye')

消息发送端 client.py

#!/usr/bin/env python

from kombu.pools import producers

#from .queues import task_exchange
from queues import task_exchange

priority_to_routing_key = {'high': 'hipri',
                           'mid': 'midpri',
                           'low': 'lopri'}


def send_as_task(connection, fun, args=(), kwargs={}, priority='mid'):
    payload = {'fun': fun, 'args': args, 'kwargs': kwargs}
    routing_key = priority_to_routing_key[priority]

    with producers[connection].acquire(block=True) as producer:
        producer.publish(payload,
                         serializer='pickle',
                         #compression='bzip2',
                         exchange=task_exchange,
                         declare=[task_exchange],
                         routing_key=routing_key)

if __name__ == '__main__':
    from kombu import Connection
    #from .tasks import hello_task
    from tasks import hello_task

    connection = Connection('amqp://guest:httc123@localhost:5672//')
    send_as_task(connection, fun=hello_task, args=('Kombu', ), kwargs={},
                 priority='high')

tasks.py 文件

def hello_task(who="world"):
    print("Hello %s" % (who, ))

以下是运行结果:

../_images/kombu_recv.png

kombu 运行结果

另外需要注意的是:我尝试把 hello_task 函数放在 client.py 文件中定义,结果 运行时,总是提示如下错误。目前还不知道原因,待探讨。

root@allinone-v2:/smbshare/oslo-test/msg# ./worker.py
Connected to amqp://guest@localhost:5672//
Can't decode message body: AttributeError("'module' object has no attribute 'hello_task'",) (type:'application/x-python-serialize' encoding:'binary' raw:'<read-only buffer ptr 0x1bf3c9f, size 71 at 0x7f5bcf2b02b0>'')

在 kombu 的基础上,后续会继续熟悉 oslo.messaging。另外,Python导入问题遇到多次,也会 抽空彻底熟悉下。