目录

分布式消息队列RabbitMQ

消息队列MQ技术的介绍和原理

消息中间件概述

消息队列技术是分布式应用间交换信息的一种技术。消息队列可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读走。通过消息队列,应用程序可独立地执行-它们不需要知道彼此的位置、或在继续执行前不需要等待接收程序接收此消息。

在分布式计算环境中,为了集成分布式应用,开发者需要对异构网络环境下的分布式应用提供有效的通信手段。为了管理需要共享的信息,对应用提供公共的信息交换机制是重要的。

设计分布式应用的方法主要有

  1. 远程过程调用(PRC)–分布式计算环境(DCE)的基础标准成分之一;
  2. 对象事务监控(OTM)–基于CORBA的面向对象工业标准与事务处理(TP)监控技术的组合;
  3. 消息队列(MessageQueue)–构造分布式应用的松耦合方法。

(a) 分布计算环境/远程过程调用 (DCE/RPC)

RPC是DCE的成分,是一个由开放软件基金会(OSF)发布的应用集成的软件标准。RPC模仿一个程序用函数引用来引用另一程序的传统程序设计方法,此引用是过程调用的形式,一旦被调用,程序的控制则转向被调用程序。

在RPC实现时,被调用过程可在本地或远地的另一系统中驻留并在执行。当被调用程序完成处理输入数据,结果放在过程调用的返回变量中返回到调用程序。RPC完成后程序控制则立即返回到调用程序。因此RPC模仿子程序的调用/返回结构,它仅提供了Client(调用程序)和Server(被调用过程)间的同步数据交换。

(b) 对象事务监控 (OTM)

基于CORBA的面向对象工业标准与事务处理(TP)监控技术的组合,在CORBA规范中定义了:使用面向对象技术和方法的体系结构;公共的Client/Server程序设计接口;多平台间传输和翻译数据的指导方针;开发分布式应用接口的语言(IDL)等,并为构造分布的Client/Server应用提供了广泛及一致的模式。

(c) 消息队列 (Message Queue)

消息队列为构造以同步或异步方式实现的分布式应用提供了松耦合方法。消息队列的API调用被嵌入到新的或现存的应用中,通过消息发送到内存或基于磁盘的队列或从它读出而提供信息交换。消息队列可用在应用中以执行多种功能,比如要求服务、交换信息或异步处理等。

什么是中间件

中间件是一种独立的系统软件或服务程序,分布式应用系统借助这种软件在不同的技术之间共享资源,管理计算资源和网络通讯。它在计算机系统中是一个关键软件,它能实现应用的互连和互操作性,能保证系统的安全、可靠、高效的运行。

中间件位于用户应用和操作系统及网络软件之间,它为应用提供了公用的通信手段,并且独立于网络和操作系统。中间件为开发者提供了公用于所有环境的应用程序接口,当应用程序中嵌入其函数调用,它便可利用其运行的特定操作系统和网络环境的功能,为应用执行通信功能。

如果没有消息中间件完成信息交换,应用开发者为了传输数据,必须要学会如何用网络和操作系统软件的功能,编写相应的应用程序来发送和接收信息,且交换信息没有标准方法,每个应用必须进行特定的编程从而和多平台、不同环境下的一个或多个应用通信。

例如,为了实现网络上不同主机系统间的通信,将要求具备在网络上如何交换信息的知识(比如用TCP/IP的socket程序设计);为了实现同一主机内不同进程之间的通讯,将要求具备操作系统的消息队列或命名管道(Pipes)等知识。

目前中间件的种类很多,如交易管理中间件(如IBM的CICS)、面向Java应用的Web应用服务器中间件(如IBM的WebSphere Application Server)等,而消息传输中间件(MOM)是其中的一种。

它简化了应用之间数据的传输,屏蔽底层异构操作系统和网络平台,提供一致的通讯标准和应用开发,确保分布式计算网络环境下可靠的、跨平台的信息传输和数据交换。它基于消息队列的存储-转发机制,并提供特有的异步传输机制,能够基于消息传输和异步事务处理实现应用整合与数据交换。

IBM 消息中间件MQ以其独特的安全机制、简便快速的编程风格、卓越不凡的稳定性、可扩展性和跨平台性,以及强大的事务处理能力和消息通讯能力,成为业界市场占有率最高的消息中间件产品。

MQ具有强大的跨平台性,它支持的平台数多达35种。

