Fork me on GitHub
随笔 - 215  文章 - 13  trackbacks - 0
<2016年3月>
282912345
6789101112
13141516171819
20212223242526
272829303112
3456789


专注即时通讯及网游服务端编程
------------------------------------
Openresty 官方模块
Openresty 标准模块(Opm)
Openresty 三方模块
------------------------------------
本博收藏大部分文章为转载,并在文章开头给出了原文出处,如有再转,敬请保留相关信息,这是大家对原创作者劳动成果的自觉尊重!!如为您带来不便,请于本博下留言,谢谢配合。

常用链接

留言簿(1)

随笔分类

随笔档案

相册

Awesome

Blog

Book

GitHub

Link

搜索

  •  

积分与排名

  • 积分 - 211162
  • 排名 - 118

最新评论

阅读排行榜

http://blog.csdn.net/zyz511919766/article/details/41946521
之前的几篇文章介绍了一下RabbitMQ的概念以及环境的搭建和配置,有了RabbitMQ环境就可以基于其实现一些特殊的任务场景了。RabbitMQ官方有个很好的Tutorials基本覆盖了RabbitMQ的各中常见应用场景,现以代码加注释的方式以其Python客户端pika为例简单介绍如下。更详尽的信息可参阅:http://www.rabbitmq.com/getstarted.html 。

之前的几篇文章:
RabbitMQ概念及环境搭建(一)单节点安装与配置
RabbitMQ概念及环境搭建(二)RabbitMQ Broker管理
RabbitMQ概念及环境搭建(三)RabbitMQ cluster
RabbitMQ概念及环境搭建(四)RabbitMQ High Availability
RabbitMQ概念及环境搭建(五)与web的整合


RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收message。

应用场景1-“Hello Word”

一个P向queue发送一个message,一个C从该queue接收message并打印。

send.py 
producer,连接至RabbitMQ Server,声明队列,发送message,关闭连接,退出。

[python] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. #!/usr/bin/python27  
  2. #encoding:utf8  
  3. import pika  
  4.   
  5. #与RabbitMQ Server建立连接  
  6. #连接到的broker在本机-localhost上  
  7. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  8.         host='localhost'))  
  9. channel = connection.channel()  
  10.   
  11. #声明队列以向其发送消息消息  
  12. #向不存在的位置发送消息时RabbitMQ将消息丢弃  
  13. #queue='hello'指定队列名字  
  14. channel.queue_declare(queue='hello', durable=True)  
  15.   
  16. #message不能直接发送给queue,需经exchange到达queue,此处使用以空字符串标识的默认的exchange  
  17. #使用默认exchange时允许通过routing_key明确指定message将被发送给哪个queue  
  18. #body参数指定了要发送的message内容  
  19. channel.basic_publish(exchange='',  
  20.                       routing_key='hello',  
  21.                       body='Hello World!')  
  22.   
  23. print " [x] Sent 'Hello World!'"  
  24.   
  25. #关闭与RabbitMq Server间的连接  
  26. connection.close()  
receive.py 
consumer,连接至RabbitMQ Server,声明队列,接收消息并进行处理这里为打印出消息,退出。

[python] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. #!/usr/bin/env python  
  2. #encoding:utf8  
  3. import pika  
  4.   
  5. #建立到达RabbitMQ Server的connection  
  6. #此处RabbitMQ Server位于本机-localhost  
  7. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  8.         host='localhost'))  
  9. channel = connection.channel()  
  10.   
  11. #声明queue,确认要从中接收message的queue  
  12. #queue_declare函数是幂等的,可运行多次,但只会创建一次  
  13. #若可以确信queue是已存在的,则此处可省略该声明,如producer已经生成了该queue  
  14. #但在producer和consumer中重复声明queue是一个好的习惯  
  15. channel.queue_declare(queue='hello')  
  16.   
  17. print ' [*] Waiting for messages. To exit press CTRL+C'  
  18.   
  19. #定义回调函数  
  20. #一旦从queue中接收到一个message回调函数将被调用  
  21. #ch:channel  
  22. #method:  
  23. #properties:  
  24. #body:message  
  25. def callback(ch, method, properties, body):  
  26.     print " [x] Received %r" % (body,)  
  27.   
  28. #从queue接收message的参数设置  
  29. #包括从哪个queue接收message,用于处理message的callback,是否要确认message  
  30. #默认情况下是要对消息进行确认的,以防止消息丢失。  
  31. #此处将no_ack明确指明为True,不对消息进行确认。  
  32. channel.basic_consume(callback,  
  33.                       queue='hello',  
  34.                       no_ack=True)  
  35.   
  36. #开始循环从queue中接收message并使用callback进行处理  
  37. channel.start_consuming()  
