RabbitMQ 介绍及简单使用

1、MQ简介

    一、简单释义:

        消息总线(Message Queue),是一种跨进程、异步的通信机制,用于上下游传递消息。由消息系统来确保消息的可靠传递。

    

    二、使用场景:

        1、上下游逻辑解耦&&物理解耦。

        2、保证数据最终一致性。

        3、广播。

        4、错峰流控。


2、RabbitMQ的特点

   1、RabbitMQ是由Erlang语言开发的AMQP的开源实现,所以他有本身的高并发处理能力(自带并发光环)。

    2、AMQP:Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言灯条件的限制。

    3、可靠性(Reliablity):使用了一些机制来保证可靠性,比如持久化、传输确认、发布确认。

    4、灵活的路由(Flexible Routing):在消息进入队列之前,通过Exchange来路由消息。对于典型的路由功能,Rabbit已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现自己的Exchange。

    5、消息集群(Clustering):多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。

    6、高可用(Highly Avaliable Queues):队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

    7、多种协议(Multi-protocol):支持多种消息队列协议,如STOMP、MQTT等。

    8、多种语言客户端(Many Clients):几乎支持所有常用语言,比如Java、.NET、Ruby等。

    9、管理界面(Management UI):提供了易用的用户界面,使得用户可以监控和管理消息Broker的许多方面。

    10、跟踪机制(Tracing):如果消息异常,RabbitMQ提供了消息的跟踪机制,使用者可以找出发生了什么。

    11、插件机制(Plugin System):提供了许多插件,来从多方面进行扩展,也可以编辑自己的插件。


3、RabbitMQ的流程组件概念

    先用一张图片来展示所有的过程

    

1.png


    1、消费者(consumer):订阅某个队列,并依次进行消费数据。

    2、生产者(product):创建消息,然后发布到队列中(queue),最终将消息发送到监听的消费者。

    3、Broker:标识消息队列服务器实体,一个服务就是一个Broker。

    4、Virtual Host:虚拟主机。标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。(这里虚拟机代表的是一个独立的虚拟环境,里面的数据、路由、交换器等等与其他虚拟机不共享,包括用户。"/","/test","/p"这就是三个虚拟机,虚拟机需要自己的用户才能去消费或生产)。

    5、Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。队列与用户之间的一个组件,他相当于一个路由器,负责拿到请求并转发到对应的队列上。一个交换器可以绑定N个队列,当然一个队列也能绑定N个交换器,所以交换机与队列的关系是NXN.

    6、Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。一个队列可以绑定多个交换机,所以交换机与队列的关系是NXN.

    7、Banding:绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。绑定Exchange与Queue需要一个键,具体下面详细讲解交换机与队列的关系。

    8、Channel:信道,多路复用连接中的一条独立的双向数据流通道。新到是建立在真实的TCP连接内地虚拟链接,AMQP命令都是通过新到发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。所以不管做什么操作都需要信道来支持,没有信道就无法与MQ展开联系。

    9、Connection:网络连接,比如一个TCP连接。只有先链接上MQ服务器后才能进行信道的开辟。

    10、Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。

    11、Consumer:消息的消费者,表示一个从一个消息队列中取得消息的客户端应用程序。

    12、Message:消息,消息是不具名的,它是由消息头和消息体组成。消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息可能需要持久性存储[消息的路由模式])等。

    下面用一张更具体的图片标示一下:

    

2.png