它支持各种主流Unix操作系统平台,如:HP-UX、AIX、SUN Solaris、Digital UNIX、Open VMX、SUNOS、NCR UNIX;支持各种主机平台,如:OS/390、MVS/ESA、VSE/ESA;同样支持Windows NT服务器。在PC平台上支持Windows9X/Windows NT/Windows 2000和UNIX (UnixWare、Solaris)以及主要的Linux版本(Redhat、TurboLinux等)。此外,MQ还支持其他各种操作系统平台,如:OS/2、AS/400、Sequent DYNIX、SCO OpenServer、SCO UnixWare、Tandem等。

MQ的基本概念

1) 队列管理器

队列管理器是MQ系统中最上层的一个概念,由它为我们提供基于队列的消息服务。

2) 消息

在MQ中,我们把应用程序交由MQ传输的数据定义为消息,我们可以定义消息的内容并对消息进行广义的理解,比如:用户的各种类型的数据文件,某个应用向其它应用发出的处理请求等都可以作为消息。

消息有两部分组成:

消息描述符(Message Discription或Message Header),描述消息的特征,如:消息的优先级、生命周期、消息Id等;

消息体(Message Body),即用户数据部分。

3) 队列

队列是消息的安全存放地,队列存储消息直到它被应用程序处理。

4) 通道

通道是MQ系统中队列管理器之间传递消息的管道,它是建立在物理的网络连接之上的一个逻辑概念,也是MQ产品的精华。

在MQ中,主要有三大类通道类型,即消息通道,MQI通道和Cluster通道。

消息通道是用于在MQ的服务器和服务器之间传输消息的,需要强调指出的是,该通道是单向的,它又有发送(sender), 接收(receive), 请求者(requestor), 服务者(server)等不同类型,供用户在不同情况下使用。

MQI通道是MQ Client和MQ Server之间通讯和传输消息用的,与消息通道不同,它的传输是双向的。

群集(Cluster)通道是位于同一个MQ 群集内部的队列管理器之间通讯使用的。

MQ的工作原理

MQ的工作原理

MQ的通讯模式

1) 点对点通讯

点对点方式是最为传统和常见的通讯方式,它支持一对一、一对多、多对多、多对一等多种配置方式,支持树状、网状等多种拓扑结构。

2) 多点广播

MQ适用于不同类型的应用。其中重要的,也是正在发展中的是“多点广播”应用,即能够将消息发送到多个目标站点(Destination List)。可以使用一条MQ指令将单一消息发送到多个目标站点,并确保为每一站点可靠地提供信息。

MQ不仅提供了多点广播的功能,而且还拥有智能消息分发功能,在将一条消息发送到同一系统上的多个用户时,MQ将消息的一个复制版本和该系统上接收者的名单发送到目标MQ系统。目标MQ系统在本地复制这些消息,并将它们发送到名单上的队列,从而尽可能减少网络的传输量。

3) 发布/订阅(Publish/Subscribe)模式

发布/订阅功能使消息的分发可以突破目的队列地理指向的限制,使消息按照特定的主题甚至内容进行分发,用户或应用程序可以根据主题或内容接收到所需要的消息。

发布/订阅功能使得发送者和接收者之间的耦合关系变得更为松散,发送者不必关心接收者的目的地址,而接收者也不必关心消息的发送地址,而只是根据消息的主题进行消息的收发。

4) 群集(Cluster)

为了简化点对点通讯模式中的系统配置,MQ提供Cluster(群集)的解决方案。群集类似于一个域(Domain),群集内部的队列管理器之间通讯时,不需要两两之间建立消息通道,而是采用群集(Cluster)通道与其它成员通讯,从而大大简化了系统配置。

此外,群集中的队列管理器之间能够自动进行负载均衡,当某一队列管理器出现故障时,其它队列管理器可以接管它的工作,从而大大提高系统的高可靠性。

市面上的主流MQ

过去的4年里,人们写了有好多好多的开源的MQ服务器啊。其中大多数都是某公司例如LiveJournal写出来用来解决特定问题的。它们的确不关心上面跑的是什么类型的消息,不过他们的设计思想通常是和创建者息息相关的(消息的持久化,崩溃恢复等通常不在他们考虑范围内)。不过,有三个专门设计用来做及其灵活的消息队列的程序值得关注:

  1. Apache ActiveMQ
  2. ZeroMQ
  3. RabbitMQ

