大家好,欢迎来到IT知识分享网。
RocketMQ 死信队列
死信队列
死信队列是什么?
死信队列指的是种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
在RocketMQ中消息重试超过一定次数后(默认16次)就会被放到死信队列中,在消息队列。
可以在控制台Topic列表中看到“DLQ”相关的Topic,默认命名是:
%RETRY%消费组名称(重试Topic)
%DLQ%消费组名称(死信Topic)
死信队列也可以被订阅和消费,并且也会过期RocketMQ 中
其中包括重试之后也无法消费的消息也会
死信队列应用场景
如我们平时下单后未在指定时间内付款,过来这个时间,我们的订单会被放入死信队列中。当我们再去付款时候,会发现订单已经被取消,此时我们只需要去死信队列中查该订单是否存在。
如当一些消息出现异常迟迟未被消费(或者最大重试次数后也未成功消费),这时候就会将消息存放到死信队列中。
示例
这里我们定义生产者
// 实例化生产者,并指定生产组名称
DefaultMQProducer producer = new DefaultMQProducer("myproducer_group_topic_name_dle_01");
//设置实例名称,一个jvm中有多个生产者可以根据实例名区分
//默认default
producer.setInstanceName("topic_name_dle");
// 指定nameserver的地址
producer.setNamesrvAddr("localhost:9876");
//设置同步重试次数
producer.setRetryTimesWhenSendFailed(2);
//设置异步发送次数
//producer.setRetryTimesWhenSendAsyncFailed(2);
// 初始化生产者
producer.start();
for (int i = 0; i <10 ; i++) {
Message message = new Message("topic_name_dle", ("key=" + i).getBytes("utf-8"));
// 1 同步发送 如果发送失败会根据重试次数重试
SendResult send = producer.send(message);
SendStatus sendStatus = send.getSendStatus();
System.out.println(sendStatus.toString());
}
消费者
这里默认返回消息消费失败,指定消费者重试一次。
/** * 推消息消费 */
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("consumer_group_topic_name_dle_01");
// 指定nameserver的地址
defaultMQPushConsumer.setNamesrvAddr("localhost:9876");
//
defaultMQPushConsumer.subscribe("topic_name_dle", "*");
/** * 推送消息 提高消费处理能力 * 1 提高消费并行度 * 2 以批量方式进行 消费 * 3 检测延时情况,跳过非重要消息 */
//消费限流 只针对推送来设置,拉取消息自己控制
// 1 提高消费并行度
defaultMQPushConsumer.setConsumeThreadMax(10);
defaultMQPushConsumer.setConsumeThreadMin(1);
// 2 以批量方式进行 消费
// 设置消息批处理的一个批次中消息的最大个数
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10);
//设置重试次数 默认16次
defaultMQPushConsumer.setMaxReconsumeTimes(1);
// 添加消息监听器,一旦有消息推送过来,就进行消费
defaultMQPushConsumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//final MessageQueue messageQueue = context.getMessageQueue();
for (MessageExt msg : msgs) {
System.out.println(msg);
try {
System.out.println(new String(msg.getBody(), "utf-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
// 消息消费成功
//return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//null 也表示推送失败,会进行重试
return null;
// 消息消费失败
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
启动消费者和生产者之后,消费者可以看到,消息重发了一次,这里图没截全。
RocketMq 可视化工具:rocketmq-console
下载地址:
https://github.com/apache/rocketmq-externals/archive/rocketmq-console-1.0.0.zip
下载成功带入idea 将配置文件改成自己的地址,然后启动
可以从控制台中看到,没有被正常消费的消息被发送到死信队列中
这里与RocketMQ不同的是RabbitMQ需要自己定义一个队列与交换机绑定,没有被成功消费会将消息发送到自己创建的死信队列中去,而RocketMQ不需要我们自己去指定死信队列,会自己根据重试次数以及消息是否消费成功,将消息发送到死信队列(不需要我们去创建)。
死信队列
重试的队列以
可以看到队列名前会%RETRY%前缀 表示是重试队列
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/24317.html