Java,ActiveMQ,消息持久、事务,投递方式,确认机制和死信队列[通俗易懂]

Java,ActiveMQ,消息持久、事务,投递方式,确认机制和死信队列[通俗易懂]定义:消息中间件是基于队列与消息传递技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支撑性软件系统。

大家好,欢迎来到IT知识分享网。

消息中间件

定义:消息中间件是基于队列与消息传递技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支撑性软件系统。

Java,ActiveMQ,消息持久、事务,投递方式,确认机制和死信队列[通俗易懂]

应用场景:

1、异步处理,串行方式转并行方式。

2、应用解耦,通常处理方式为:操作1==>操作2,解耦处理方式为:操作1(立刻返回)==>消息==>操作2。

3、日志处理,日志采集端==>消息。

下图,kafka日志处理应用案例:

Java,ActiveMQ,消息持久、事务,投递方式,确认机制和死信队列[通俗易懂]

4、流量削峰流量过大放入消息队列中,排队处理,达到削峰的目的。

5、消息通讯,订阅发布消息、点对点消息、聊天室。

消息中间件产品:

1、ActiveMQ,Java开发、吞吐量:万级、时效性:毫秒级;

2、RabbitMQ,Erlang开发、吞吐量:万级、时效性:微秒级;

3、Kafka,Scala开发、吞吐量:10万级、时效性:毫秒级;

4、RocketMQ,Java开发、吞吐量:10万级、时效性:毫秒级;

Java,ActiveMQ,消息持久、事务,投递方式,确认机制和死信队列[通俗易懂]

JMS消息服务

P2P模式,消息队列(Queue),发送者(Sender),接收者(Receiver):

Java,ActiveMQ,消息持久、事务,投递方式,确认机制和死信队列[通俗易懂]

Pub/Sub模式,主题(Topic)、发布者(Publisher)、订阅者(Subscriber):

Java,ActiveMQ,消息持久、事务,投递方式,确认机制和死信队列[通俗易懂]

编程模型:

Java,ActiveMQ,消息持久、事务,投递方式,确认机制和死信队列[通俗易懂]

ActiveMQ消息中间件

ActiveMQ通信协议

安装:部署ActiveMQ、内置Broker

Java,ActiveMQ,消息持久、事务,投递方式,确认机制和死信队列[通俗易懂]

参考地址:https://activemq.apache.org/configuring-transports.html

具体配置:

<sslContext>
    <sslContext keyStore="file:${activemq.base}/conf/broker1.ks"
                keyStorePassword="qwerty"
                trustStore="file:${activemq.base}/conf/broker1.ts"
                trustStorePassword="asdfgh"
    />
</sslContext>

<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ssl" uri="ssl://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

IT知识分享网

ActiveMQ的消息

Java,ActiveMQ,消息持久、事务,投递方式,确认机制和死信队列[通俗易懂]

持久化消息(3种消息存储)

持久化是保证消息不丢失的重要手段,三种消息存储方式:

1、Memory 消息存储-基于内存的消息存储;

2、基于日志消息存储方式,KahaDB是ActiveMQ的默认日志存储方式,它提供了容量的提升和恢复能力;

3、基于JDBC的消息存储方式-数据存储于数据库(如:MySQL)中。

持久化消息模型:

Java,ActiveMQ,消息持久、事务,投递方式,确认机制和死信队列[通俗易懂]

Java,ActiveMQ,消息持久、事务,投递方式,确认机制和死信队列[通俗易懂]

代码设置,JMS生产者(发送方):

IT知识分享网// 设置传递模式为持久化模式
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

代码设置,JMS消费者(接收方):

Connection connection = activeMQConnectionFactory.createConnection();
connection.setClientID("client-topic-01");

持久化方式发送案例:发布订阅模式、队列/点对点模式

消息事务

消息事务,是保证消息传递原子性的一个重要特征,和JDBC的事务特征类似。一个事务性发送,其中一组消息要么能够全部保证到达服务器,要么都不到达服务器。

生产者、消费者与消息服务器直接都支持事务性;ActionMQ的事务主要偏向在生产者的应用。

ActionMQ 消息事务流程图:

Java,ActiveMQ,消息持久、事务,投递方式,确认机制和死信队列[通俗易懂]

代码设置,JMS消费者(发送方):

IT知识分享网// 参数一:是否开启消息事务,true为开启
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

// 一旦开启事务发送,那么就必须使用commit方法进行事务提交,否则消息无法到达MQ服务器
session.commit();

// 消息事务回滚
session.rollback();

代码设置,JMS消费者(接收方):

// 消息消费提交事务
session.commit();