Apache ActiveMQ 曝光率最高,不过看起来它有些问题,可能会造成丢消息。不可接受,下一个。

ZeroMQ 和 RabbitMQ 都支持一个开源的消息协议-AMQP。

AMQP的一个优点是它是一个灵活和开放的协议,以便和另外两个商业化的Message Queue (IBM和Tibco)竞争,很好。不过ZeroMQ不支持消息持久化和崩溃恢复,不太好。如果你不在意消息持久化和崩溃恢复,试试ZeroMQ吧,延迟很低,而且支持灵活的拓扑。

剩下的只有RabbitMQ了。

RabbitMQ消息队列

RabbitMQ简介

rabbitMQ是一个在AMQP协议标准基础上完整的,可服用的企业消息系统。他遵循Mozilla Public License开源协议。采用 Erlang 实现的工业级的消息队列(MQ)服务器。

RabbitMQ的官方站:http://www.rabbitmq.com/

AMQP(高级消息队列协议) 是一个异步消息传递所使用的应用层协议规范,作为链路层协议,而不是API(例如JMS),AMQP客户端能够无视消息的来源任意发送和接受信息。

AMQP的原始用途只是为金融界提供一个可以彼此协作的消息协议,而现在的目标则是为通用消息队列架构提供通用构建工具。因此,面向消息的中间件 (MOM)系统,例如发布/订阅队列,没有作为基本元素实现。反而通过发送简化的AMQ实体,用户被赋予了构建例如这些实体的能力。这些实体也是规范的一部分,形成了在链路层协议顶端的一个层级:AMQP模型。

这个模型统一了消息模式,诸如之前提到的发布/订阅,队列,事务以及流数据,并且添加了额外的特性,例如更易于扩展,基于内容的路由。

AMQP 有四个非常重要的概念:虚拟机(virtual host),交换机(exchange),队列(queue)和绑定(binding)。

  1. 虚拟机: 通常是应用的外在边界,我们可以为不同的虚拟机分配访问权限。虚拟机可持有多个交换机、队列和绑定。
  2. 交换机: 从连接通道(Channel)接收消息,并按照特定的路由规则发送给队列。
  3. 队列: 消息最终的存储容器,直到消费客户端(Consumer)将其取走。
  4. 绑定: 也就是所谓的路由规则,告诉交换机将何种类型的消息发送到某个队列中。

通常的操作流程是:

(1) 消费者: 创建信息通道。 
(2) 消费者: 定义消息队列。 
(3) 消费者: 定义特定类型的交换机。 
(4) 消费者: 设定绑定规则 (包括交换机名称、队列名称以及路由键)(5) 消费者: 等待消息。 
(6) 生产者: 创建消息。 
(7) 生产者: 将消息投递给信息通道 (注明接收交换机名称和路由键)(8) 交换机: 获取消息,依据交换机类型决定是否匹配路由规则 (如需匹配,则对比消息路由键和绑定路由键)(9) 消费者: 获取并处理消息,发送反馈。 
(10) 结束: 关闭通道和连接。 

RabbitMQ交换机类型

Direct Exchange

处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。

Fanout Exchange

不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

Topic Exchange

将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.” 只会匹配到“audit.irs”。

RabbitMQ安装

Rabbitmq 是用 erlang 语言写的,所以使用还要安装 Erlang。

http://www.erlang.org/download.html 下载,我下载的是 otpsrcR14B.tar.gz

wget http://www.erlang.org/download/otp_src_R14B.tar.gz
tar -zxf otp_src_R14B.tar.gz
./configure  
make  
make install

最好不要用 –prefix 指定 erlang 安装目录,默认安装到 /usr/local,改了安装 rabbitmq 就不方便。

安装 erlang 还要安装 python 与 simplejson,我环境中已经安装了 python 2.6.4,simplejson 只要 easyinstall 就可以了 <code bash>easyinstall simplejson</code>

wget http://www.rabbitmq.com/releases/rabbitmq-server/v1.7.2/rabbitmq-server-1.7.2.tar.gz
tar zxf rabbitmq-server-1.7.2.tar.gz
cd rabbitmq-server-1.7.2
make TARGET_DIR=/usr/local/rabbitmq SBIN_DIR=/usr/local/rabbitmq/sbin MAN_DIR=/usr/local/rabbitmq/man install

RabbitMQ管理

服务管理

