大家好,欢迎来到IT知识分享网。
- 消息重发并不是字面上的意思:生产者重新发送消息,而是针对消费者,当消费者在处理消息出现异常时,消费者会将该消息重新放入到队列中进行下次处理。当超过重试次数之时,消息会放入一个特殊的队列中ActiveMQ.DLQ
全称为:Dead Letter Queue。
消息重发,是对消费者而言的,也就是重新消费,重新投递。在activeMq中叫ReDelivery(重新投递)。
应用场景:
- 在实际生产场景过程中,当消费者消费消息时,可能由于种种原因,导致消费者消费消息失败。例如:在一个通知系统中,生产者将通知Message
放入队列中,而消费者从队列中将消息读取出来之后,除了进行自身业务处理,还需要调用第三方服务发送短信通知用户,但在发送短信服务的时候,由于网络超时等原有导致消费失败。在这种异常情况下,希望可以建立一种机制。当未发送成功的消息,能够重新发送。处理超过一定次数后还处理不成功的,放弃处理该消息,记录下来。继续对别的消息进行处理。本文的实现可能对于上两遍的实现有所不同。
MQ 入门(一)——MQ、JMS的了解与 activemq 基本操作
MQ 入门(二)——activeMQ 与spring 整合
闲话少说,上代码:主体配置文件
<!-- 定义ReDelivery(重发机制)机制 ,重发时间间隔是100毫秒,最大重发次数是3次 --> <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> <!--是否在每次尝试重新发送失败后,增长这个等待时间 --> <property name="useExponentialBackOff" value="true"></property> <!--重发次数,默认为6次 这里设置为1次 --> <property name="maximumRedeliveries" value="3"></property> <!--重发时间间隔,默认为1秒 --> <property name="initialRedeliveryDelay" value="1000"></property> <!--第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value --> <property name="backOffMultiplier" value="2"></property> <!--最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第 二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 --> <property name="maximumRedeliveryDelay" value="1000"></property> </bean> <!-- ActiveMQ 连接工厂 --> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码 <amq:connectionFactory id="amqConnectionFactory" brokerURL="${activemq.url}" trustAllPackages="true" userName="${activemq.username}" password="${activemq.password}" redeliveryPolicy=""/>--> <!--创建连接工厂 --> <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${activemq.url}"></property> <property name="userName" value="${activemq.username}"></property> <property name="password" value="${activemq.password}"></property> <!-- 此处可以配置,也可以不配置重发机制,当不配置重发机制时,会使用默认的重发机制,重发次数6次 --> <!-- <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" /> --> <!-- 引用重发机制 --> </bean> <!-- ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory 可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们的资源消耗,要依赖于 activemq-pool包 <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory" ref="amqConnectionFactory"/> </bean>--> <!-- Spring Caching连接工厂 --> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <!-- Session缓存数量 --> <property name="sessionCacheSize" value="${activemq.sessioncachesize}"></property> </bean> <!-- Spring JmsTemplate 的消息生产者 start--> <!-- 定义JmsTemplate的Queue类型 --> <bean id="queueTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <constructor-arg ref="connectionFactory" /> <!-- 非pub/sub模型(发布/订阅),即队列模式 --> <property name="pubSubDomain" value="false" /> </bean> <!-- 声明ActiveMQ消息目标,目标可以是一个队列,也可以是一个主题ActiveMQTopic --> <bean id="destinationOne" class="org.apache.activemq.command.ActiveMueue"> <constructor-arg index="0" value="test.queue"></constructor-arg> </bean> <!-- 消息监听容器 消息接收监听器用于异步接收消息 --> <bean id="jmsContainerOne" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="destinationOne" /> <property name="messageListener" ref="receiverOne" /> <property name="sessionAcknowledgeMode" value="4"></property> </bean>
生产者
@Component public class SenderOne { @Autowired private JmsTemplate queueTemplate; //测试的 public void sendInfo(final String messageRecord) { queueTemplate.send("test.queue",new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage message = session.createTextMessage(); message.setText(messageRecord); return message; } }); } }
消费者:
该消费者与之前的消费者实现方式会有所不同,主要不同之处在于实现的消息监听器会有所不一样。之前我们的消费者,消息监听器为:MessageListener。而此处使用的是:SessionAwareMessageListener ,因为在处理消息的过程中我们需要使用到session。而Message不能满足现有的情况
注意:在使用重发机制时,如果没有在异常处理时,添加该代码session.recover();则该消息不能进行死信队列。
@Component public class ReceiverOne implements SessionAwareMessageListener { //测试方法 public void onMessage(Message message, Session session) throws JMSException { System.out.println("--------------------准备开始接受消息------------------------"); TextMessage textMsg = (TextMessage) message; try { if ("重发机制".equals(textMsg.getText())) { System.out.println("----------------"); throw new RuntimeException("故意抛出的异常"); } System.out.println(textMsg.getText()); System.out.println("--------------------接受消息结束------------------------"); } catch (Exception e) { //此不可省略 重发信息使用,当该代码注释掉之后,该队列消息不能移出队列 session.recover(); System.out.println("异常"); } } }
测试类:
@RunWith(SpringJUnit4ClassRunner.class) //使用junit4进行测试 @ContextConfiguration({ "/spring/web-servlet.xml" }) public class SenderOneTest { @Autowired private SenderOne senderOne; @Test public void test() { senderOne.sendInfo("我是队列消息002"); } }
正常运行情况如下:
当我们发送重发机制消息时,就会进入启动消息重发机制,如下图所示:
可以在ActiveMq的管理界面查看死信队列的情况
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/157836.html