RocketMQ 死信队列[亲测有效]

RocketMQ 死信队列[亲测有效]RocketMQ死信队列死信队列死信队列是什么?死信队列指的是种正常情况下无法被消费的消息称为死信消息(Dead-LetterMessage),存储死信消息的特殊队列称为死信队列(Dead-LetterQueue)。在RocketMQ中消息重试超过一定次数后(默认16次)就会被放到死信队列中,在消息队列。可以在控制台Topic列表中看到“DLQ”相关的Topic,默认命名是:%RETRY%消费组名称(重试Topic)%DLQ%消费组名称(死信Topic)死信队列也可以被订阅和消费,并且

大家好,欢迎来到IT知识分享网。

RocketMQ 死信队列

死信队列

死信队列是什么?

死信队列指的是种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

在RocketMQ中消息重试超过一定次数后(默认16次)就会被放到死信队列中,在消息队列。

可以在控制台Topic列表中看到“DLQ”相关的Topic,默认命名是:

%RETRY%消费组名称(重试Topic)

%DLQ%消费组名称(死信Topic)

死信队列也可以被订阅和消费,并且也会过期RocketMQ 中

其中包括重试之后也无法消费的消息也会

死信队列应用场景

如我们平时下单后未在指定时间内付款,过来这个时间,我们的订单会被放入死信队列中。当我们再去付款时候,会发现订单已经被取消,此时我们只需要去死信队列中查该订单是否存在。

如当一些消息出现异常迟迟未被消费(或者最大重试次数后也未成功消费),这时候就会将消息存放到死信队列中。

示例

这里我们定义生产者

     // 实例化生产者,并指定生产组名称
        DefaultMQProducer producer = new DefaultMQProducer("myproducer_group_topic_name_dle_01");


        //设置实例名称,一个jvm中有多个生产者可以根据实例名区分
        //默认default
        producer.setInstanceName("topic_name_dle");

        // 指定nameserver的地址
        producer.setNamesrvAddr("localhost:9876");

        //设置同步重试次数
        producer.setRetryTimesWhenSendFailed(2);
        //设置异步发送次数
        //producer.setRetryTimesWhenSendAsyncFailed(2);


        // 初始化生产者
        producer.start();

        for (int i = 0; i <10 ; i++) { 
   
            Message message = new Message("topic_name_dle", ("key=" + i).getBytes("utf-8"));
            // 1 同步发送 如果发送失败会根据重试次数重试
            SendResult send = producer.send(message);
            SendStatus sendStatus = send.getSendStatus();
            System.out.println(sendStatus.toString());

        }

消费者

这里默认返回消息消费失败,指定消费者重试一次。

  /** * 推消息消费 */
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("consumer_group_topic_name_dle_01");
        // 指定nameserver的地址
        defaultMQPushConsumer.setNamesrvAddr("localhost:9876");

        //
        defaultMQPushConsumer.subscribe("topic_name_dle", "*");
        /** * 推送消息 提高消费处理能力 * 1 提高消费并行度 * 2 以批量方式进行 消费 * 3 检测延时情况,跳过非重要消息 */
        //消费限流 只针对推送来设置,拉取消息自己控制
        // 1 提高消费并行度
        defaultMQPushConsumer.setConsumeThreadMax(10);
        defaultMQPushConsumer.setConsumeThreadMin(1);

        // 2 以批量方式进行 消费
        // 设置消息批处理的一个批次中消息的最大个数
        defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10);

        //设置重试次数 默认16次
        defaultMQPushConsumer.setMaxReconsumeTimes(1);

        // 添加消息监听器,一旦有消息推送过来,就进行消费
        defaultMQPushConsumer.setMessageListener(new MessageListenerConcurrently() { 
   
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { 
   
                //final MessageQueue messageQueue = context.getMessageQueue();
                for (MessageExt msg : msgs) { 
   
                    System.out.println(msg);
                    try { 
   
                        System.out.println(new String(msg.getBody(), "utf-8"));
                    } catch (UnsupportedEncodingException e) { 
   
                        e.printStackTrace();
                    }
                }

                // 消息消费成功
                //return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                //null 也表示推送失败,会进行重试
                return null;
                // 消息消费失败
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });

启动消费者和生产者之后,消费者可以看到,消息重发了一次,这里图没截全。
在这里插入图片描述

RocketMq 可视化工具:rocketmq-console下载地址:

https://github.com/apache/rocketmq-externals/archive/rocketmq-console-1.0.0.zip

下载成功带入idea 将配置文件改成自己的地址,然后启动

在这里插入图片描述

可以从控制台中看到,没有被正常消费的消息被发送到死信队列中

这里与RocketMQ不同的是RabbitMQ需要自己定义一个队列与交换机绑定,没有被成功消费会将消息发送到自己创建的死信队列中去,而RocketMQ不需要我们自己去指定死信队列,会自己根据重试次数以及消息是否消费成功,将消息发送到死信队列(不需要我们去创建)。

死信队列
在这里插入图片描述
重试的队列以
可以看到队列名前会%RETRY%前缀 表示是重试队列
在这里插入图片描述

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/24317.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信