3、部分模块详解

    Echange类型

        Exchange分发消息时,根据类型的不同分发策略有区别。目前共四种类型:direct、fanout、topic、haders(headers匹配AMQP消息的header而不是路由键(Routing-key),此外headers交换器和direct交换器完全一致,但是性能差了很多,目前几乎用不到了。所以直接看另外三种类型。)。

            Direct: 消息中的路由键(routing key)如果和Binding中的binding key一致,交换器就将消息发到对应的队列中。路由键与队列名完全匹配。就是路由名字必须全匹配。

            Fanout:每个发到fanout类型交换器的消息都会分到所有绑定的队列上去。fanout交换器不处理该路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout类型转发消息是最快的。所有此种类型的交换器不需要指定Routing Key,即使你指定了RoutingKey,系统也会自动的忽略。

            Topic:topic交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键(routing-key)和绑定键(bingding-key)的字符串切分成单词,这些单词之间用点"."隔开。它同样也会识别两个通配符:"#"和"*"。"#"匹配0个或多个单词,而"*"匹配不多不少一个单词。所以Topic是一种模糊匹配的路由方案.

            Headers:header exchange(头交换机)和主题交换机有点相似,但是不同于主题交换机的路由是基于路由键,头交换机的路由值基于消息的header数据。主题交换机路由键只有是字符串,而头交换机可以是整型和哈希值。

                    队列A:绑定交换机参数是:format=pdf,type=report,x-match=all,

                    队列B: 绑定交换机参数是:format=pdf,type=log,x-match=any,

                    队列C:绑定交换机参数是:format=zip,type=report,x-match=all,

                    测试场景:

                    消息1发送交换机的头参数是:format=pdf,type=reprot则消息传送到队列A 

                    消息2发送交换机的头参数是:format=pdf则消息传送到队列A和队列B 

                    消息3发送交换机的头参数是:format=zip,type=log则消息没有匹配队列,此消息会被丢弃

            消息header数据里有一个特殊值”x-match”,它有两个值:

                all: 默认值。一个传送消息的header里的键值对和交换机的header键值对全部匹配,才可以路由到对应交换机                  any: 一个传送消息的header里的键值对和交换机的header键值对任意一个匹配,就可以路由到对应交换机


    Queue类型:

        1、队列在声明(declare)后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存队列的属性存在差异,那么一个错误代码为406的通道级异常就会被抛出。

        2、队列的属性:

           1、名称

             2、持久性(消息代理重启后,队列依旧存在)
             3、独享(只被一个连接(connection)使用,而且当连接关闭后队列即被删除)
             4、自动删除(当最后一个消费者退订后即被删除)

             5、其他参数:(消息代理用他来完成类似与TTL的某些额外功能、死信队列等)

        3、队列名称

            应用(application)可以为队列取一个名字,或者让消息代理(broker)直接生成一个名字给队列。队列的名字可以是最多255字节的一个utf-8字符串。若希望AMQP消息代理生成队列名,需要给队列的name参数赋值一个空字符串:在同一个通道(channel)的后续的方法(method)中,我们可以使用空字符串来表示之前生成的队列名称。之所以之后的方法可以获取正确的队列名是因为通道可以默默地记住消息代理最后一次生成的队列名称。以"amq."开始的队列名称被预留做消息代理内部使用。如果试图在队列声明时打破这一规则的话,一个通道级的403 (ACCESS_REFUSED)错误会被抛出。

        4、Queue Durability 队列持久化

            持久化队列(Durable queues)会被存储在硬盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。并不是所有的场景和案例都需要将队列持久化。持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被重新声明,无论怎样,只有经过持久化的消息才能被重新恢复。消息如果需要持久化则需要交换机持久化、队列持久化、消息持久化,三个同时设置为持久化才行,缺一不可。否则在重启时我无法恢复之前的数据。

        5、死信队列:

            在RabbitMQ中队列中的消息是可以设置过期时间的,但是即使设置了过期时间消息队列到了时间也不会进行删除,但是也不可被消费,当消费到那个数据时会为空。所以在RabbitMQ中添加了一个死信机制,在创建队列时可以传入参数x-dead-letter-exchange:交换器名称,当此队列中的数据到期时会自动把那个数据以及RoutingKey传输到参数绑定的交换器上去在进行路由入队,当然也可以设置RoutingKey在创建时使用x-dead-letter-routing-key:转发时的Routing Key.这样就生成了一个队列并且可以将失效的数据转发到另一个队列中去,这就是死信队列。

        6、Ack,Nack,Reject:

            Ack:手动的告诉服务器消息消费成功,可以带入参数:AMQP_MULTIPLE或AMQP_NOPARAM(无视其他标签,默认值)

            Nack:手动告诉服务器一条或多条消费失败,带入参数flag:AMQP_REQUEUE(重新入队) 或 AMQP_MULTIPLE(多个丢弃) 或 AMQP_NOPARAM(无视其他标签,默认值)

            Reject:手动告诉服务器一条消息消费失败,带入参数flag:AMQP_REQUEUE (重新入队) 或 AMQP_NOPARAM(无视其他标签,默认值)

            注意:Nack、Reject如果没有显示的调用flag:AMQP_REQUEUE 则会丢弃消费的消息。

            

4:PHP中使用AMQP扩展来调用RabbitMQ服务

    1、安装扩展:

        扩展下载地址:http://pecl.php.net/package/amqp

           Windows下载好扩展包之后解压,并将里面的php_amqp.dll文件和rabbitmq.4.dll文件拷到PHP的扩展目录,并且将rabbitmq.4.dll文件放到php.exe执行文件文件目录,即ext扩展目录平级,否则将会抱AMQP模块找不到。

        2、编码使用:

<?php
class RabbitMQModel {

    protected static $CONFIG;

    protected $vhost = '/';

    protected $connect = '';
    protected $channel = '';
    protected $exchange = '';
    protected $eqeue = '';

    public function __construct($vhost='/')
    {
        $this->vhost = $vhost ? : '/';
        self::$CONFIG = [
            '/'     =>  [
                'host' => '127.0.0.1',
                'vhost' => '/',
                'port' => 5672,
                'login' => 'test',
                'password' => 'test'
            ]
        ];
        $this->initRabbitConnect();
    }

    public function getConfig($key, $default="") {
        if ($key == '*') {
            return self::$CONFIG[$this->vhost] ?? [];
        }

        return self::$CONFIG[$this->vhost][$key] ?? $default;
    }

    public function setVhost($v) {
        $this->vhost = $v;
        $this->initRabbitConnect();
    }

    protected function initRabbitConnect() {
        $this->connect = new AMQPConnection($this->getConfig('*'));
        if (!$this->connect->connect()) {
            echo "Cannot connect to the broker";
            exit();
        }
        //创建信道
        $this->channel = new AMQPChannel($this->connect);
        //创建交换器
        $this->exchange = new AMQPExchange($this->channel);
    }