测试

[plain] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. python send.py  
  2. python receive.py  

应用场景2-work queues

将耗时的消息处理通过队列分配给多个consumer来处理,我们称此处的consumer为worker,我们将此处的queue称为Task Queue,其目的是为了避免资源密集型的task的同步处理,也即立即处理task并等待完成。相反,调度task使其稍后被处理。也即把task封装进message并发送到task queue,worker进程在后台运行,从task queue取出task并执行job,若运行了多个worker,则task可在多个worker间分配。


new_task.py
建立连接,声明队列,发送可以模拟耗时任务的message,断开连接、退出。

[python] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. #!/usr/bin/env python  
  2. #encoding:utf8  
  3. import pika  
  4. import sys  
  5.   
  6. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  7.         host='localhost'))  
  8. channel = connection.channel()  
  9.   
  10. #仅仅对message进行确认不能保证message不丢失,比如RabbitMQ崩溃了queue就会丢失  
  11. #因此还需使用durable=True声明queue是持久化的,这样即便Rabb崩溃了重启后queue仍然存在  
  12. channel.queue_declare(queue='task_queue', durable=True)  
  13.   
  14. #从命令行构造将要发送的message  
  15. message = ' '.join(sys.argv[1:]) or "Hello World!"  
  16.   
  17. #除了要声明queue是持久化的外,还需声明message是持久化的  
  18. #basic_publish的properties参数指定message的属性  
  19. #此处pika.BasicProperties中的delivery_mode=2指明message为持久的  
  20. #这样一来RabbitMQ崩溃重启后queue仍然存在其中的message也仍然存在  
  21. #需注意的是将message标记为持久的并不能完全保证message不丢失,因为  
  22. #从RabbitMQ接收到message到将其存储到disk仍需一段时间,若此时RabbitMQ崩溃则message会丢失  
  23. #况且RabbitMQ不会对每条message做fsync动作  
  24. #可通过publisher confirms实现更强壮的持久性保证  
  25. channel.basic_publish(exchange='',  
  26.                       routing_key='task_queue',  
  27.                       body=message,  
  28.                       properties=pika.BasicProperties(  
  29.                          delivery_mode = 2# make message persistent  
  30.                       ))  
  31. print " [x] Sent %r" % (message,)  
  32. connection.close()  
worker.py
建立连接,声明队列,不断的接收message,处理任务,进行确认。

[python] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. #!/usr/bin/env python  
  2. #encoding:utf8  
  3. import pika  
  4. import time  
  5.   
  6. #默认情况RabbirMQ将message以round-robin方式发送给下一个consumer  
  7. #每个consumer接收到的平均message量是一样的  
  8. #可以同时运行两个或三个该程序进行测试  
  9.   
  10. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  11.         host='localhost'))  
  12. channel = connection.channel()  
  13.   
  14. #仅仅对message进行确认不能保证message不丢失,比如RabbitMQ崩溃了  
  15. #还需使用durable=True声明queue是持久化的,这样即便Rabb崩溃了重启后queue仍然存在其中的message不会丢失  
  16. #RabbitMQ中不允许使用不同的参数定义同名queue  
  17. channel.queue_declare(queue='task_queue', durable=True)  
  18.   
  19. print ' [*] Waiting for messages. To exit press CTRL+C'  
  20.   
  21. #回调函数,函数体模拟耗时的任务处理:以message中'.'的数量表示sleep的秒数  
  22. def callback(ch, method, properties, body):  
  23.     print " [x] Received %r" % (body,)  
  24.     time.sleep( body.count('.') )  
  25.     print " [x] Done"  
  26.     #对message进行确认  
  27.     ch.basic_ack(delivery_tag = method.delivery_tag)  
  28.   
  29. #若存在多个consumer每个consumer的负载可能不同,有些处理的快有些处理的慢  
  30. #RabbitMQ并不管这些,只是简单的以round-robin的方式分配message  
  31. #这可能造成某些consumer积压很多任务处理不完而一些consumer长期处于饥饿状态  
  32. #可以使用prefetch_count=1的basic_qos方法可告知RabbitMQ只有在consumer处理并确认了上一个message后才分配新的message给他  
  33. #否则分给另一个空闲的consumer  
  34. channel.basic_qos(prefetch_count=1)  
  35.   
  36. #这里移除了no_ack=True这个参数,也即需要对message进行确认(默认行为)  
  37. #否则consumer在偶然down后其正在处理和分配到该consumer还未处理的message可能发生丢失  
  38. #因为此时RabbitMQ在发送完message后立即从内存删除该message  
  39. #假如没有设置no_ack=True则consumer在偶然down掉后其正在处理和分配至该consumer但还未来得及处理的message会重新分配到其他consumer  
  40. #没有设置no_ack=True则consumer在收到message后会向RabbitMQ反馈已收到并处理了message告诉RabbitMQ可以删除该message  
  41. #RabbitMQ中没有超时的概念,只有在consumer down掉后重新分发message  
  42. channel.basic_consume(callback,  
  43.                       queue='task_queue')  
  44.   
  45. channel.start_consuming()  