// 消息消费事务回滚
session.rollback();

消息投递方式

同步投递(同步发送):

消息生产者使用持久(Persistent)传递模式发送消息的时候,Producer.send() 方法会被阻塞,直到broker发送一个确认消息给生产者(ProducerAck),这个确认消息说明broker已经成功接收到消息并把消息保存到二级存储中。

异步投递(异步发送):

应用程序能够容忍一些消息的丢失,就可以使用异步发送。异步发送不会在受到broker的确认之前一直阻塞Producer.send()方法。

使用异步,在brokerURL中增加:jms.alwaysSyncSend=false&jms.useAsyncSend=true属性:

1)如果设置了alwaysSyncSend=true系统将会忽略useAsyncSend设置的值都采用同步;

2)当alwaysSyncSend=false时,“NON_PERSISTENT”(非持久化)、事务中的消息将使用“异步发送”;

3)当alwaysSyncSend=false时,如果指定了useAsyncSend=true,“PERSISTENT”类型的消息使用异步发送。

4)当alwaysSyncSend=false时,如果指定了useAsyncSend=false,“PERSISTENT”类型的消息使用同步发送。

5)默认情况(alwaysSyncSend=false,useAsyncSend=false),非持久化消息、事务内的消息均采用异步发送,对于持久化消息采用同步发送。

延迟投递:

生产者提供两个发送消息的方法,一个是即时发送消息,一个是延时发送消息。

支持延迟投递,配置(修改activemq.xml):

<broker xmlns="http://activemq.apache.org/schema/core" ...
         schedulerSupport="true" >
</broker>

添加 schedulerSupport=”true”配置。

代码中设置延迟时长:

// 设置延时时长(延时10秒)
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10000);

定时投递

SpringBoot下开启:

// 开启定时功能
@EnableScheduling 

消息生产者:

//每隔3秒定投
@Scheduled(fixedDelay = 3000)

消息确认机制

JMS消息只有在被确认之后,才认为已经被成功地消费了。

消息的成功消费通常包含三个阶段:客户接收消息客户处理消息消息被确认。在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。

应答模式(acknowledgement mode)的参数,有以下三个可选值:

1、Session.AUTO_ACKNOWLEDGE,当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。

2、Session.CLIENT_ACKNOWLEDGE,客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。

3、Session.DUPS_ACKNOWLEDGE,该选择只是会话迟钝确认消息的提交。如果JMS provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS provider必须把消息头的JMSRedelivered字段设置为true。

应答模式:自动应答模式案例、手动应答模式案例

死信队列

DLQ-Dead Letter Queue,死信队列,用来保存处理失败或者过期的消息。

出现以下情况时,消息会被重发:

A transacted session is used and rollback() is called.

A transacted session is closed before commit is called.

A session is using CLIENT_ACKNOWLEDGE and Session.recover() is called.

当一个消息被重发超过6(缺省为6次)次数时,会给broker发送一个”Poison ack”,这个消息被认为是apoison pill,这时broker会将这个消息发送到死信队列,以便后续处理。

需要注意的点:

1)缺省持久消息过期,会被送到DLQ,非持久消息不会送到DLQ。

2)缺省的死信队列是:ActiveMQ.DLQ,如果没有特别指定,死信都会被发送到这个队列。

3)可以通过配置文件(activemq.xml)来调整死信发送策略。

死信队列配置:

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry topic=">">
                <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                </pendingMessageLimitStrategy>
            </policyEntry>
        </policyEntries>
    </policyMap>
</destinationPolicy>

ActiveMQ常遇的经典问题

ActiveMQ宕机如何处理:

ActiveMQ的部署方式采用主从集群方案:Zookeeper集群 + ActiveMQ集群。

防止消费方消息重复消费:

核心问题:消费方存在幂等性问题,解决即可。

问题:如果因为网络延迟等原因,MQ无法及时接收到消费方的应答,导致MQ重试,在重试过程中造成重复消费的问题。

1)如果消费方是做数据库操作,可以把消息的ID作为表的唯一主键,这样在重试的情况下,会触发主键冲突,从而避免数据出现脏数据。

2)如果消费方不是做数据库操作,可以借助第三方的应用,例如:Redis,来记录消费记录。每次消息被消费完成时候,把当前消息的ID作为key存入Redis,每次消费前,先到Redis查询有没有该消息的消费记录。

防止消息丢失:

1)在消息生产者方和消费者方使用事务;

2)在消费者方采用手动消息确认(ACK);

3)消息持久化,使用日志或者JDBC方式;

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/7077.html

(0)
上一篇 2023-01-03 09:53
下一篇 2023-01-03 09:53

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信