指令操作
rabbitmqctl stopapp/startapp/reset停止/启动/重置服务
rabbitmqctl status 查看状态
rabbitmqctl rotatelogs|回滚日志| |rabbitmqctl adduser/deleteuser/changepassword/clearpassword/listusers授权用户管理
rabbitmqctl setadmin/clearadmin设置用户为管理员
rabbitmqctl addvhost/deletevhost/listvhosts|管理vhosts| |rabbitmqctl setpermissions/listpermissions/chearpermissions/listuserpermissions列举权限
rabbitmqctl listconnections|列举连接器| |rabbitmqctl listchannels列举通道
rabbitmqctl listexchanges|列举交换机| |rabbitmqctl listqueues列举队列
rabbitmqctl listbindings|列举绑定标识| |rabbitmqctl listconsumers列举消息接收者

RabbitMQ Python API安装

下载安装pika

easy_install pika

或者直接从官方网站下载https://github.com/tonyg/pika,各版本对应的源码,解压后

python setup.py install

RabbitMQ Php API安装

参考文档

下载安装rabbitmq-c

http://hg.rabbitmq.com/rabbitmq-codegen
http://hg.rabbitmq.com/rabbitmq-c/summary

把rabbitmq-codegen解压到rabbitmq-c的解压目录中,在rabbitmq-c的目录中

autoreconf -i
./configure
make && make install

下载安装PECL AMQP client

编译过程与其它php编译模块相同,此处省略

Amqp Configure Options

NameDefaultChangeable
amqp.hostlocalhostPHPINIALL
amqp.vhost/ PHPINIALL
amqp.port5672 PHPINIALL
amqp.loginguest PHPINIALL
amqp.passwordguestPHPINIALL

RabbitMQ实战训练

直接上最复杂的topic类型的交换机,根据日志等级为关键词来分析日志信息:

  1. kern. - warn.
  2. info.*
  3. ….

测试rabbitmq

import pika
 
# 设置连接参数
credentials = pika.PlainCredentials('guest', 'xxxxxx')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
 
# 创建连接
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
 
# 关闭连接
connection.close()

消息接受者 receive.py

#!/bin/env python
import pika
import sys
 
# 连接器参数设置
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters(
                      host='www9.53kf.com',
                      virtual_host='/', 
                      credentials=credentials, 
                      heartbeat=30)
 
# 定义交换机名称和类型,消息发布者与消息接收者必须一致
exchange_name="topic_logs2"
exchange_type="topic"
 
# 建立连接,生成消息通道
conn = pika.AsyncoreConnection(parameters)
channel = conn.channel()
 
# 交换机绑定到通道中,也可以设置durable=True,保证交换机不会删除
channel.exchange_declare(exchange=exchange_name,type=exchange_type)
 
# 下面这个方法是生成随机唯一队列,获取队列名称
result=channel.queue_declare(exclusive=True)
queue_name=result.queue
 
# 自定义队列名称,队列属性,此处不使用
#queue_name="message"
#channel.queue_declare(queue=queue_name, durable=True)
 
# 判断绑定标识符是否正确
binding_keys = sys.argv[1:]
if not binding_keys:
	print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
	sys.exit(1)
 
# 如果提供了绑定标识符,设置队列的路由标志
for binding_key in binding_keys:
	channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key=binding_key)
 
# 回调函数
def callback(ch, method, properties, body):
	print " [-] %r:%r" % (method.routing_key, body,)
 
print '[%r] Waiting for messages,To exit press Ctrl+C ' %(queue_name,)
 
channel.basic_consume(callback,queue=queue_name,no_ack=True)
 
try:
	pika.asyncore_loop()
except (KeyboardInterrupt,SystemExit):
	channel.queue_delete(queue=queue_name)
	channel.close()
	conn.close()