测试

[plain] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. python new_task.py "A very hard task which takes two seconds.."  
  2. python worker.py  

应用场景3-Publish/Subscribe

在应用场景2中一个message(task)仅被传递给了一个comsumer(worker)。现在我们设法将一个message传递给多个consumer。这种模式被称为publish/subscribe。此处以一个简单的日志系统为例进行说明。该系统包含一个log发送程序和一个log接收并打印的程序。由log发送者发送到queue的消息可以被所有运行的log接收者接收。因此,我们可以运行一个log接收者直接在屏幕上显示log,同时运行另一个log接收者将log写入磁盘文件。


receive_logs.py
日志消息接收者:建立连接,声明exchange,将exchange与queue进行绑定,开始不停的接收log并打印。

[python] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. #!/usr/bin/env python  
  2. #encoding:utf8  
  3. import pika  
  4.   
  5. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  6.         host='localhost'))  
  7. channel = connection.channel()  
  8.   
  9. #作为好的习惯,在producer和consumer中分别声明一次以保证所要使用的exchange存在  
  10. channel.exchange_declare(exchange='logs',  
  11.                          type='fanout')  
  12.   
  13. #在不同的producer和consumer间共享queue时指明queue的name是重要的  
  14. #但某些时候,比如日志系统,需要接收所有的log message而非一个子集  
  15. #而且仅对当前的message 流感兴趣,对于过时的message不感兴趣,那么  
  16. #可以申请一个临时队列这样,每次连接到RabbitMQ时会以一个随机的名字生成  
  17. #一个新的空的queue,将exclusive置为True,这样在consumer从RabbitMQ断开后会删除该queue  
  18. result = channel.queue_declare(exclusive=True)  
  19.   
  20. #用于获取临时queue的name  
  21. queue_name = result.method.queue  
  22.   
  23. #exchange与queue之间的关系成为binding  
  24. #binding告诉exchange将message发送该哪些queue  
  25. channel.queue_bind(exchange='logs',  
  26.                    queue=queue_name)  
  27.   
  28. print ' [*] Waiting for logs. To exit press CTRL+C'  
  29.   
  30. def callback(ch, method, properties, body):  
  31.     print " [x] %r" % (body,)  
  32.   
  33. #从指定地queue中consume message且不确认  
  34. channel.basic_consume(callback,  
  35.                       queue=queue_name,  
  36.                       no_ack=True)  
  37.   
  38. channel.start_consuming()  
emit_log.py
日志消息发送者:建立连接,声明fanout类型的exchange,通过exchage向queue发送日志消息,消息被广播给所有接收者,关闭连接,退出。

[python] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. #!/usr/bin/env python  
  2. #encoding:utf8  
  3.   
  4. import pika  
  5. import sys  
  6.   
  7. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  8.         host='localhost'))  
  9. channel = connection.channel()  
  10.   
  11. #producer只能通过exchange将message发给queue  
  12. #exchange的类型决定将message路由至哪些queue  
  13. #可用的exchange类型:direct\topic\headers\fanout  
  14. #此处定义一个名称为'logs'的'fanout'类型的exchange,'fanout'类型的exchange简单的将message广播到它所知道的所有queue  
  15. channel.exchange_declare(exchange='logs',  
  16.                          type='fanout')  
  17.   
  18. message = ' '.join(sys.argv[1:]) or "info: Hello World!"  
  19.   
  20. #将message publish到名为log的exchange中  
  21. #因为是fanout类型的exchange,这里无需指定routing_key  
  22. channel.basic_publish(exchange='logs',  
  23.                       routing_key='',  
  24.                       body=message)  
  25.   
  26. print " [x] Sent %r" % (message,)  
  27.   
  28. connection.close()  
测试

[plain] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. python receive_logs.py  
  2. python emit_log.py "info: This is the log message"  

应用场景4-Routing

