http://blog.csdn.net/mra__s__/article/details/55011530
理解消息通信
一、 消息通信的概念--消费者、生产者和代理
生产者创建消息,消费者接受这些消息。你的应用程序可以作为生产者,向其他应用程序发送消息,或者作为一个消费者,接收消息。也可以在两者之间进行切换。不过在此之前,它必须先建立一条信道(channel)。不论你是发布消息、订阅队列或是接受消息都是通过信道完成。
二、 AMQP元素--交换器、队列和绑定
从概念上讲,AMQP消息路由必须有三部分:交换器、队列和绑定。生产者把消息发布到交换器上;消息最终到达队列,并被消费者接受;绑定决定了消息如何从路由器路由到特定的队列。在研究交换器和绑定之前,需要先理解队列的概念和工作原理。如下图:
消费者通过以下两种方式从特定队列中接收消息:
(1) 通过AMQP的base.consumer命令订阅。这样做会将信道置为接收模式,直到取消对队列的订阅为止。
(2) 有些时候,你只想从队列中获取单条消息而不是持续订阅。向队列请求单条消息是通过AMQP的base.get命令实现的。大致上讲,base.get命令会订阅消息,获取单条消息,然后取消订阅。消费者理应始终使用base.consumer来实现高吞吐量。
A: 当有多个消费者订阅到同一个队列上是,消息是如何发布的:
Q: 队列收到的消息将以循环(round-robin)的方式发送给消费者,每条消息只会发送给一个消费者。消息确认接收机制:消费者必须通过AMQP的base.ask命令显式的向RabbitMQ发送一个确认消息,或者在订阅到队列的时候就将base.ask参数设置成true。消费者对消息的确认和告诉生产者消息已经被接收这两件事毫不相关。
如何创建队列。生产者和消费者都能使用AMQP的base.declare命令创建队列。如果消费者在同一条信道上订阅了另一条对列的话,就无法再声明队列了。则必须先取消队列,将信道置为“传输”模式。
队列设置了一些有用的参数:
1.exlusion--将参数设置成true后队列将变为私有,限制一个队列中只能有一个消费者。
2.auto-delete--将参数设置为true后,最后一个消费者取消订阅之后队列将自动移除。
联合:交换器和绑定。
Q: 消息是如何到达队列的呢?
A: 路由键规则来指定消息从路由器到哪个队列。
Q: 那它是如何处理投递到多个队列的情况呢?
A: 协议中定义的不同类型交换器发挥了作用。一共四种类型:direct、fanout、topic和header。
direct:如果路由键匹配的话,消息就被投递到对应的队列。
base_publish($msg,'默认交换器','队列名称');
fanout:当你发送一条消息到fanout路由器时,他会把消息投递给所有附加在此交换器上的队列。可以轻而易举地添加应用程序的功能。
base_publish($msg,'logs-exchange','error.msg-inbox');
topic:跟direct比较相像,有一些通配规则。单个“.”把路由分成几部分,“*”匹配特定位置的任意文本,“#”匹配所有规则。
queue_bind('msg-inbox-errors','logs-exchange','error.msg-inbox');
queue_bind('msg-inbox-errors','logs-exchange','*.msg-inbox');
queue_bind('all-logs','logs-exchange','#');
三、 虚拟主机
虚拟主机vhost是AMQP概念的基础,你必须在连接时进行指定。RabbitMQ包含了开箱即用的默认虚拟主机"/",因此使用非常简便。vhost之间是绝对隔离,保障了队列和交换机的安全性,因此消息路由的组件也无法进行交互。
rabbitmqctl add_vhost[vhost_name]
rabbitmqctl delete _vhost[vhost_name]
四、 消息持久化
重启rabbitmq服务器后,那些队列和交换器就都消失了,原因在于每个队列和交换器的durable属性,该属性默认情况为false。如果需要持久化,单纯将队列和交换器的durable属性设置为true是不够滴。消息发布之前,通过把它的“投递模式”(delivery mode)选项设置为2来把消息标记为持久化。代价则是消耗性能,虽然重启RabbitMQ服务器后队列和交换器能恢复。
五、 一条消息经历从生产者到消费者的生命周期
发布者需要完成的任务:1.连接到RabbitMQ。2.获取信道。3.声明交换机。4.创建消息。5.关闭消息。6.关闭信道。7.关闭连接。
消费者需要执行的任务:1.连接到RabbitMQ。2.获取信道。3.声明交换机。4.声明队列。5.把队列和交换机绑定起来。6.消费消息。7.关闭信道。8.关闭连接。
使用发送方确认模式来确认投递。
运行和管理RabbitMQ
一、 服务器管理—启动和停止节点
运行子系统:rabbitmq安装目录下找到./sbin目录,运行./rabbitmq-server。通过日志查看运行情况,日志目录/var/log/rabbitmq/下。以守护程序的方式在后台运行:./rabbitmq-server detached,至此rabbitmq服务启动成功。
当运行rabbitmq连接到控制台时,你按下CTRL+C组合键后你猜是哪个..
在rabbitmq安装目录下运行./sbin/rabbitmqctl stop来干净地关闭。
rabbitmq配置文件目录在/etc/rabbitmq/rabbitmq.config。
二、 权限配置
RabbitMQ权限工作原理:用户可以为连接到RabbitMQ主机的应用程序设计不同级别的权限(读、写和“/”或配置)。
管理用户:在RabbitMQ中,用户是访问控制的基本单元。针对一到多个vhost,其可以被赋予不同级别的访问权限,并使用标准的用户名/密码对来认证用户。对用的增加、删除以及列出列表,都非常简单。这些操作都是通过rabbitmqctl完成的。
添加用户:rabbitmqctl add_user username password
删除用户:rabbitmqctl delete_user username
用户列表:rabbitmqctl list_users
设置权限:rabbitmqctl set_permissions –p sycamore \ cashing-tier “.*” “.*” “.*”
–p sycamore:告诉set_permissions条目应该应用在哪个vhost上面。
cashing-tier:被授予权限的用户。
“.*” “.*” “.*”:这些是授予的权限。这些值分别映射到配置、写和读。
移除权限:rabbbimqctl clear_permissions –p sycamore cashing-tier
权限列表:rabbitmqctl list_user_permissions cashing-tier
三、 使用统计
其中经常看到的-p选项,它指明了虚拟主机和路径信息。
rabbitmqctl list_queues -p sycamore 列出虚拟主机为sycamore的队列列表
list_queues [-p Vhost_Path] [<QueueInfoItem>]
rabbitmqctl list_exchanges
list_exchanges [ExchangeInfoItem]
rabbitmqctl list_bindings
理解RabbitMQ日志,LOG_BASE=/var/log/rabbitmq
四、 RabbitMQ和Erlang问题疑难解答
由badrpc、nodedown和其他Erlang引起的问题
1. Erlang Cookie
使用rabbitmqctl命令时常出现一些莫名错误。先理解下rabbitmqctl的工作原理。rabbitmqctl命令先启动Erlang节点,并从那个使用Erlang分布式系统尝试连接RabbitMQ节点。要完成这项工作需要两样东西:合适的Erlang Cookie和合适的主机名称。
Q: 那么什么是Erlang Cookie呢?
A: Erlang节点通过交换作为秘密令牌的Erlangcookie以获得认证。Erlang将令牌存储于home目录下的.erlang.cookie文件中
2. Erlang节点
当你启动Erlang节点时,你可以给它两个互斥的节点名选项,name和sname。
当你想让rabbitmqctl能够连上RabbitMQ时,你必须使得这些参数两边都能匹配(rabbit@hostname)。
3. Mnesia和主机名
RabbitMQ使用Mnesia存储队列、交换器、绑定等信息。RabbitMQ启动时做的一件事就是启动Mnesia数据库。如果启动Mnesia失败,则RabbitMQ也会跟着失败。而导致Mnesia失败的原因大致有二:第一个也是最常见的MNESIA_BASE目录的权限问题。另一个常见问题是读取表格失败。如果主机名更改了,或是服务器运行在集群模式下,无法在启动的时候连接到其他节点,都会导致启动失败。
4. Erlang故障排除技巧
以test作为节点名启动Eralng节点:erl sname test。
执行node()函数会展示--Erlang中方括号为界的列表--你连接上的节点列表。
通过使用rpc:call,同时提供节点、模块、函数和参数作为入参,你可以在远程rabbit上执行其他参数以获取不同的信息。
解决Rabbit相关问题:编码与模式
一、 面向消息通信来设计应用程序
1. 异步状态思维(分离请求和动作)
2. 提供扩展性:没有负载均衡器的世界
3. 使用AMQP来解耦应用程序最大好处:免费的API,语言不会约束消息通信。
二、 消息通信模式
解决耗时的任务和整合用不同语言编写的应用程序。这两个看似有不同的问题,但却有着共同的本质:解耦请求和操作。或者换种说法,这两个问题均需要从同步编程模式转向异步编程模式。
三、 发后即忘模型
匹配该模式的两种一般类型的任务
1. 批处理(batchprocessing)--针对大型数据集合的工作和转换。这种类型的任务可以构建为单一的任务请求,或者多个任务对数据集合的独立部分进行操作。
2. 通知(notifications)--对发生事件的描述。内容可以是消息的日志,也可以是真实的报告通知给另一个程序或者是管理员。
这两个例子符合我们之前提到的两种类别:第一个是告警框架(发送告警)。另一个是将单张图片上传并将其转换成众多图片格式和尺寸(并行处理)。
四、 用RabbitMQ实现RPC
1. 私有队列和确认发送。
2. 使用reply_to来实现简单的jsonRPC
集群并处理失败
一、 RabbitMQ集群架构
RabbitMQ最优秀的功能之一就是其内建集群。同时能够将集群在5分钟内构建并运行起来。
RabbitMQ内建集群的设计用于完成两个目标:允许消费者和生产者在Rabbit节点奔溃的情况下继续运行,以及通过添加更多的节点来线性扩展消息通信吞吐量。
Q: RabbitMQ是如何记录你所有使用过的各种基础构件,同时他们又如何装配成一个消息通信服务器的呢?
A: RabbitMQ会始终记录以下四种类型的内部元数据:
1. 队列元数据--队列名称和属性(是否可持久化,是否自动删除)
2. 交换器元数据--交换器名称、类型和属性(可持久化等)
3. 绑定元数据--一张简单的表格展示了如何将消息路由到队列
4. vhost元数据--为vhost内的队列、交换器和绑定提供命名空间和安全属性。
我们深入到集群节点和他们如何存储元数据前,首先理解在集群环境中队列和交换器的行为。
1. 集群中的队列。
2. 分布交换器。
3. 是内存节点还是磁盘节点。
二、 在你的笔记本上搭建集群
在笔记本上创建三个节点,然后将三个节点集群,具体操作可以参考之前两篇文章。
三、 使用物理服务器创建集群
分布在不同的服务器上创建不同的节点,将不同节点进行集群。
四、 升级集群节点
独立系统中升级节点只需解压新版本,然后运行即可,旧的数据也将会保留。如果是集群节点,需要备份配置信息后,关闭所有生产者等待消费者消费完队列中的所有信息(rabbitmqctl观察队列状态)。现在,关闭节点,并解压新版本到RabbitMQ到现有的安装目录。
五、 与镜像队列一起工作
当加入镜像队列后,信道负责将消息路由到合适的队列。
镜像队列是如何影响事务和发送方确认模式的?
1. 镜像队列主拷贝故障时,从拷贝变成主拷贝。
2. 如果镜像队列失去一个从节点的话,则附加在镜像队列的任何消费者都不会注意到这一点。
3. 如果客户端库不能支持消费者取消通知的话,你应该避免使用镜像队列。
从故障中恢复
一、 理解负载均衡
负载均衡将服务器集群IP封装,提供暴露成统一接口。通过将负载均衡器放置在Rabbit的前端,你就可以让它来处理节点选择、故障服务器检测以及负载分布这些复杂的事情了。
二、 安装并配置HAProxy来为Rabbit做负载均衡
选择使用HAProxy的原因:它是免费的,而且非常可靠,并且为各种站点处理高负载,例如:StackOverFlow。同时,它可以运行在几乎所有的基于UNIX的平台上,并且非常容易配置。
1. 安装HAProxy
2. 配置HAProxy
以上两步自己度娘。
三、 重连并从故障中恢复
现在开发系统上运行着负载均衡器,我们准备深入探索如何使用它来为消息通信应用程序植入故障转移和快速恢复的能力。
1. 如果我重新连接到新的服务器,那么我的信道以及其上的所有消费循环会怎样呢?它们现在都失效了。你必须对它们进行重建。
2. 当我进行重连的时候,我能否假设所有的交换器、队列和绑定仍然存在于集群之中?我能否重连之后立即开始从队列消费呢?
Warren和Shovel:故障转移和复制
一、 理解主/备方式(warren)
集群迫使你不得不在以下两者之间做权衡:所有节点表现得像独立单元来分布负载的优点,但是在故障节点恢复前无法使用可持久化队列的缺点。这正是warren和Shovel的用武之地。
二、 使用负载均衡器创建warren
基于warren的负载均衡器的HAProxy配置。基于负载均衡器来做故障转移,最重要的是可以确保当故障转移发生时,你无须担心RabbitMQ无法在备用节点启动,因为它已经运行了。由于Rabbit始终在主节点和备用节点上运行,因此你们可以始终对它们进行监控。如果备机在派上用场之前就变为不可用的话,你在第一时间就能发现。这通过使用共享存储warren是无法做到的。
三、 使用Shovel构建远距离复制
在掌握了集群和warren之后,你就能处理故障并在数据中心之上进行扩展了,但是当你需要在不同的数据中心的Rabbit间复制消息时,改怎么办呢?这时,我们就需要Shovel了。
1. 给Rabbit装备Shovel:Shovel插件介绍。
2. 安装Shovel
3. 配置并运行Shovel
从Web端管理RabbitMQ
一、 Managent插件相对于rabbitmqctl脚本的优势
方便、简洁、功能齐全、浏览多方面同时监测。
二、 启用RabbitMQ Management插件
rabbitmq-plugins enable rabbitmq-management
三、 Management插件功能
四、 从Web控制台来管理用户、队列和交换器
在web控制台上做如下操作:(具体结合界面应用)
1. 创建用户
2. 管理用户的权限
3. 列出队列信息
4. 创建队列
五、 Management插件REST接口介绍
1. 通过使用REST API,你可以轻松自动化那些到目前为止只能通过图形化界面完成的任务。
2. CLI管理:一种更简洁的方式。
3. 安装rabbitmqadmin脚本。
4. 应用rabbitmqadmin脚本。
使用REST API控制Rabbit
一、 Rabbit REST API的限制和功能
不管用API创建队列还是设置权限,每当使用PUT或POST动作的时候,都需要将函数参数编码为JSON格式的哈希表,然后添加到请求正文中。你可以通过浏览器访问http://localhost:55672/api来查看目前大多数(以及完整的)API列表和支持的HTTP动作。
二、 访问消息通信数据统计和计数器
Rabbit Management API使得你可以在任何能够访问网络的地方监控并控制Rabbit。具体详情看此书,代码来控制的。
三、 自动化创建用户和虚拟主机
我们在部署应用时,就使用了相似的脚本来自动化创建用户。通过构造这段脚本,你不仅学习了如何使用Management API来展示条目,还学习了如何展示条目列表,以及如何创建和删除这些条目。你这样将这些概念应用到处理其他条目/资源类型(用户、队列、交换器、连接、权限等)。具体脚本看此书。
监控
一、 编写Nagios健康检测的基础
Nagios健康检测是一个独立程序,它在运行时监控服务并在程序停止运行时退出代码来指示服务的健康状态。从技术上来讲,你甚至不需要Nagios来运行健康检测--你可以在任何时候通过命令行执行并手工观测输出。Nagios健康检测可以用任何语言编写,可以是Python程序,甚至是BASE脚本。
二、 使用AMQP和REST API来监控Rabbit内部状态
构建的AMQP健康检测,当下列任务条件之一为真时,该检测程序会返回一个critical状态
1. RabbitMQ没有响应TCP连接。
2. 当发送AMQP命令时,Pika在接收到响应之前超时了。
3. 当构造AMQP信道时,遇到了协议错误。
仅当这些状态检测都为false时,健康检测程序才会返回OK状态。
基于API的健康检测,aliveness-test,顾名思义,使用三个步骤来验证Rabbit服务器是否健康:
1. 创建一个队列来接收测试消息。
2. 用队列名称作为消息路由键,将消息发往默认交换器。
3. 当消息到达队列的时候就消费该消息;否则就报错。
三、 使用Rabbit可用并且能够进行响应
有许多原因会致使RabbitMQ使用太多的内存,并达到Rabbit配置的最大内存上限。以下是最常见的几种原因:
1. 应用程序有缺陷,消费消息之后忘记向RabbitMQ发回确认消息。这在高容量环境下,会导致成千上万条的消息堆积并耗尽Rabbit的内存。
2. 应用程序使用RabbitMQ将大型数据(譬如图像)路由到处理节点。用不了多少张100MB大小的图像就可以将只有8GB内存大小的服务器内存耗尽。
3. 使用了最新RabbitMQ版本中的新功能,但是该功能有个BUG会导致缓慢的内存泄漏。
四、 观察队列状态以尽早检测消费者问题
监控消费者是否正确运作的方式就是通过监控队列的消息总数,并在总数超过设定的warning或者critical阈值时触发警告。以下通过两种方式来监控队列消息总数:
1. 使用AMQP的queue_declare()命令,设置passive=true参数来重新声明一个已存在的队列。当你在AMQP中声明一个队列时,如果将passive设置为true的话,那么该命令返回的结果中将包含队列消息的总数。
2. 利用我们的老朋友RabbitManagement API来总队列上拉取数据统计,其中就有队列当前的消息总数。
五、 检测消息通信结构中不合需求的配置更改
确保消费者正常工作,检查消息通信结构配置问题:
1. 通过AMQP监控队列等级。
2. 使用REST API来监控队列级别。
3. 建立队列的消息计数基准经验法则。
提升性能,保障安全
一、 交换器、队列和绑定的内存占用
根据数据显示队列、交换器和绑定的内存占用很小,另一个施加在RabbitMQ上的硬性限制是每个Erlang节点的最大Erlang进程数。Erlang应用程序在整个生命周期中会多次创建并销毁进程。
二、 消息持久化和磁盘I/O
当发布消息时,你需要决定丢失其中的任何消息对你来说是否可以接受,这决定了deleverymode设置的值为1(非持久化)还是2(持久化)的问题。
在消息消费过程中你该如何配置,加快消息投递的设定是no-ack标记,你可以在队列订阅时指明。
路由算法和绑定规则,三种不同类型的交换器:direct、fanout和topic。每种交换器类型代表了服务器实现的特定路由算法。绑定规则的路由键模式将更占用内存。
本节介绍了不同的算法和消息发布订阅设置如何影响整个系统的速度,以及像auto-ack模式标记设定能在很大程度上影响系统性能。
三、 RabbitMQ的SSL连接及设置私钥架构
1. SSL证书。
2. 设置证书颁发机构。
3. 生成根证书。
4. 生成服务器端证书。
5. 生成客户端证书。
6. 启用RabbitMQ的SSL监听。
7. 测试你的RabbitMQ SSL设置。
聪明的Rabbit:扩展RabbitMQ
一、 安装RabbitMQ插件
我可以通过安装插件的方式来解决:
1. 支持AMQP以外的协议。
2. 不同的认证机制(LDQP,自定义数据库)。
3. 消息复制。
4. 新的交换器和路由算法。
5. 消息日志和审计。
如果你想启用的插件不是服务器发行的一部分该怎么办呢?首先,你得下载插件的.ez文件到RabbitMQ安装目录的plugins文件夹下,之后像往常一样运行./rabbitmq-plugins enable plugin_name命令即可。
二、 实现自定义交换器插件
1. 将交换器注册到Rabbit。
2. 实现交换器behaviour。
3. 编译自定义交换器。
4. 测试你的插件。
posted on 2017-12-15 14:09
思月行云 阅读(1611)
评论(0) 编辑 收藏 引用 所属分类:
分布式\MQ