Think Deep,Work Lean

Rabbitmq官网阅读之第一部份

Posted on By zack

前言

Rabbitmq是个消息中间件(broker),broker是代理的意思,也可译为消息代理:它接收并转发消息;你可以理解它为一个邮局:当你想寄信时,你要把它行放进邮筒,这时你肯定知道邮递员最终会把信件交到收件人手上;类似地,RabbitMQ就是这个邮筒、邮局和一个邮递员。RabbitMQ和邮局的区别就是它的无纸化,相反它会接收、存储并转发消息。RabbitMQ使用很多专业术语,如生产者、消费者、队列

生产者

用pyton的Pika模块来实现生产者,我们用send.py来发送一条消息到queue里面,首先们要做的事情就是与RabbitMQ server建立连接

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

这就建立起了连接,从本机连接到了代理,因为我们用的时localhost,若是其它的请用IP替代;
然后,在我们发送消息之前,我们要确保消费者队列存在,如果我们将消息发送到发送至空接收端时,该消息将会被丢弃。现在我们来创建一个名为hello的queue,然后通过个queue来传送消息

channel.queue_declare(queue='hello')

至此,我们已经做好发送消息的准备了,我们的第一条消息,将会包含一个Hello World字条串,然后我们把发送到hello队列里面。 在RabbitMQ永远不可能直接被发送到queue里面去,它必须先经过exchange
我们现在只需要知道怎样来使用默认的空字符串exchange,这个exchange比较特殊,它决定了哪个消息该进哪个queue,queue的名字需要在routing_key参数里面来定义。

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")

在退出程序之前,我们要确保网络缓存已经被清空了,同时我们的消息确实已经投递到了mq,只有这样,我们才能关闭我们的连接

connection.close()

注意,如果这是你第一次使用mq,而且你一直无法成功发送消息,不要抓狂,记得,rabbitmq需要足够多的磁盘空间,默认是200M,这个值也可以在配置里改,disk_free_limit,关于RabbitMQ配置更改,请参阅点我

消息者

我们的第二个程序是receive.py,它将会从队列里拿消息,并把它打印到屏幕上
同样的,我们首先要连接到RabbitMQ server
然后,跟以前一样,我们要保证queue存在,用queue_declare来创建一个queue

channel.queue_declare(queue='hello')

你可能会问,为什么我们要重新创建这个queue,我们不是已经在前面的代码里面创建过了queue了吗?
其实如果我们能确保这个queue确实存在的,那我们其实没必要来重复创建;如:send.py程序和我们的receive.py这两个程序,我们不确定哪个程序会先启动,在这种场景下,在两个程序中重复定义queue是最佳实践。

用rabbitmqctl命令来列举我们的queue

sudo rabbitmqctl list_queues

从queue里取消息会稍微复杂些,它通过将回调函数订阅到队列来工作,简而言之,就是它实现一个回调函数,这个函数会处理queue里面的消息数据。如:

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

接下来,我们要告诉RabbitMQ,回调函数要从我们的名为hello的queue里面接收消息了。

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

为了让以上命令能顺利执行,我们必须确保我们要订阅的queue是存在的,通过以上代码我们能确认这点,因为我们通过queue_declare创建了queue
no_ack参数将在后续来定义;
最后,我们要进入一个永远都不会结束的循环,它会会一直等待接收数据并且在有必要的情况下进行回调

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

CODE

  • send.py
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()


channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
  • receive.py
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()


channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

现在可以在终端运行了,首先启动消费者

python receive.py

接着来创建生产者

python send.py