应用场景3中构建了简单的log系统,可以将log message广播至多个receiver。现在我们将考虑只把指定的message类型发送给其subscriber,比如,只把error message写到log file而将所有log message显示在控制台。


receive_logs_direct.py
log message接收者:建立连接,声明direct类型的exchange,声明queue,使用提供的参数作为routing_key将queue绑定到exchange,开始循环接收log message并打印。

[python] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. #!/usr/bin/env python  
  2. import pika  
  3. import sys  
  4.   
  5. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  6.         host='localhost'))  
  7. channel = connection.channel()  
  8.   
  9. #声明一个名为direct_logs类型为direct的exchange  
  10. #同时在producer和consumer中声明exchage或queue是个好习惯,以保证其存在  
  11. channel.exchange_declare(exchange='direct_logs',  
  12.                          type='direct')  
  13.   
  14. result = channel.queue_declare(exclusive=True)  
  15. queue_name = result.method.queue  
  16.   
  17. #从命令行获取参数:routing_key  
  18. severities = sys.argv[1:]  
  19. if not severities:  
  20.     print >> sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],)  
  21.     sys.exit(1)  
  22.   
  23. for severity in severities:  
  24.     #exchange和queue之间的binding可接受routing_key参数  
  25.     #该参数的意义依赖于exchange的类型  
  26.     #fanout类型的exchange直接忽略该参数  
  27.     #direct类型的exchange精确匹配该关键字进行message路由  
  28.     #对多个queue使用相同的binding_key是合法的  
  29.     channel.queue_bind(exchange='direct_logs',  
  30.                        queue=queue_name,  
  31.                        routing_key=severity)  
  32.   
  33. print ' [*] Waiting for logs. To exit press CTRL+C'  
  34.   
  35. def callback(ch, method, properties, body):  
  36.     print " [x] %r:%r" % (method.routing_key, body,)  
  37.   
  38. channel.basic_consume(callback,  
  39.                       queue=queue_name,  
  40.                       no_ack=True)  
  41.   
  42. channel.start_consuming()  

emit_log_direct.py
log message发送者:建立连接,声明direct类型的exchange,生成并发送log message到exchange,关闭连接,退出。

[python] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. #!/usr/bin/env python  
  2. #encoding:utf8  
  3.   
  4. import pika  
  5. import sys  
  6.   
  7. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  8.         host='localhost'))  
  9. channel = connection.channel()  
  10.   
  11. #声明一个名为direct_logs的direct类型的exchange  
  12. #direct类型的exchange  
  13. channel.exchange_declare(exchange='direct_logs',  
  14.                          type='direct')  
  15.   
  16. #从命令行获取basic_publish的配置参数  
  17. severity = sys.argv[1if len(sys.argv) > 1 else 'info'  
  18. message = ' '.join(sys.argv[2:]) or 'Hello World!'  
  19.   
  20. #向名为direct_logs的exchage按照设置的routing_key发送message  
  21. channel.basic_publish(exchange='direct_logs',  
  22.                       routing_key=severity,  
  23.                       body=message)  
  24.   
  25. print " [x] Sent %r:%r" % (severity, message)  
  26. connection.close()  

测试:

python receive_logs_direct.py info
python emit_log_direct.py info "The message"

应用场景5-topic

应用场景4中改进的log系统中用direct类型的exchange替换应用场景3中的fanout类型exchange实现将不同的log message发送给不同的subscriber(也即分别通过不同的routing_key将queue绑定到exchange,这样exchange便可将不同的message根据message内容路由至不同的queue)。但仍然存在限制,不能根据多个规则路由消息,比如接收者要么只能收error类型的log message要么只能收info类型的message。如果我们不仅想根据log的重要级别如info、warning、error等来进行log message路由还想同时根据log message的来源如auth、cron、kern来进行路由。为了达到此目的,需要topic类型的exchange。topic类型的exchange中routing_key中可以包含两个特殊字符:“*”用于替代一个词,“#”用于0个或多个词。

receive_logs_topic.py
log message接收者:建立连接,声明topic类型的exchange,声明queue,根据程序参数构造routing_key,根据routing_key将queue绑定到exchange,循环接收并处理message。

[python] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. #!/usr/bin/env python  
  2. import pika  
  3. import sys  
  4.   
  5. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  6.         host='localhost'))  
  7. channel = connection.channel()  
  8.   
  9. #声明一个名为direct_logs类型为direct的exchange  
  10. #同时在producer和consumer中声明exchage或queue是个好习惯,以保证其存在  
  11. channel.exchange_declare(exchange='direct_logs',  
  12.                          type='direct')  
  13.   
  14. result = channel.queue_declare(exclusive=True)  
  15. queue_name = result.method.queue  
  16.   
  17. #从命令行获取参数:routing_key  
  18. severities = sys.argv[1:]  
  19. if not severities:  
  20.     print >> sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],)  
  21.     sys.exit(1)  
  22.   
  23. for severity in severities:  
  24.     #exchange和queue之间的binding可接受routing_key参数  
  25.     #该参数的意义依赖于exchange的类型  
  26.     #fanout类型的exchange直接忽略该参数  
  27.     #direct类型的exchange精确匹配该关键字进行message路由  
  28.     #对多个queue使用相同的binding_key是合法的  
  29.     channel.queue_bind(exchange='direct_logs',  
  30.                        queue=queue_name,  
  31.                        routing_key=severity)  
  32.   
  33. print ' [*] Waiting for logs. To exit press CTRL+C'  
  34.   
  35. def callback(ch, method, properties, body):  
  36.     print " [x] %r:%r" % (method.routing_key, body,)  
  37.   
  38. channel.basic_consume(callback,  
  39.                       queue=queue_name,  
  40.                       no_ack=True)  
  41.   
  42. channel.start_consuming()  

