软件开发
OA系统

扫一扫微信二维码

RabbitMQ 发布订阅实战实现延时重试队列

发布时间2018年06月29日标签:发布,订阅,实战,实现,延时,重试,队列 30
本文作者: 计算机软件开发 - 管宜尧 。未经作者允许,禁止转载!
迎接加入计算机软件开发 专栏作者。

RabbitMQ是一款使用Erlang开发的开源新闻队列。本文假设读者对RabbitMQ是什么已经有了基本的了解,假如你还不知道它是什么以及可以用来做什么,建议先从官网的 RabbitMQ Tutorials 入门教程开始学习。

本文将会讲解如何使用RabbitMQ实现延时重试和失败江苏人事考试中心新闻队列,实现可靠的新闻消耗,消耗失败后,主动延时将新闻重新投递,当达到肯定的重试次数后,将新闻投递到失败新闻队列,等待人工介入处理。在这里我会带领大家一步一步的实现一个带有失败重试功能的发布订阅组件,使用该组件后可以特别很是简单的实现新闻的发布订阅,在进行营业开发的时候,营业开发人员可以将重要精力放在营业逻辑实现上,而不必要花费时间去理解RabbitMQ的一些复杂概念。

概要

我们将会实现如下功能

  • 结合RabbitMQ的Topic模式和Work Queue模式实现生产方产生新闻,消耗方按需订阅,新闻投递到消耗方的队列之后,多个worker同时对新闻进行消耗
  • 结合RabbitMQ的 Message TTL 和 Dead Letter Exchange 实现新闻的延时重试功能
  • 新闻达到最大重试次数之后,将其投递到失败队列,等待人工介入处理bug后,重新将其加入队列消耗

详细流程见下图

xxx

  1. 生产者发布新闻到主Exchange
  2. 主Exchange根据Routing Key将新闻分发到对应的新闻队列
  3. 多个消耗者的worker进程同时对队列中的新闻进行消耗,因此它们之间采用“竞争”的体例来争夺新闻的消耗
  4. 新闻消耗后,不管成功失败,都要返回ACK消耗确认新闻给队列,避免新闻消耗确认机制导致重复投递,同时,假如新闻处理成功,则结束流程,否则进入重试阶段
  5. 假如重试次数小于设定的最大重试次数(3次),则将新闻重新投递到Retry Exchange的重试队列
  6. 重试队列不必要消耗者直接订阅,它会等待新闻的有用时间过期之后,重新将新闻投递给Dead Letter Exchange,我们在这里将其设置为主Exchange,实现延时后重新投递新闻,如许消耗者就可以重新消耗新闻
  7. 假如三次以上都是消耗失败,则认为新闻无法被处理,直接将新闻投递给Failed Exchange的Failed Queue,这时候应用可以触发报警机制,以关照相干责任人处理
  8. 等待人工介入处理(解决bug)之后,重新将新闻投递到主Exchange,如许就可以重新消耗了

技术实现

Linus Torvalds 曾经说过

Talk is cheap. Show me the code

我分别用Java和PHP实现了本文所讲述的方案,读者可以通过参考代码以及本文中的基本步骤来更好的理解

  • rabbitmq-pubsub-php
  • rabbitmq-pubsub-java

创建Exchange

为了实现新闻的延时重试和失败存储,我们必要创建三个Exchange来处理新闻。

  • master 主Exchange,发布新闻时发布到该Exchange
  • master.retry 重试Exchange,新闻处理失败时(3次以内),将新闻重新投递给该Exchange
  • master.failed 失败Exchange,超过三次重试失败后,新闻投递到该Exchange

所有的Exchange声明(declare)必须使用以下参数

参数 值 说明 exchange – Exchange名称 type topic Exchange 类型 passive false 假如Exchange已经存在,则返回成功,不存在则创建 durable true 持久化存储Exchange,这里仅仅是Exchange自己持久化,新闻和队列必要单独指定其持久化 no-wait false 该方法必要应答确认

Java代码

1234 // 声明Exchange:主体,失败,重试channel.exchangeDeclare("master", "topic", true);channel.exchangeDeclare("master.retry", "topic", true);channel.exchangeDeclare("master.failed", "topic", true);

PHP代码

123456 // 通俗交换机$this->channel->exchange_declare('master', 'topic', false, true, false);// 重试交换机$this->channel->exchange_declare('master.retry', 'topic', false, true, false);// 失败交换机$this->channel->exchange_declare('master.failed', 'topic', false, true, false);

在RabbitMQ的管理界面中,我们可以看到创建的三个Exchange

-w539

新闻发布

新闻发布时,使用basic_publish方法,参数如下

参数 值 说明 message – 发布的新闻对象 exchange master 新闻发布到的Exchange routing-key – 路由KEY,用于标识新闻类型 mandatory false 是否强制路由,指定了该选项后,假如没有订阅该新闻,则会返回路由不可达错误 immediate false 指定了当新闻无法直接路由给消耗者时如何处理

发布新闻时,对于message对象,其内容建议使用json编码后的字符串,同时新闻必要标识以部属性

