前言
Rabbitmq是个消息中间件(broker),broker是代理的意思,也可译为消息代理:它接收并转发消息;你可以理解它为一个邮局:当你想寄信时,你要把它行放进邮筒,这时你肯定知道邮递员最终会把信件交到收件人手上;类似地,RabbitMQ就是这个邮筒、邮局和一个邮递员。RabbitMQ和邮局的区别就是它的无纸化,相反它会接收、存储并转发消息。RabbitMQ使用很多专业术语,如生产者、消费者、队列
生产者
用pyton的Pika模块来实现生产者,我们用send.py来发送一条消息到queue里面,首先们要做的事情就是与RabbitMQ server建立连接
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
这就建立起了连接,从本机连接到了代理,因为我们用的时localhost,若是其它的请用IP替代;
然后,在我们发送消息之前,我们要确保消费者队列存在,如果我们将消息发送到发送至空接收端时,该消息将会被丢弃。现在我们来创建一个名为hello的queue,然后通过个queue来传送消息
channel.queue_declare(queue='hello')
至此,我们已经做好发送消息的准备了,我们的第一条消息,将会包含一个Hello World字条串,然后我们把发送到hello队列里面。
在RabbitMQ永远不可能直接被发送到queue里面去,它必须先经过exchange
我们现在只需要知道怎样来使用默认的空字符串exchange,这个exchange比较特殊,它决定了哪个消息该进哪个queue,queue的名字需要在routing_key参数里面来定义。
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
在退出程序之前,我们要确保网络缓存已经被清空了,同时我们的消息确实已经投递到了mq,只有这样,我们才能关闭我们的连接
connection.close()
注意,如果这是你第一次使用mq,而且你一直无法成功发送消息,不要抓狂,记得,rabbitmq需要足够多的磁盘空间,默认是200M,这个值也可以在配置里改,disk_free_limit,关于RabbitMQ配置更改,请参阅点我
消息者
我们的第二个程序是receive.py,它将会从队列里拿消息,并把它打印到屏幕上
同样的,我们首先要连接到RabbitMQ server
然后,跟以前一样,我们要保证queue存在,用queue_declare来创建一个queue
channel.queue_declare(queue='hello')
你可能会问,为什么我们要重新创建这个queue,我们不是已经在前面的代码里面创建过了queue了吗?
其实如果我们能确保这个queue确实存在的,那我们其实没必要来重复创建;如:send.py程序和我们的receive.py这两个程序,我们不确定哪个程序会先启动,在这种场景下,在两个程序中重复定义queue是最佳实践。
用rabbitmqctl命令来列举我们的queue
sudo rabbitmqctl list_queues
从queue里取消息会稍微复杂些,它通过将回调函数订阅到队列来工作,简而言之,就是它实现一个回调函数,这个函数会处理queue里面的消息数据。如:
def callback(ch, method, properties, body): print(" [x] Received %r" % body)
接下来,我们要告诉RabbitMQ,回调函数要从我们的名为hello的queue里面接收消息了。
channel.basic_consume(callback, queue='hello', no_ack=True)
为了让以上命令能顺利执行,我们必须确保我们要订阅的queue是存在的,通过以上代码我们能确认这点,因为我们通过queue_declare创建了queue
no_ack
参数将在后续来定义;
最后,我们要进入一个永远都不会结束的循环,它会会一直等待接收数据并且在有必要的情况下进行回调print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
CODE
- send.py
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
- receive.py
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
现在可以在终端运行了,首先启动消费者
python receive.py
接着来创建生产者
python send.py