emit_log_topic.py
log message发送者:建立连接、声明topic类型的exchange、根据程序参数构建routing_key和要发送的message,以构建的routing_key将message发送给topic类型的exchange,关闭连接,退出。

[python] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. #!/usr/bin/env python  
  2. #encoding:utf8  
  3. import pika  
  4. import sys  
  5.   
  6. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  7.         host='localhost'))  
  8. channel = connection.channel()  
  9.   
  10. #声明一个名为topic_logs的topic类型的exchange  
  11. #topic类型的exchange可通过通配符对message进行匹配从而路由至不同queue  
  12. channel.exchange_declare(exchange='topic_logs',  
  13.                          type='topic')  
  14.   
  15. routing_key = sys.argv[1if len(sys.argv) > 1 else 'anonymous.info'  
  16. message = ' '.join(sys.argv[2:]) or 'Hello World!'  
  17.   
  18. channel.basic_publish(exchange='topic_logs',  
  19.                       routing_key=routing_key,  
  20.                       body=message)  
  21.   
  22. print " [x] Sent %r:%r" % (routing_key, message)  
  23. connection.close()  

测试:

[plain] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. python receive_logs_topic.py "*.rabbit"  
  2. python emit_log_topic.py red.rabbit Hello  


应用场景6-PRC

在应用场景2中描述了如何使用work queue将耗时的task分配到不同的worker中。但是,如果我们task是想在远程的计算机上运行一个函数并等待返回结果呢。这根场景2中的描述是一个完全不同的故事。这一模式被称为远程过程调用。现在,我们将构建一个RPC系统,包含一个client和可扩展的RPC server,通过返回斐波那契数来模拟RPC service。


rpc_server.py
RPC server:建立连接,声明queue,定义了一个返回指定数字的斐波那契数的函数,定义了一个回调函数在接收到包
含参数的调用请求后调用自己的返回斐波那契数的函数并将结果发送到与接收到message的queue相关联的queue,并进行确认。开始接收调用请求并用回调函数进行请求处理。

[python] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. #!/usr/bin/env python  
  2. #encoding:utf8  
  3. import pika  
  4.   
  5. #建立到达RabbitMQ Server的connection  
  6. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  7.         host='localhost'))  
  8. channel = connection.channel()  
  9.   
  10. #声明一个名为rpc_queue的queue  
  11. channel.queue_declare(queue='rpc_queue')  
  12.   
  13. #计算指定数字的斐波那契数  
  14. def fib(n):  
  15.     if n == 0:  
  16.         return 0  
  17.     elif n == 1:  
  18.         return 1  
  19.     else:  
  20.         return fib(n-1) + fib(n-2)  
  21.   
  22. #回调函数,从queue接收到message后调用该函数进行处理  
  23. def on_request(ch, method, props, body):  
  24.     #由message获取要计算斐波那契数的数字  
  25.     n = int(body)  
  26.   
  27.     print " [.] fib(%s)"  % (n,)  
  28.     #调用fib函数获得计算结果  
  29.     response = fib(n)  
  30.       
  31.     #exchage为空字符串则将message发送个到routing_key指定的queue  
  32.     #这里queue为回调函数参数props中reply_ro指定的queue  
  33.     #要发送的message为计算所得的斐波那契数  
  34.     #properties中correlation_id指定为回调函数参数props中co的rrelation_id  
  35.     #最后对消息进行确认  
  36.     ch.basic_publish(exchange='',  
  37.                      routing_key=props.reply_to,  
  38.                      properties=pika.BasicProperties(correlation_id = \  
  39.                                                          props.correlation_id),  
  40.                      body=str(response))  
  41.     ch.basic_ack(delivery_tag = method.delivery_tag)  
  42.   
  43. #只有consumer已经处理并确认了上一条message时queue才分派新的message给它  
  44. channel.basic_qos(prefetch_count=1)  
  45.   
  46. #设置consumeer参数,即从哪个queue获取消息使用哪个函数进行处理,是否对消息进行确认  
  47. channel.basic_consume(on_request, queue='rpc_queue')  
  48.   
  49. print " [x] Awaiting RPC requests"  
  50.   
  51. #开始接收并处理消息  
  52. channel.start_consuming()  