    protected function getExchangeObj() {
        if (empty($this->exchange)) {
            $this->initRabbitConnect();
        }
        return $this->exchange;
    }

    protected function getQueueObj() {
        if (empty($this->eqeue)) {
            //创建队列
            $this->eqeue = new AMQPQueue($this->channel);
        }
        return $this->eqeue;
    }

    //设置交换机类型
    //AMQP_EX_TYPE_DIRECT:直连交换机
    //AMQP_EX_TYPE_FANOUT:扇形交换机
    //AMQP_EX_TYPE_HEADERS:头交换机
    //AMQP_EX_TYPE_TOPIC:主题交换机
    //flags:AMQP_DURABLE、AMQP_INTERNAL、AMQP_INTERNAL|AMQP_DURABLE
    public function createExchange($name, $exchangeType=AMQP_EX_TYPE_DIRECT, $flags=AMQP_DURABLE,$argenment=[]) {
        $exchangeObj = $this->getExchangeObj();
        $exchangeObj->setName($name);
        $exchangeObj->setType($exchangeType);
        $exchangeObj->setFlags($flags);
        if($argenment) {
            $exchangeObj->setArguments($argenment);
        }
        $exchangeObj->declareExchange();
    }

    /**
     * 创建一个队列
     * @param $name
     * @param int $flags
     * @throws AMQPChannelException
     * @throws AMQPConnectionException
     */
    public function createQueue($name, $flags=AMQP_DURABLE, $argenment=[]) {
        $queueObj = $this->getQueueObj();
        //设置队列名称
        $queueObj->setName($name);
        //设置队列持久
        $queueObj->setFlags($flags);
        if ($argenment) {
            //设队列参数,如要设置死信队列转发器就在此处添加x-dead-letter-exchange:交换器名称 参数
            $queueObj->setArguments($argenment);
        }
        //声明消息队列
        $queueObj->declareQueue();
    }

    /**
     * 绑定交换器和队列
     * @param string $routeKey
     */
    public function bind($routeKey='key') {
        //将队列与交换器进行绑定
        $this->eqeue->bind($this->exchange->getName(), $routeKey);
    }

    /**
     * 监听并消费消息
     * @param $fallBack
     */
    public function consume($fallBack) {
        $this->eqeue->consume($fallBack);
    }

    public function pushMsg($msg, $routingKey='', $delivery_mode=2) {
        $exchange = $this->getExchangeObj();
        //AMQP_NOPARAM表示无视其他标签
        return $exchange->publish(json_encode($msg), $routingKey, AMQP_NOPARAM, array('delivery_mode' => $delivery_mode));
    }
}
<?php
include "RabbitMQModel.php";

$mq = new RabbitMQModel();
//每次在使用交换器时都需要创建,因为如果不存在则会在服务器上创建,如果存在服务器则会忽略,
//如果参数发生了变动则会报错,所以每次要是用同一个名字的交换器参数必须一致、类型必须一致、持久性必须一致,下面队列也是相同的。
//创建exchange1交换器 类型默认为持久性、直连型(AMQP_EX_TYPE_DIRECT)
$mq->createExchange("exchange1");
echo "exchange1交换器创建成功" . PHP_EOL;

//创建queue1 默认为持久性队列不带任何参数
$mq->createQueue('queue1');
echo "queue1队列创建成功". PHP_EOL;

//绑定交换器与队列 因为封装的原因,所以他是将上面最近一次创建的交换器、与最近一次创建的队列进行的绑定,如有不同的业务需求可以自己修改。
//这个绑定只需要一次,当然重复绑定不会报错
$mq->bind('key_eq_1');
echo "绑定成功" . PHP_EOL;

//发送消息到交换机 此处会自动调用最近创建的一个交换器然后发送routing key到那个交换器上进行路由,如果key在交换器上不存在(匹配不成功)
//则数据会被丢失,如果匹配成功则数据加入到对应的队列,这里是传到exchange1交换器上并且路由key_eq_1这个key是存在的,所以数据会被发送到queue1队列上
$mq->pushMsg("123456", 'key_eq_1');

//开始消费消息 阻塞试
$mq->consume(function ($envelope, $queue){
   //休眠两秒,模拟消费耗时
   //$envelope->getHeader('name');//获取消息体的headers中key/value,传入的参数是key会返回对应的value
   //$envelope->getExchangeName();//获取交换器的名称
   //还有更多方法,请查看扩展的接口文档。
   echo $envelope->getBody(). PHP_EOL ;
   //显式确认,队列收到消费者显式确认后,会删除该消息
   $queue->ack($envelope->getDeliveryTag());
});

//以上的不同代码块都可以单独执行并不是上下关系,比如只是消费者则只需要创建队列然后进行监听就可以了,需要创建交换器,因为消费者不与交换器打交道


                        以上部分来自互联网,大部分原创,如有错误的地方请与我联系,我会尽快回复你,谢谢!

阅读数:229
如有疑问请与我联系:点击与我联系