1 'delivery_mode'=> 2 // 1为非持久化,2为持久化

Java代码

123456 channel.basicPublish(    "master",     routingKey,     MessageProperties.PERSISTENT_BASIC, // delivery_mode    message.getBytes());

PHP代码

12345 $msg = new AMQPMessage($message->serialize(), [    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,]); $this->channel->basic_publish($msg, 'master', $routingKey);

新闻订阅

新闻订阅的实现相对复杂一些,必要完成队列的声明以及队列和Exchange的绑定。

Declare Queue

对于每一个订阅新闻的服务,都必须创建一个该服务对应的队列河南人事考试网站,将该队列绑定到关注的路由规则,如许之后,新闻生产者将新闻投递给Exchange之后,就会按照路由规则将新闻分发到对应的队列供消耗者消耗了。

消耗服务必要declare三个队列

  • [queue_name] 队列名称,格式吻合 [服务名称]@订阅服务标识
  • [queue_name]@retry 重试队列
  • [queue_name]@failed 失败队列

订阅服务标识是客户端本身对订阅的分类标识符,比如用户中间服务(服务名称ucenter),包含两个订阅:user和enterprise,这里两个订阅的队列名称就为 ucenter@userucenter@enterprise,其对应的重试队列为 ucenter@user@retryucenter@enterprise@retry

Declare队列时,参数规定规则如下

参数 值 说明 queue – 队列名称 passive false 队列不存在则创建,存在则直接成功 durable true 队列持久化 exclusive false 排他,指定该选项为true则队列只对当前连接有用,连接断开后主动删除 no-wait false 该方法必要应答确认 auto-delete false 当不再使用时,是否主动删除

对于@retry重试队列,必要指定额外参数

12 'x-dead-letter-exchange' => 'master''x-message-ttl'          => 30 * 1000 // 重试时间设置为30s

这里的两个header字段的含义是,在队列中耽误30s后,将该新闻重新投递到x-dead-letter-exchange对应的Exchange中

Java代码

1234567891011121314 // 声明监听队列channel.queueDeclare(    queueName, // 队列名称    true,      // durable    false,     // exclusive    false,     // autoDelete    null       // arguments);channel.queueDeclare(queueName + "@failed", true, false, false, null); Map arguments = new HashMap();arguments.put("x-dead-letter-exchange", exchangeName());arguments.put("x-message-ttl", 30 * 1000);channel.queueDeclare(queueName + "@retry", true, false, false, arguments);

PHP代码

1234567891011121314 $this->channel->queue_declare($queueName, false, true, false, false, false);$this->channel->queue_declare($failedQueueName, false, true, false, false, false);$this->channel->queue_declare(    $retryQueueName, // 队列名称    false,           // passive    true,            // durable    false,           // exclusive    false,           // auto_delete    false,           // nowait    new AMQPTable([        'x-dead-letter-exchange' => 'master',        'x-message-ttl'          => 30 * 1000,    ]));

在RabbitMQ的管理界面中,Queues部分可以看到我们创建的三个队列

查看队列的细致信息,我们可以看到 queueName@retry 队列与其它两个队列的不同

-w486

Bind Exchange & Queue

创建完队列之后,必要将队列与Exchange绑定(bind),不同队列必要绑定到之前创建的对应的Exchange上面

Queue Exchange [queue_name] master [queue_name]@retry master.retry [queue_name]@failed master.failed

绑准时,必要提供订阅的路由KEY,该路由KEY与新闻发布时的路由KEY对应,区别是这里可以使用通配符同时订阅多种类型的新闻。

参数 值 说明 queue – 绑定的队列 exchange – 绑定的Exchange routing-key – 订阅的新闻路由规则 no-wait false 该方法必要应答确认

Java代码

1234 // 绑定监听队列到Exchangechannel.queueBind(queueName, "master", routingKey);channel.queueBind(queueName + "@failed", "master.failed", routingKey);channel.queueBind(queueName + "@retry", "master.retry", routingKey);

PHP代码

123 $this->channel->queue_bind($queueName, 'master', $routingKey);$this->channel->queue_bind($retryQueueName, 'master.retry', $routingKey);$this->channel->queue_bind($failedQueueName, 'master.failed', $routingKey);

在RabbitMQ的管理界面中,我们可以看到该队列与Exchange和routing-key的绑定关系

-w361

-w405

-w399

新闻消耗实现

使用 basic_consume 对新闻进行消耗的时候,必要细致下面参数

参数 值 说明 queue – 消耗的队列名称 consumer-tag – 消耗者标识,留空即可 no_local false 假如设置了该字段,服务器将不会发布新闻到 发布它的客户端 no_ack false 必要消耗确认应答 exclusive false 排他访问,设置后只许可当前消耗者访问该队列 nowait false 该方法必要应答确认

消耗端在消耗新闻时,必要从新闻中获取新闻被消耗的次数,以此判断该新闻处理失败时重试照旧发送到失败队列。

Java代码

1234567891011121314151617181920212223242526272829 protected Long getRetryCount(AMQP.BasicProperties properties) {    Long retryCount = 0L;    try {        Map headers = properties.getHeaders();        if (headers != null) {            if (headers.containsKey("x-death")) {                List   <map>> deaths = (List   <map>>) headers.get("x-death");                if (deaths.size() > 0) {                    Map death = deaths.get(0);                    retryCount = (Long) death.get("count");                }            }        }    } catch (Exception e) {}     return retryCount;}</map>?  </map>

 

 

 

PHP代码

123456789101112 protected function getRetryCount(AMQPMessage $msg): int{    $retry = 0;    if ($msg->has('application_headers')) {        $headers = $msg->get('application_headers')->getNativeData();        if (isset($headers['x-death'][0]['count'])) {            $retry = $headers['x-death'][0]['count'];        }    }     return (int)$retry;}

新闻消耗完成后,必要发送消耗确认新闻给服务端,使用basic_ack方法

1 ack(delivery-tag=新闻的delivery-tag标识)

Java代码

12345678910111213141516 // 新闻消耗处理Consumer consumer = new DefaultConsumer(channel) {    @ Override    public void handleDelivery(String consumerTag, Envelope envelope,                               AMQP.BasicProperties properties, byte[] body) throws IOException {        ...        // 细致,因为使用了basicConsume的autoAck特征,因此这里就不必要手动实行        // channel.basicAck(envelope.getDeliveryTag(), false);    }};// 实行新闻消耗处理channel.basicConsume(    queueName,     true, // autoAck    consumer);

PHP代码

123456789101112 $this->channel->basic_consume(    $queueName,    '',    // customer_tag    false, // no_local    false, // no_ack    false, // exclusive    false, // nowait    function (AMQPMessage $msg) use ($queueName, $routingKey, $callback) {        ...        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);    });

假如新闻处理中出现非常,应该将该新闻重新投递到重试Exchange,等待下次重试

12 basic_publish(msg, 'master.retry', routing-key)ack(delivery-tag) // 不要忘掉了应答消耗成功新闻

假如判断重试次数大于3次,仍然处理失败,则应该讲新闻投递到失败Exchange,等待人工处理

12 basic_publish(msg, 'master.failed', routing-key)ack(delivery-tag) // 不要忘掉了应答消耗成功新闻

肯定不要忘掉ack新闻,由于重试、失败都是通过将新闻重新投递到重试、失败Exchange来实现的,假如忘掉ack,则该新闻在超时或者连接断开后,会重新被重新投递给消耗者,假如消耗者仍旧无法处理,则会造成死循环。

Java代码

123456789101112131415 try {    String message = new String(body, "UTF-8");    // 新闻处理函数    handler.handle(message, envelope.getRoutingKey()); } catch (Exception e) {    long retryCount = getRetryCount(properties);    if (retryCount > 3) {        // 重试次数大于3次,则主动加入到失败队列        channel.basicPublish("master.failed", envelope.getRoutingKey(), MessageProperties.PERSISTENT_BASIC, body);    } else {        // 重试次数小于3冷凝水蒸发器,则加入到重试队列,30s后再重试        channel.basicPublish("master.retry", envelope.getRoutingKey(), properties, body);    }}

失败义务重试

假如义务重试三次仍未成功,则会被投递到失败队列,这时候必要人工处理程序非常,处理完毕后,必要将新闻重新投递到队列进行处理,这里唯一必要做的就是从失败队列订阅新闻百度网站优化,然后获取到新闻后,清空其application_headers头信息,然后重新投递到master这个Exchange即可。

Java代码

123456 channel.basicPublish(    'master',     envelope.getRoutingKey(),    MessageProperties.PERSISTENT_BASIC,    body);

PHP代码

123456 $msg->set('application_headers', new AMQPTable([]));$this->channel->basic_publish(    $msg,    'master',    $msg->get('routing_key'));

怎么使用

队列和Exchange以及发布订阅的关系我们就说完了,那么使用起来是什么结果呢?这里我们以Java代码为例

123456789101112 // 发布新闻Publisher publisher = new Publisher(factory.newConnection(), 'master');publisher.publish("{\"id\":121, \"name\":\"guanyiyao\"}", "user.create"); // 订阅新闻new Subscriber(factory.newConnection(), Main.EXCHANGE_NAME)    .init("user-monitor", "user.*")    .subscribe((message, routingKey) -> {        // TODO 营业逻辑        System.out.printf("     message consumed: %s\n", routingKey, message);    });

总结

使用RabbitMQ时,实现延时重试和失败队列的体例并不仅仅局限于本文中描述的方法,假如读者有更好的实现方案,迎接拍砖,在这里我也只是抛砖引玉了。本文中讲述的方法还有许多优化空间,读者也可以试着去改进其实现方案,比如本文中使用了三个Exchagne,是否只使用一个Exchange也能实现本文中所讲述的功能。

打赏支撑我写出更多好文章,谢谢!

打赏作者

打赏支撑我写出更多好文章,谢谢!

任选一种付出体例