rpc_client.py
RPC client:远程过程调用发起者:定义了一个类,类中初始化到RabbitMQ Server的连接、声明回调queue、开始在回调queue上等待接收响应、定义了在回调queue上接收到响应后的处理函数on_response根据响应关联的correlation_id属性作出响应、定义了调用函数并在其中向调用queue发送包含correlation_id等属性的调用请求、初始化一个client实例,以30为参数发起远程过程调用。

[python] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. #!/usr/bin/env python  
  2. #encoding:utf8  
  3. import pika  
  4. import uuid  
  5.   
  6. #在一个类中封装了connection建立、queue声明、consumer配置、回调函数等  
  7. class FibonacciRpcClient(object):  
  8.     def __init__(self):  
  9.         #建立到RabbitMQ Server的connection  
  10.         self.connection = pika.BlockingConnection(pika.ConnectionParameters(  
  11.                 host='localhost'))  
  12.   
  13.         self.channel = self.connection.channel()  
  14.           
  15.         #声明一个临时的回调队列  
  16.         result = self.channel.queue_declare(exclusive=True)  
  17.         self.callback_queue = result.method.queue  
  18.   
  19.         #此处client既是producer又是consumer,因此要配置consume参数  
  20.         #这里的指明从client自己创建的临时队列中接收消息  
  21.         #并使用on_response函数处理消息  
  22.         #不对消息进行确认  
  23.         self.channel.basic_consume(self.on_response, no_ack=True,  
  24.                                    queue=self.callback_queue)  
  25.       
  26.     #定义回调函数  
  27.     #比较类的corr_id属性与props中corr_id属性的值  
  28.     #若相同则response属性为接收到的message  
  29.     def on_response(self, ch, method, props, body):  
  30.         if self.corr_id == props.correlation_id:  
  31.             self.response = body  
  32.    
  33.     def call(self, n):  
  34.         #初始化response和corr_id属性  
  35.         self.response = None  
  36.         self.corr_id = str(uuid.uuid4())  
  37.          
  38.         #使用默认exchange向server中定义的rpc_queue发送消息  
  39.         #在properties中指定replay_to属性和correlation_id属性用于告知远程server  
  40.         #correlation_id属性用于匹配request和response  
  41.         self.channel.basic_publish(exchange='',  
  42.                                    routing_key='rpc_queue',  
  43.                                    properties=pika.BasicProperties(  
  44.                                          reply_to = self.callback_queue,  
  45.                                          correlation_id = self.corr_id,  
  46.                                          ),  
  47.                                    #message需为字符串  
  48.                                    body=str(n))  
  49.   
  50.         while self.response is None:  
  51.             self.connection.process_data_events()  
  52.           
  53.         return int(self.response)  
  54.   
  55. #生成类的实例  
  56. fibonacci_rpc = FibonacciRpcClient()  
  57.   
  58. print " [x] Requesting fib(30)"  
  59. #调用实例的call方法  
  60. response = fibonacci_rpc.call(30)  
  61. print " [.] Got %r" % (response,)  

测试:

[python] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. python rpc_server.py  
  2. python rpc_client.py  

RabbitMQ的几种典型使用场景

RabbitMQ主页:https://www.rabbitmq.com/

AMQP

AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现。它主要包括以下组件:

1.Server(broker): 接受客户端连接,实现AMQP消息队列和路由功能的进程。

2.Virtual Host:其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host

3.Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不同类型的Exchange路由的行为是不一样的。

4.Message Queue:消息队列,用于存储还未被消费者消费的消息。

5.Message: 由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。

6.Binding:Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。 

7.Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。

8.Channel:信道,仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection

