1、 rabbitmq介绍RabbitMQ是一个开源的靠AMQP协议实现的服务,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。它可以使对应的客户端(client)与对应的消息中间件(broker)进行交互。消息中间件从发布者(publisher)那里收到消息(发布消息的应用,也称为producer),然后将他们转发给消费者(consumers,处理消息的应用)。由于AMQP是一个网络协议,所以发布者、消费者以及消息中间件可以部署到不同的物理机器上面2、 消息队列的概念消息即是信息的载体。为了让消息发送者和消息接收者都能够明白消息所承载的信息(消息发送者需要知道如何构造消息;消息接收者需要知道如何解析消息),它们就需要按照一种统一的格式描述消息,这种统一的格式称之为消息协议。所以,有效的消息一定具有某一种格式;而没有格式的消息是没有意义的。3、 消息队列应用的场景消息队列在实际应用中常用的使用场景。异步处理,应用解耦,流量削锋和消息通讯四个场景3.1异步处理场景说明:如用户注册后,需要发送邮件和注册短信,传统的做法有两种1)串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。2)并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。3.2应用解耦场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合如何解决以上问题呢?引入应用消息队列后的方案,如下图:订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦3.3、流量削峰流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。a、可以控制活动的人数b、可以缓解短时间内高流量压垮应用用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理3.4、日志处理日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下日志采集客户端,负责日志数据采集,定时写受写入Kafka队列Kafka消息队列,负责日志数据的接收,存储和转发日志处理应用:订阅并消费kafka队列中的日志数据 3.5、消息通讯消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等点对点通讯:客户端A和客户端B使用同一队列,进行消息通讯。聊天室通讯:客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果4、 常见的消息队列产品4.1、redis是一个Key-Value的NoSQL数据库,开发维护很活跃,虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完成可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时,Redis的性能要高于RabbitMQ,而如否数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。4.2、 mecahceq持久化消息队列(简称mcq)是一个轻量级的消息队列,特性如下:简单易用处理速度快多条队列并发性能好与memcache的协议兼容。意味着只要装了前者的extension即可不需要额外的插件在zend framework中使用很方便 php开发框架4.3、 MSMQ这是微软的产品里唯一被认为有价值的东西关键是它并不复杂,除了接收和发送,没有别的,它有一些硬性限制,比如最大消息体积是4MB。4.4、 ZeroMQZeroMQ是一个非常轻量级的消息系统,号称最快的消息队列系统,专门为高吞吐量/低延迟的场景开发,在金融界的应用中经常可以发现它。与RabbitMQ相比,ZeroMQ支持许多高级消息场景,能够实现RabbitMQ不擅长的高级/复杂的队列,但是你必须实现ZeroMQ框架中的各个块(比如Socket或Device等)。ZeroMQ具有一个独特的非中间件的模式你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演这个服务角色。你只需要简单地引用ZeroMQ程序库,可以使用NuGet安装(微软开发的.Net平台),然后你就可以愉快地在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,即没有地方可以观察它是否有问题出现,也就是说如果down机,数据将会丢失。4.5、 Jafka/KafkaJafka/Kafka(能将消息分散到不同的节点上)是LinkedIn于2010年12月开发并开源的一个分布式MQ系统,现在是Apache的一个孵化项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:1)快速持久化,可以在O(1)的系统开销下进行消息持久化;2)高吞吐,在一台普通的服务器上既可以打到10W/s的吞吐速率;3)完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现复杂均衡;4)支持Hadoop数据并行加载,统一了在线和离线的消息处理,对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。5)相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统4.6、 Apache ActiveMQActiveMQ居于(RabbitMQ&ZeroMQ)之间,类似于ZemoMQ,支持高级消息场景,支持数据的持久化ActiveMQ被誉为Java世界的中坚力量。它有很长的历史,且被广泛使用。它还是跨平台的,给那些非微软平台的产品提供了一个天然的集成接入点。然而它只有跑过了MSMQ才有可能被考虑。如需配置ActiveMQ则需要在目标机器上安装Java环境。类似于RabbitMQ,它易于实现高级场景,而且只需付出低消耗。它被誉为消息中间件的“瑞士军刀”。4.7、 RabbitMQRabbitMQ是使用Erlang编写的一个开源消息队列,本身支持很多的协议:AMQP(高级消息队列协议), XMPP(可扩展消息处理现场协议), SMTP(简单邮件传输协议), STONP(简单文本定向消息协议),也正是如此,使的它变的非常重量级,更适合于企业级的开发。它实现了代理(Broker)架构,意味着消息在发送到客户端之前可以在中央节点上排队。此特性使得RabbitMQ易于使用和部署,适宜于很多场景如路由、负载均衡或消息持久化等,用消息队列只需几行代码即可搞定。但是,这使得它的可扩展性差,速度较慢,因为中央节点增加了延迟,消息封装后也比较大。如需配置RabbitMQ则需要在目标机器上安装Erlang环境最终,上述同类产品:1. 都有各自客户端API或支持多种编程语言2. 都有大量的文档3. 都提供了积极的支持4. ActiveMQ、RabbitMQ、MSMQ、Redis都需要启动服务进程,这些都可以监控 和配置,其他几个就有问题了5. 都相对提供了良好的可靠性(一致性)、扩展性和负载均衡,当然还有性能
5、Rabbitmq基础概念应用场景框架RabbitMQ Server:也叫broker server,它不是运送食物的卡车,而是一种传输服务。原话是RabbitMQ isn’t a food truck, it’s a delivery service. 他的角色就是维护一条从Producer到Consumer的路线,保证数据能够按照指定的方式进行传输。但是这个保证也不是100%的保证,但是对于普通的应用来说这已经足够了。当然对于商业系统来说,可以再做一层数据一致性的guard(监控是否执行成功),就可以彻底保证系统的一致性了。Client A & B:也叫Producer,数据的发送方。Create messages and Publish (Send) them to a broker server (RabbitMQ)。一个Message有两个部分:Payload(有效载荷)和Label(标签)。Payload顾名思义就是传输的数据,Label是Exchange的名字或者说是一个tag,它描述了payload,而且RabbitMQ也是通过这个label来决定把这个Message发给哪个Consumer。AMQP仅仅描述了label,而RabbitMQ决定了如何使用这个label的规则。Client 1,2,3:也叫Consumer,数据的接收方。Consumers attach to a broker server (RabbitMQ) and subscribe to a queue。把queue比作是一个有名字的邮箱。当有Message到达某个邮箱后,RabbitMQ把它发送给它的某个订阅者即Consumer。当然可能会把同一个Message发送给很多的Consumer。在这个Message中,只有payload,label已经被删掉了。对于Consumer来说,它是不知道谁发送的这个信息的。就是协议本身不支持。但是当然了如果Producer发送的payload包含了Producer的信息就另当别论了。对于一个数据从Producer到Consumer的正确传递,还有三个概念需要明确:exchanges, queues and bindings。Exchanges:消息交换机,它指定消息按什么规则,路由到哪个队列Queues:消息队列载体,每个消息都会被投入到一个或多个队列Bindings:它的作用就是把exchange和queue按照路由规则绑定起来Routing Key:路由关键字,exchange根据这个关键字进行消息投递Connection:就是一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个TCP连接。Channel:虚拟连接。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。Vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。每个virtual host本质上都是一个RabbitMQ Server,拥有它自己的queue,exchagne,和bings rule等等。这保证了你可以在多个不同的application中使用RabbitMQ。6、Channel的选择那么,为什么使用Channel,而不是直接使用TCP连接?对于OS来说,建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的。对于Producer或者Consumer来说,可以并发的使用多个Channel进行Publish或者Receive。7、消息队列执行过程客户端连接到消息队列服务器,打开一个Channel。客户端声明一个Exchange,并设置相关属性。客户端声明一个Queue,并设置相关属性。客户端使用Routing key,在Exchange和Queue之间建立好绑定关系。客户端投递消息到Exchange。Exchange接收到消息后,就根据消息的key和已经设置的Binding,进行消息路由,将消息投递到一个或多个队列里。8、消息持久化RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,大多数用户都会选择持久化。消息队列持久化包括3个部分:Exchange持久化,在声明时指定durable => 1Queue持久化,在声明时指定durable => 1消息持久化,在投递时指定delivery_mode => 2(1是非持久化)若Exchange和Queue都是持久化的,那么它们之间的Binding也是持久化的;而Exchange和Queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。Consumer从durable queue中取回一条消息之后并发回了ack消息,RabbitMQ就会将其标记,方便后续垃圾回收。如果一条持久化的消息没有被consumer取走,RabbitMQ重启之后会自动重建exchange和queue(以及bingding关系),消息通过持久化日志重建再次进入对应的queues,exchanges。9、安装rabbitmq
24 yum -y localinstall erlang-18.1-1.el6.x86_64.rpm rabbitmq-server-3.6.6-1.el6.noarch.rpm socat-1.7.3.2-2.el7.x86_64.rpm (安装rabbitmq) 22 chkconfig --add rabbitmq-server (加入开机自启) 25 chkconfig rabbitmq-server on (开启rabbitmq开机自启) 26 /etc/init.d/rabbitmq-server start (开启rabbitmq) 查看rebbitmq是否运行 27 ps -ef | grep rabbitmq
30 rabbitmq-plugins enable rabbitmq_management#开启rabbitmq的web管理插件,用户可以通过浏览器进行访问31 rabbitmqctl add_user admin 123.com#创建登录用户admin 密码123.com32 rabbitmqctl set_user_tags admin administrator#将admin用户添加到管理员组当中
查看端口netstat -anpt | grep 15672
浏览器访问IP地址:1567210、Rabbitmq集群(1.40-1.60的操作)集群方式Rabbitmq集群大概分为二种方式:
24 yum -y localinstall erlang-18.1-1.el6.x86_64.rpm rabbitmq-server-3.6.6-1.el6.noarch.rpm socat-1.7.3.2-2.el7.x86_64.rpm (安装rabbitmq) 22 chkconfig --add abbitmq-server (加入开机自启) 25 chkconfig rabbitmq-server on (开启rabbitmq开机自启) 26 /etc/init.d/rabbitmq-server start (开启rabbitmq) 查看rebbitmq是否运行 27 ps -ef | grep rabbitmq
[root@localhost ~]# vim /etc/hosts (四台主机都要操作) 192.168.1.30 rabbitmq1 192.168.1.40 rabbitmq2 192.168.1.50 rabbitmq3 192.168.1.60 rabbitmq4 ##不想每台都手写可以使用scp [root@rabbitmq1 ~]# scp /etc/hosts root@192.168.1.40:/etc/hosts ##提示输入yes
在rabbitmq1上查看cookie节点信息并复制 ##安装集群的时候需要节点cookie信息一致
rabbitmq2,3和4上将文件cookie信息和rabbitmq1改成相同
[root@rabbitmq2 ~]# echo MIIPQWTLIDGVGKMDWQFX > /var/lib/rabbitmq/.erlang.cookie [root@rabbitmq3 ~]#echo MIIPQWTLIDGVGKMDWQFX > /var/lib/rabbitmq/.erlang.cookie [root@rabbitmq4 ~]#echo MIIPQWTLIDGVGKMDWQFX > /var/lib/rabbitmq/.erlang.cookie
配置完成后reboot重启虚拟机,##注意:2和3需要手动重启,重启完成后主机名会变为rabbitmq1、2、3,启动不了的开机界面会一直停留在这个地方5-6分钟恢复,可以再次手动重启一下就不会这样了[root@rabbitmq1 ~]# ps -ef | grep rabbit ##重启完成后查看是否启动在rabbitmq1上操作45 rabbitmqctl stop_app46 rabbitmqctl reset 47 rabbitmqctl start_app##设置完成后会提示节点名称并复制
在rabbitmq2、3,4上加入节点(2,3和4操作相同)
[root@rabbitmq2 ~]# rabbitmqctl stop_app Stopping node rabbit@rabbitmq2 ... [root@rabbitmq2 ~]# rabbitmqctl reset Resetting node rabbit@rabbitmq2 ... [root@rabbitmq2 ~]# rabbitmqctl join_cluster --ram rabbit@rabbitmq1 Clustering node rabbit@rabbitmq2 with rabbit@rabbitmq1 ... ##-join_cluter加入集群 --ram 以内存节点方式加入后面跟节点名rabbit@rabbitmq1 [root@rabbitmq2 ~]# rabbitmqctl start_app Starting node rabbit@rabbitmq2 ... [root@rabbitmq2 ~]# rabbitmq-plugins enable rabbitmq_management The following plugins have been enabled: mochiweb webmachine rabbitmq_web_dispatch amqp_client rabbitmq_management_agent rabbitmq_management Applying plugin configuration to rabbit@rabbitmq2... started 6 plugins. ##开启rabbitmq插件,不打开的话,就不能使用浏览器访问rabbit页面进行管理
回到rabbitmq1上创建管理用户和查看集群状态
[root@rabbitmq1 ~]# rabbitmqctl add_user admin redhat Creating user "admin" ... [root@rabbitmq1 ~]# rabbitmqctl set_user_tags admin administrator Setting tags for user "admin" to [administrator] ... ##创建用户admin密码为redhat,并加入到管理员组中 [root@rabbitmq1 ~]# rabbitmq-plugins enable rabbitmq_management Plugin configuration unchanged. Applying plugin configuration to rabbit@rabbitmq1... nothing to do. [root@rabbitmq1 ~]# rabbitmqctl cluster_status ##查看节点状态 ##rabbit1工作模式为磁盘节点 rabbit2、3,4为ram内存节点模式 ##running_nodes:正在运行的节点 ##cluster_name:节点名称 ##alarms:发生问题时rabbit1、2、3,4会进行报警
浏览器访问进行管理:http://192.168.1.30:15672 用户名为admin密码redhat点击vxgp后在下面找到premissions选项设置为admin用户权限访问设置完成后再次查看设置匹配策略发布消息:
设置发布消息内容可以看到已经有通知了
添加rabbitmq节点:rabbitmq4 192.168.83.4rabbitmq1、2、3添加hosts主机都添加192.168.83.4 rabbitmq4
rabbitmq4安装同上面安装一样 [root@localhost ~]# chkconfig rabbitmq-server on [root@localhost ~]# /etc/init.d/rabbitmq-server start [root@localhost ~]# echo MIIPQWTLIDGVGKMDWQFX > /var/lib/rabbitmq/.erlang.cookie [root@localhost ~]# reboot [root@rabbitmq4 ~]# rabbitmqctl stop_app [root@rabbitmq4 ~]# rabbitmqctl reset [root@rabbitmq4 ~]# rabbitmqctl join_cluster --ram rabbit@rabbitmq1
删除节点:##在rabbitmq4上先停止节点[root@rabbitmq1 ~]# rabbitmqctl stop_app回到主节点上rabbitmq1[root@rabbitmq1 ~]# rabbitmqctl -n rabbit@rabbitmq1 forget_cluster_node rabbit@rabbitmq4 Removing node rabbit@rabbitmq4 from cluster ...##-n 指定节点名称 #=forget_cluster_node 后面跟要删除的节点名称