消息接受者 java版本

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConnectionParameters;
import com.rabbitmq.client.GetResponse;
public class ConsumerTest {
        public static void main(String[] args) {
                try {
                        ConnectionParameters params = new ConnectionParameters();
                        params.setUsername("guest");
                        params.setPassword("guest");
                        params.setVirtualHost("/");
                        params.setRequestedHeartbeat(0);
                        ConnectionFactory factory = new ConnectionFactory(params);
                        Connection conn = factory.newConnection("192.168.56.110", 5672);
                        Channel channel = conn.createChannel();
                        boolean noAck = false;
                        GetResponse response = channel.basicGet("queue1", noAck);
                        if (response == null) {
                                System.out.println("No message!");
                        } else {
                                byte[] body = response.getBody();
                                long deliveryTag = response.getEnvelope().getDeliveryTag();
                                String str = new String(body);
                                System.out.println(str);
                                channel.basicAck(deliveryTag, false); // acknowledge receipt of the message
                        }
                        channel.close();
                        conn.close();
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }
}

消息发布者 java版本

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConnectionParameters;
import com.rabbitmq.client.MessageProperties;
public class PublishTest {
        public static void main(String[] args) {
                try {
                        ConnectionParameters params = new ConnectionParameters();
                        params.setUsername("guest");
                        params.setPassword("guest");
                        params.setVirtualHost("/");
                        params.setRequestedHeartbeat(0);
                        ConnectionFactory factory = new ConnectionFactory(params);
                        Connection conn = factory.newConnection("192.168.56.110", 5672);
                        Channel channel = conn.createChannel();
                        channel.exchangeDeclare("exchange1", "direct", true);
                        channel.queueDeclare("queue1", true);
                        channel.queueBind("queue1", "exchange1", "key1");
                        byte[] messageBodyBytes = "Hello, world!".getBytes();
                        channel.basicPublish("exchange1", "key1", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
                        channel.close();
                        conn.close();
                        System.out.println("Over.");
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }
}

消息发布者 send.py

#!/bin/env python
import pika
import sys
 
# Connection Parameters Settings
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters(
                      host='www9.53kf.com',
                      virtual_host='/',
                      credentials=credentials, 
                      heartbeat=30)
 
exchange_name="topic_logs2"
exchange_type="topic"
 
# fork connection
conn = pika.AsyncoreConnection(parameters)
channel = conn.channel()
 
# set durable=True ,rabbitMQ will kept it to disk
channel.exchange_declare(exchange=exchange_name,type=exchange_type)
 
message = ' '.join(sys.argv[2:]) or 'Hello World!'
routing_key = sys.argv[1]
channel.basic_publish(exchange=exchange_name,
                      routing_key=routing_key,
                      body=message)
print " [x] Sent %r:%r" % (routing_key, message)
conn.close()

用了能够产生随机消息,我用bash封装了一下

#!/bin/sh
n=$RANDOM;
n=$((n%=3))
date=`date`
 
if [ $n -eq 0 ];then
	/root/new_send3.py  'kern.error' "A kernel error at $date"
elif [ $n -eq 1 ];then
	/root/new_send3.py  'warn.error' "A warning error at $date"
else
	/root/new_send3.py  'info.error' "A infomation error at $date"
fi

消息发布者 php版本

<?php
ini_set("display_errors",true);
if(isset($_POST['submit']) && $_POST['submit']=="OK") {
        $exchange_name="topic_logs2";
        $route_key=$_POST['state'].".*";
        //$queue_name="message";
 
        $amqp_array=array(
                'host'=>'www9.53kf.com',
                'vhost'=>'/',
                'port'=>'5672',
                'login'=>'guest',
                'password'=>'guest',
                );
        $cnn=new AMQPConnection($amqp_array);
 
        if($cnn->connect()){
                echo "Established a connection to the broker.\\ \\ The content is ".$_POST['greeting']."\\ ";
 
                $ex=new AMQPExchange($cnn);
                $ex->declare($exchange_name,AMQP_EX_TYPE_TOPIC);
 
                // Create a new queue
                //$q = new AMQPQueue($cnn,$queue_name);
                //$q->declare($queue_name);
 
                // Bind it on the exchange to routing.key
                //$ex->bind($queue_name,$route_key);
 
                // Publish a message to the exchange with a routing key
                $ex->publish($_POST['greeting'],$route_key);
        }
        else{
                echo "Cannot connect to the broker.\n";
        }
 
        if (!$cnn->disconnect()) {
                throw new Exception('Could not disconnect');
        }
}
?>
<H2>AMQP PHP Testing</H2>
<form action="amqp.php" method="post">
<input type="radio" name="state" value = "kern" checked>kernel level\\ 
<input type="radio" name="state" value = "warn">warn level\\ 
<input type="radio" name="state" value = "info">info level\\ 
<input type="text" name="greeting" value = "" size="20" maxlength="50">\\ 
<input type="submit" name="submit" value="OK"><input type="reset" value="Again">
</form>

源代码下载