9.Command:AMQP的命令,客户端通过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端可以通过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。

在了解了AMQP模型以后,需要简单介绍一下AMQP的协议栈,AMQP协议本身包括三层:

1.Module Layer,位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑,例如,客户端可以通过queue.declare声明一个队列,利用consume命令获取一个队列中的消息。

2.Session Layer,主要负责将客户端的命令发送给服务器,在将服务器端的应答返回给客户端,主要为客户端与服务器之间通信提供可靠性、同步机制和错误处理。

3.Transport Layer,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示。

RabbitMQ使用场景

学习RabbitMQ的使用场景,来自官方教程:https://www.rabbitmq.com/getstarted.html

场景1:单发送单接收

使用场景:简单的发送与接收,没有特别的处理。

Producer:

复制代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Send {
    
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws Exception {
                
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    
    channel.close();
    connection.close();
  }
}
复制代码

Consumer:

复制代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {
    
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(QUEUE_NAME, true, consumer);
    
    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      System.out.println(" [x] Received '" + message + "'");
    }
  }
}
复制代码

场景2:单发送多接收

使用场景:一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息。

Producer:

复制代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {
  
  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    
    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    
    String message = getMessage(argv);
    
    channel.basicPublish( "", TASK_QUEUE_NAME, 
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    
    channel.close();
    connection.close();
  }
    
  private static String getMessage(String[] strings){
    if (strings.length < 1)
      return "Hello World!";
    return joinStrings(strings, " ");
  }  
  
  private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
      words.append(delimiter).append(strings[i]);
    }
    return words.toString();
  }
}
复制代码

发送端和场景1不同点:

1、使用“task_queue”声明了另一个Queue,因为RabbitMQ不容许声明2个相同名称、配置不同的Queue

2、使"task_queue"的Queue的durable的属性为true,即使消息队列durable

3、使用MessageProperties.PERSISTENT_TEXT_PLAIN使消息durable

When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.

Consumer:

复制代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
  
public class Worker {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    
    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
    channel.basicQos(1);
    
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
    
    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      
      System.out.println(" [x] Received '" + message + "'");
      doWork(message);
      System.out.println(" [x] Done");

      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }         
  }
  
  private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
      if (ch == '.') Thread.sleep(1000);
    }
  }
}
复制代码

接收端和场景1不同点:

1、使用“task_queue”声明消息队列,并使消息队列durable

2、在使用channel.basicConsume接收消息时使autoAck为false,即不自动会发ack,由channel.basicAck()在消息处理完成后发送消息。

3、使用了channel.basicQos(1)保证在接收端一个消息没有处理完时不会接收另一个消息,即接收端发送了ack后才会接收下一个消息。在这种情况下发送端会尝试把消息发送给下一个not busy的接收端。

注意点:

1)It's a common mistake to miss the basicAck. It's an easy error, but the consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery), but RabbitMQ will eat more and more memory as it won't be able to release any unacked messages.

2)Note on message persistence

Marking messages as persistent doesn't fully guarantee that a message won't be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn't saved it yet. Also, RabbitMQ doesn't do fsync(2) for every message -- it may be just saved to cache and not really written to the disk. The persistence guarantees aren't strong, but it's more than enough for our simple task queue. If you need a stronger guarantee you can wrap the publishing code in atransaction.

3)Note about queue size

If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy.

场景3:Publish/Subscribe

使用场景:发布、订阅模式,发送端发送广播消息,多个接收端接收。

Producer:

复制代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

    String message = getMessage(argv);

    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }
  
  private static String getMessage(String[] strings){
    if (strings.length < 1)
            return "info: Hello World!";
    return joinStrings(strings, " ");
  }
  
  private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
  }
}
复制代码

发送端:

发送消息到一个名为“logs”的exchange上,使用“fanout”方式发送,即广播消息,不需要使用queue,发送端不需要关心谁接收。

Consumer:

复制代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogs {

  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");
    
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());

      System.out.println(" [x] Received '" + message + "'");   
    }
  }
}
复制代码

接收端:

1、声明名为“logs”的exchange的,方式为"fanout",和发送端一样。

2、channel.queueDeclare().getQueue();该语句得到一个随机名称的Queue,该queue的类型为non-durable、exclusive、auto-delete的,将该queue绑定到上面的exchange上接收消息。

3、注意binding queue的时候,channel.queueBind()的第三个参数Routing key为空,即所有的消息都接收。如果这个值不为空,在exchange type为“fanout”方式下该值被忽略!

场景4:Routing (按路线发送接收)

使用场景:发送端按routing key发送消息,不同的接收端按不同的routing key接收消息。

Producer:

复制代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLogDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");

    String severity = getSeverity(argv);
    String message = getMessage(argv);

    channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
    System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

    channel.close();
    connection.close();
  }
  
  private static String getSeverity(String[] strings){
    if (strings.length < 1)
            return "info";
    return strings[0];
  }

  private static String getMessage(String[] strings){ 
    if (strings.length < 2)
            return "Hello World!";
    return joinStrings(strings, " ", 1);
  }
  
  private static String joinStrings(String[] strings, String delimiter, int startIndex) {
    int length = strings.length;
    if (length == 0 ) return "";
    if (length < startIndex ) return "";
    StringBuilder words = new StringBuilder(strings[startIndex]);
    for (int i = startIndex + 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
  }
}
复制代码

发送端和场景3的区别:

1、exchange的type为direct

2、发送消息的时候加入了routing key

Consumer:

复制代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String queueName = channel.queueDeclare().getQueue();
    
    if (argv.length < 1){
      System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
      System.exit(1);
    }
    
    for(String severity : argv){    
      channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
    
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      String routingKey = delivery.getEnvelope().getRoutingKey();

      System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");   
    }
  }
}
复制代码

接收端和场景3的区别:

在绑定queue和exchange的时候使用了routing key,即从该exchange上只接收routing key指定的消息。

场景5:Topics (按topic发送接收)

使用场景:发送端不只按固定的routing key发送消息,而是按字符串“匹配”发送,接收端同样如此。

Producer:

复制代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLogTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) {
    Connection connection = null;
    Channel channel = null;
    try {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
  
      connection = factory.newConnection();
      channel = connection.createChannel();

      channel.exchangeDeclare(EXCHANGE_NAME, "topic");

      String routingKey = getRouting(argv);
      String message = getMessage(argv);

      channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
      System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

    }
    catch  (Exception e) {
      e.printStackTrace();
    }
    finally {
      if (connection != null) {
        try {
          connection.close();
        }
        catch (Exception ignore) {}
      }
    }
  }
  
  private static String getRouting(String[] strings){
    if (strings.length < 1)
            return "anonymous.info";
    return strings[0];
  }

  private static String getMessage(String[] strings){ 
    if (strings.length < 2)
            return "Hello World!";
    return joinStrings(strings, " ", 1);
  }
  
  private static String joinStrings(String[] strings, String delimiter, int startIndex) {
    int length = strings.length;
    if (length == 0 ) return "";
    if (length < startIndex ) return "";
    StringBuilder words = new StringBuilder(strings[startIndex]);
    for (int i = startIndex + 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
  }
}
复制代码

发送端和场景4的区别:

1、exchange的type为topic

2、发送消息的routing key不是固定的单词,而是匹配字符串,如"*.lu.#",*匹配一个单词,#匹配0个或多个单词。

Consumer:

复制代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) {
    Connection connection = null;
    Channel channel = null;
    try {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
  
      connection = factory.newConnection();
      channel = connection.createChannel();

      channel.exchangeDeclare(EXCHANGE_NAME, "topic");
      String queueName = channel.queueDeclare().getQueue();
 
      if (argv.length < 1){
        System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
        System.exit(1);
      }
    
      for(String bindingKey : argv){    
        channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
      }
    
      System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

      QueueingConsumer consumer = new QueueingConsumer(channel);
      channel.basicConsume(queueName, true, consumer);

      while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        String routingKey = delivery.getEnvelope().getRoutingKey();

        System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");   
      }
    }
    catch  (Exception e) {
      e.printStackTrace();
    }
    finally {
      if (connection != null) {
        try {
          connection.close();
        }
        catch (Exception ignore) {}
      }
    }
  }
}
复制代码

接收端和场景4的区别:

1、exchange的type为topic

2、接收消息的routing key不是固定的单词,而是匹配字符串。

注意点:

Topic exchange

Topic exchange is powerful and can behave like other exchanges. When a queue is bound with "#" (hash) binding key - it will receive all the messages, regardless of the routing key - like in fanout exchange. When special characters "*" (star) and "#" (hash) aren't used in bindings, the topic exchange will behave just like a direct one.

 

参考:

https://www.rabbitmq.com/getstarted.html

http://backend.blog.163.com/blog/static/202294126201322215551999/

 

作者:阿凡卢
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
posted on 2017-03-06 17:18 思月行云 阅读(653) 评论(0)  编辑 收藏 引用 所属分类: 分布式\MQ

只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   博问   Chat2DB   管理