大家好,欢迎来到IT知识分享网。
ActiveMQ知识概括
ActiveMQ简介
ActiveMQ安装:
- 安装步骤:
①去ActiveMQ官网下载压缩包。
②解压压缩包到指定目录。
③启动ActiveMQ:service activemq start
④查看activemq状态:service activemq status
⑤关闭activemq服务:service activemq stop - 启动时指定日志输出文件:
①activemq日志默认的位置是在:%activemq安装目录%/data/activemq.log
②这是我们启动时指定日志输出文件:service activemq start > /usr/local/raohao/activemq.log - 查看程序启动是否成功的3种方式(通用):
①ps -ef | grep activemq
②netstat -anp | grep 61616
③lsof -i: 61616
ActiveMQ控制台:
- 访问activemq管理页面地址:http://IP地址:8161/。
默认的用户名和密码是admin/admin。
- 备注:
①ActiveMQ采用61616端口提供JMS服务。
②ActiveMQ采用8161端口提供管理控制台服务。 默认程序连接activemq(JMS服务)是不需要密码的,为了安装起见,一般都会设置密码,提高安全性。
- ActiveMQ控制台之队列:
①Number Of Pending Messages:等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。
②Number Of Consumers:消费者数量,消费者端的消费者数量。
③Messages Enqueued:进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。
④Messages Dequeued:出队消息数,可以理解为是消费者消费掉的数量。
- ActiveMQ控制台之主题:
- ActiveMQ控制台之订阅者:
Java实现ActiveMQ
pom.xml导入依赖:
<!-- activemq 所需要的jar 包-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<!-- activemq 和 spring 整合的基础包 -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
IT知识分享网
JMS编码总体规范:
- 架构:
- JMS开发的基本步骤:
①创建一个connection factory
②通过connection factory来创建JMS connection
③启动JMS connection
④通过connection创建JMS session
⑤创建JMS destination
⑥创建JMS producer或者创建JMS message并设置destination
⑦创建JMS consumer或者是注册一个JMS message listener
⑧发送或者接受JMS message(s)
⑨关闭所有的JMS资源(connection, session, producer, consumer等)
Destination简介:
- Destination是目的地。下面拿jvm和mq,做个对比。目的地,我们可以理解为是数据存储的地方。
- Destination分为两种:队列和主题。
①在点对点的消息传递域中,目的地被称为队列(queue)
②在发布订阅消息传递域中,目的地被称为主题(topic)
③下图介绍:
队列消息(Queue)总结:
- 两种消费方式:
①同步阻塞方式(receive):订阅者或接收者抵用MessageConsumer的receive()方法来接收消息,receive方法在能接收到消息之前(或超时之前)将一直阻塞。
②异步非阻塞方式(监听器onMessage()):订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统会自动调用监听器MessageListener的onMessage(Message message)方法。 - 队列的特点:
①每个消息只能有一个消费者,类似1对1的关系。好比个人快递自己领取自己的。
②消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,消费者都可以提取消息。好比我们的发送短信,发送者发送后不见得接收者会即收即看。
③消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息。 - 消息消费情况:
①情况1:只启动消费者1。结果:消费者1会消费所有的数据。
②情况2:先启动消费者1,再启动消费者2。结果:消费者1消费所有的数据。消费者2不会消费到消息。
③情况3:生产者发布6条消息,在此之前已经启动了消费者1和消费者2。结果:消费者1和消费者2平摊了消息。各自消费3条消息。
④疑问:怎么去将消费者1和消费者2不平均分摊呢?而是按照各自的消费能力去消费。我觉得,现在activemq就是这样的机制。
主题消息(Topic)介绍:
- 在发布订阅消息传递域中,目的地被称为主题(topic)
- 发布/订阅消息传递域的特点如下:
①生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系;
②生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息。
③生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。
④默认情况下如上所述,但是JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,好比我们的微信公众号订阅
tpoic和queue对比:
比较项目 | Topic模式队列 | Queue模式队列 |
---|---|---|
工作模式. | “订阅-发布”模式,如果当前没有订阅者,消息将会被丢弃。如果有多个订阅者,那么这些订阅者都会收到消息 | “负载均衡”模式,如果当前没有消费者,消息也不会云弃;如果有多个消费者,那么—条消息也只会发送始其中一个消费者,并且要求消费者ack信息 |
有无状态 | 无状态 | Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ—般保存在SAMQ_HOME\datakr-storeldata下面。也可以配置成DB存储。 |
传递完整性 | 如果没有订阅者,消息会被丢弃 | 消息不会云弃 |
处理效率 | 由于消息要按照订阅者的数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异 | 由于—条消息只发送给—个消费者,所以就算消费者再多,性能也不会有明显降低。当然不同消息协议的具体性能也是有差异的 |
JMS规范与落地
JMS是什么:
- JMS是Java消息服务
- Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。
消息头:
- JMS的消息头有哪些属性:
①JMSDestination:消息目的地
②JMSDeliveryMode:消息持久化模式
③JMSExpiration:消息过期时间
④JMSPriority:消息的优先级
⑤JMSMessageID:消息的唯一标识符。后面我们会介绍如何解决幂等性。 - 说明: 消息的生产者可以set这些属性,消息的消费者可以get这些属性。这些属性在send方法里面也可以设置。
消息体:
- 封装具体的消息数据
- 5种消息体格式:
①TextMessage——普通字符串消息,包含一个string
②MapMessage——一个Map类型的消息,key为string类型,而值为Java的基本类型
③BytesMessage——二进制数组消息,包含一个byte[]
④StreamMessage——Java数据流消息,用标准流操作来顺序的填充和读取。
⑤ObjectMessage——对象消息,包含一个可序列化的Java对象 - 发送和接受的消息体类型必须一致对应
消息属性:
- 如果需要除消息头字段之外的值,那么可以使用消息属性。他是识别/去重/重点标注等操作,非常有用的方法。
- 他们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。消息的属性就像可以分配给一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加信息。它们还用于暴露消息选择器在消息过滤时使用的数据。
- 下图是设置消息属性的API:set对应类型Property(String name,对应类型 value)
JMS的可靠性:
- PERSISTENT:持久性
- Transaction:事务
- Acknowledge:签收
消息的持久化:
- 什么是持久化消息?
①保证消息只被传送一次和成功使用一次。在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。
②我的理解:在消息生产者将消息成功发送给MQ消息中间件之后。无论是出现任何问题,如:MQ服务器宕机、消费者掉线等。都保证(topic要之前注册过,queue不用)消息消费者,能够成功消费消息。如果消息生产者发送消息就失败了,那么消费者也不会消费到该消息。 - 参数设置说明:
①非持久:非持久化:当服务器宕机,消息不存在。
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
②持久:持久化:当服务器宕机,消息依然存在。
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT)
③Queue默认是持久。
- 持久的Queue:持久化消息这是队列的默认传递模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。
- 持久的Topic:一定要先运行一次消费者,类似于像MQ注册,我订阅了这个主题。然后再运行主题生产者,无论消费着是否在线,都会接收到,在线的立即接收到,不在线的等下次上线把没接收到的接收。
类似微信公众号订阅发布。
消息事务:
- producer提交时的事务:
①false:只要执行send,就进入到队列中,关闭事务,那第2个签收参数的设置需要有效。
②true:先执行send再执行commit,消息才被真正提交到队列中,消息需要需要批量提交,需要缓冲处理。 - consumer消费时的事务:
①false:activeMQ默认认为你执行了commit,消费了消息。
②true:只有执行了commit,activeMQ才认为你消费了消息,控制台的消费数才会上升。不执行commit的话,会重复消费消息! 事务偏生产者/签收偏消费者!
消息签收:
- 非事务:
①自动签收(Session.AUTO_ACKNOWLEDGE):该方式是默认的。该种方式,无需我们程序做任何操作,框架会帮我们自动签收收到的消息。
②手动签收(Session.CLIENT_ACKNOWLEDGE):手动签收。该种方式,需要我们手动调用Message.acknowledge(),来签收消息。如果不签收消息,该消息会被我们反复消费,只到被签收。
③允许重复消息(Session.DUPS_OK_ACKNOWLEDGE):多线程或多个消费者同时消费到一个消息,因为线程不安全,可能会重复消费。该种方式很少使用到。
④事务下的签收(Session.SESSION_TRANSACTED):开始事务的情况下,可以使用该方式。该种方式很少使用到。 - 事务:
①由于消费者开启了事务,没有提交事务(就算手动签收也没用),服务器认为,消费者没有收到消息。
②生产事务开启,只有commit后才能将全部消息变为已消费。 - 签收和事务的关系:
①在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚,则消息会被再次传送。事务优先于签收,开始事务后,签收机制不再起任何作用。
②非事务性会话中,消息何时被确认取决于创建会话时的应答模式。
③消费者事务开启,只有commit后才能将全部消息变为已消费。
④事务偏向生产者,签收偏向消费者。也就是说生产者使用事务更好点,消费者使用签收机制更好点。
JMS的点对点总结:
- 点对点模型是基于队列的,生产者发送消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。和我们平时给朋友发送短信类似。
①如果在Session关闭时有部分消息被收到但还没有被签收(acknowledge),那当消费者下次连接到相同的队列时,这些消息还会被再次接收。
②队列可以长久的保存消息直到消费者收到消息。消费者不需要因为担心消息会丢失而时刻和队列保持激活的链接状态,充分体现了异步传输模式的优势
JMS的发布订阅总结:
- 非持久订阅:
①非持久订阅只有当客户端处于激活状态,也就是和MQ保持连接状态才能收发到某个主题的消息。如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。一句话:先订阅注册才能接受到发布,只给订阅者发布消息。 - 持久订阅:
①客户端首先向MQ注册一个自己的身份ID识别号,当这个客户端处于离线时,生产者会为这个ID保存所有发送到主题的消息,当客户再次连接到MQ的时候,会根据消费者的ID得到所有当自己处于离线时发送到主题的消息当持久订阅状态下,不能恢复或重新派送一个未签收的消息。持久订阅才能恢复或重新派送一个未签收的消息。 - 用哪个?
①当所有的消息必须被接收,则用持久订阅。当消息丢失能够被容忍,则用非持久订阅。
ActiveMQ的broker
简介:
- 相当于一个ActiveMQ服务器实例说白了,Broker其实就是实现了用代码的形式启动ActiveMQ将MQ嵌入到Java代码中,以便随时用随时启动,在用的时候再去启动这样能节省了资源,也保证了可用性。
嵌入式Broker:
- POM.XML:
IT知识分享网<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.11</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.15</version>
</dependency>
- 主启动类:
import org.apache.activemq.broker.BrokerService;
public class EmbedBroker {
public static void main(String[] args) throws Exception {
//ActiveMQ也支持在vm中通信基于嵌入的broker
BrokerService brokerService = new BrokerService();
brokerService.setPopulateJMSXUserID(true);
brokerService.addConnector("tcp://127.0.0.1:61616");
brokerService.start();
}
}
- 和Linux上的ActiveMQ是一样的,Broker相当于一个Mini版本的ActiveMQ
Spring,SpringBoot整合ActiveMQ
Spring整合ActiveMQ:
- Maven修改,需要添加Spring支持JMS的包:
IT知识分享网 <!-- activemq核心依赖包 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.10.0</version>
</dependency>
<!-- 嵌入式activemq的broker所需要的依赖包 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.1</version>
</dependency>
<!-- activemq连接池 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.10</version>
</dependency>
<!-- spring支持jms的包 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>5.2.1.RELEASE</version>
</dependency>
<!--spring相关依赖包-->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.15</version>
</dependency>
- Spring配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd">
<!-- 开启包的自动扫描 -->
<context:component-scan base-package="com.activemq.demo"/>
<!-- 配置生产者 -->
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<!-- 正真可以生产Connection的ConnectionFactory,由对应的JMS服务商提供 -->
<bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.10.130:61616"/>
</bean>
</property>
<property name="maxConnections" value="100"/>
</bean>
<!-- 这个是队列目的地,点对点的Queue -->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 通过构造注入Queue名 -->
<constructor-arg index="0" value="spring-active-queue"/>
</bean>
<!-- 这个是队列目的地, 发布订阅的主题Topic-->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-active-topic"/>
</bean>
<!-- Spring提供的JMS工具类,他可以进行消息发送,接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 传入连接工厂 -->
<property name="connectionFactory" ref="connectionFactory"/>
<!-- 传入目的地 -->
<property name="defaultDestination" ref="destinationQueue"/>
<!-- 消息自动转换器 -->
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
</beans>
- 队列(Queue):
---------------生产者------------------
@Service
public class SpringMQ_Producer {
private JmsTemplate jmsTemplate;
@Autowired
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public static void main(String[] args) {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("Application.xml");
SpringMQ_Producer springMQ_producer = applicationContext.getBean(SpringMQ_Producer.class);
springMQ_producer.jmsTemplate.send(
session -> session.createTextMessage("***Spring和ActiveMQ的整合case111....."));
System.out.println("********send task over"); }
}
---------------消费者------------------
@Service
public class SpringMQ_Consumer {
private JmsTemplate jmsTemplate;
@Autowired
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public static void main(String[] args) {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("Application.xml");
SpringMQ_Consumer springMQ_consumer = applicationContext.getBean(SpringMQ_Consumer.class);
String returnValue = (String) springMQ_consumer.jmsTemplate.receiveAndConvert();
System.out.println("****消费者收到的消息: " + returnValue); }
}
- 主题(Topic):
---------------生产者------------------
@Service
public class SpringMQ_Topic_Producer {
private JmsTemplate jmsTemplate;
public SpringMQ_Topic_Producer(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public static void main(String[] args) {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("Application.xml");
SpringMQ_Topic_Producer springMQ_topic_producer = applicationContext.getBean(SpringMQ_Topic_Producer.class);
//直接调用application.xml里面创建的destinationTopic这个bean设置为目的地就行了
springMQ_topic_producer.jmsTemplate.setDefaultDestination(((Destination) applicationContext.getBean("destinationTopic")));
springMQ_topic_producer.jmsTemplate.send(
session -> session.createTextMessage("***Spring和ActiveMQ的整合TopicCase111.....")
);
}
}
---------------消费者------------------
@Service
public class SpringMQ_Topic_Consumer {
private JmsTemplate jmsTemplate;
public SpringMQ_Topic_Consumer(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public static void main(String[] args) {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("Application.xml");
SpringMQ_Topic_Consumer springMQConsumer = applicationContext.getBean(SpringMQ_Topic_Consumer.class);
//直接调用application.xml里面创建的destinationTopic这个bean设置为目的地就行了
springMQConsumer.jmsTemplate.setDefaultDestination(((Destination)
applicationContext.getBean("destinationTopic")));
String returnValue = (String) springMQConsumer.jmsTemplate.receiveAndConvert();
System.out.println("****消费者收到的消息: " + returnValue);
}
}
- 在Spring里面实现消费者不启动,直接通过配置监听完成:
<!--/配置监听程序-->
<bean id="jmscontainer" class="org.springframework.jms.1listener.DefaultlessageListenerContainer">
<property name="connectionFactory" ref="jmsFactory" />
<property name="destination" ref="destinationTopic" />
<!-- public class MyMessageListener implements MessageListener-->
<property name="messageListener" ref="myMessageListener" />
</bean>
//实现MessageListener的类,需要把这个类交给xml配置里面的DefaultMessageListenerContainer管理
@Component
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费者收到的消息" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
SpringBoot整合ActiveMQ:
- POM文件:
<!--spring boot整合activemq的jar包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>2.1.5.RELEASE</version>
</dependency>
- YML文件:
# web占用的端口
server:
port: 7777
spring:
activemq:
# activemq的broker的url
broker-url: tcp://192.168.17.3:61616
# 连接activemq的broker所需的账号和密码
user: admin
password: admin
jms:
# 目的地是queue还是topic, false(默认) = queue true = topic
pub-sub-domain: false
# 自定义队列名称。这只是个常量
myQueueName: springboot-activemq-queue
# 自定义主题名称。这只是个常量
myTopicName: springboot-activemq-topic
- 配置bean:
@Component
@EnableJms
//开启Springboot的Jms
public class ConfigBean {
@Value("myQueueName")
private String myQueueName;
@Bean
public ActiveMQQueue queue() {
//创建一个ActiveMQQueue
return new ActiveMQQueue(myQueueName);
}
@Value("${myTopicName}")
private String topicName;
@Bean
public ActiveMQTopic activeMQTopic() {
//创建一个ActiveMQTopic
return new ActiveMQTopic(topicName);
}
}
- 队列(queue):
-------------生产者-------------
@Component
public class Queue_Produce {
// JMS模板
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate ;
// 这个是我们配置的队列目的地
@Autowired
private Queue queue ;
// 发送消息
public void produceMessage(){
// 一参是目的地,二参是消息的内容
jmsMessagingTemplate.convertAndSend(queue,"****"+ UUID.randomUUID().toString().substring(0,6));
}
// 定时任务。每3秒执行一次。非必须代码,仅为演示。
@Scheduled(fixedDelay = 3000)
public void produceMessageScheduled(){
produceMessage();
}
}
-------------消费者-------------
@Component
public class Queue_consummer {
// 注册一个监听器。destination指定监听的主题。
@JmsListener(destination = "${myqueue}")
public void receive(TextMessage textMessage) throws Exception{
System.out.println(" *** 消费者收到消息 ***"+textMessage.getText());
}
}
- 主题(topic):
-------------生产者-------------
@Component
public class Topic_Produce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate ;
@Autowired
private Topic topic ;
@Scheduled(fixedDelay = 3000)
public void produceTopic(){
jmsMessagingTemplate.convertAndSend(topic,"主题消息"+ UUID.randomUUID().toString().substring(0,6));
}
}
-------------消费者-------------
@Component
public class Topic_Consummer {
@JmsListener(destination = "${mytopic}")
public void receive(TextMessage textMessage) throws Exception{
System.out.println("消费者受到订阅的主题:"+textMessage.getText());
}
}
- 持久化订阅:
-------------配置Bean-------------
/**
* 设置持久化订阅
* 配置文件的方式无法进行配置持久化订阅。所以需要自己去生成一个持久化订阅
*/
@Component
@EnableJms
public class ActiveMQConfigBean {
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
@Value("${spring.activemq.user}")
private String user;
@Value("${spring.activemq.password}")
private String password;
public ConnectionFactory connectionFactory(){
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(brokerUrl);
connectionFactory.setUserName(user);
connectionFactory.setPassword(password);
return connectionFactory;
}
@Bean(name = "jmsListenerContainerFactory")
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory =
new DefaultJmsListenerContainerFactory();
defaultJmsListenerContainerFactory.setConnectionFactory(connectionFactory());
defaultJmsListenerContainerFactory.setSubscriptionDurable(true);
defaultJmsListenerContainerFactory.setClientId("我是持久订阅者一号");
return defaultJmsListenerContainerFactory;
}
}
-------------消费者-------------
@Component
public class Topic_Consumer {
//需要在监听方法指定连接工厂
@JmsListener(destination = "${myTopicName}",
containerFactory = "jmsListenerContainerFactory")
public void consumer(TextMessage textMessage) throws JMSException {
System.out.println("订阅着收到消息: " + textMessage.getText());
}
}
SpringBoot整合ActiveMQ之Queue与Topoic并存:
- application.properties中定义相关配置项:
spring.jms.pub-sub-domain=true
spring.activemq.broker-url=tcp://172.18.1.18:61616
#spring.activemq.user=按实际情况配置
#spring.activemq.password=按实际情况配置
spring.activemq.in-memory=false
spring.activemq.pool.enabled=false
spring.activemq.pool.maxConnections=2
spring.activemq.pool.expiryTimeout=0
spring.activemq.pool.idleTimeout=30000
spring.activemq.packages.trust-all=true
- 定义配置类:
@Configuration
@EnableJms
public class JmsConfiguration {
// topic模式的ListenerContainer
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
// queue模式的ListenerContainer
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
}
- 定义监听器实现:
@Service
public class MQConsumerService {
@JmsListener(destination = "portal.admin.topic",containerFactory = "jmsListenerContainerTopic") // 监听指定消息主题
public void receiveTopic(String message) {
System.out.println(message);
}
@JmsListener(destination = "portal.admin.queue",containerFactory = "jmsListenerContainerQueue") // 监听指定消息主题
public void receiveQueue(String message) {
System.out.println(message);
}
}
ActiveMQ的传输协议
ActiveMQ传输协议简介:
- ActiveMQ支持的client-broker通讯协议有:TVP、NIO、UDP、SSL、Http(s)、VM。
- 其中配置Transport Connector的文件在ActiveMQ安装目录的conf/activemq.xml中的标签之内。见下图实际配置:
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1884?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
- 在上文给出的配置信息中,URI描述信息的头部都是采用协议名称:例如描述
①amqp协议的监听端口时,采用的URI描述格式为“amqp://······”;
②描述Stomp协议的监听端口时,采用URI描述格式为“stomp://······”;
③唯独在进行openwire协议描述时,URI头却采用的“tcp://······”。这是因为ActiveMQ中默认的消息协议就是openwire
ActiveMQ传输协议有哪些:
- Transmission Control Protocol(TCP)默认:
①这是默认的Broker配置,TCP的Client监听端口61616
②在网络传输数据前,必须要先序列化数据,消息是通过一个叫wire protocol的来序列化成字节流。 ③TCP连接的URI形式如:tcp://HostName:port?key=value&key=value,后面的参数是可选的。 ④TCP传输的的优点:
<1>TCP协议传输可靠性高,稳定性强
<2>高效率:字节流方式传递,效率很高
<3>有效性、可用性:应用广泛,支持任何平台
⑤关于Transport协议的可选配置参数可以参考官网 - New I/O API Protocol(NIO):
①NIO协议和TCP协议类似,但NIO更侧重于底层的访问操作。它允许开发人员对同一资源可有更多的client调用和服务器端有更多的负载。
②适合使用NIO协议的场景:
<1>可能有大量的Client去连接到Broker上,一般情况下,大量的Client去连接Broker是被操作系统的线程所限制的。因此,NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议。
<2>可能对于Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能。
③NIO连接的URI形式:nio://hostname:port?key=value&key=value
④关于Transport协议的可选配置参数可以参考官网 - AMQP协议:
①Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
②基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件限制。 - Stomp协议:
①STOMP,Streaming Text Orientation Message Protocol,是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息中间件)设计的简单文本协议。 - Secure Sockets Layer Protocol(SSL):
①安全加密协议。 - MQTT协议:
①MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当作传感器和致动器(比如通过Twitter让房屋联网)的通信协议。 - WS协议(websocket):
①websocket协议。
配置nio协议:
- ActiveMQ这些协议传输的底层默认都是使用BIO网络的IO模型。只有当我们指定使用nio才使用NIO的IO模型。
- 修改配置文件activemq.xml:
①在<transportConnectors>
节点下添加如下内容:
<transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true" />
②修改完成后重启activemq:service activemq restart
③查看管理后台,可以看到页面多了nio - NIO协议增强:
①URI格式以”nio”开头,代表这个端口使用TCP协议为基础的NIO网络模型。但是这样的设置方式,只能使这个端口支持Openwire协议。
②如果我们既需要使用某一个端口支持NIO网络模型,又需要它支持多个协议:
<1>可以使用auto关键字
<2>使用”+”符号来为端口设置多种特性
③配置:<transportConnector name="auto+nio" uri="auto+nio://localhost:5671"/>
ActiveMQ的消息存储和持久化
ActiveMQ的消息持久化简介:
- 为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一半都会采用持久化机制。ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。
- 就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。再试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试尝试发送。消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。
- 一句话:ActiveMQ宕机了,消息不会丢失的机制。
ActiveMQ的消息持久化有哪些:
- AMQ Mesage Store(了解):
①AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储再一个个文件中文件的默认大小为32M,当一个文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本。
②基于文件的存储方式,是以前的默认消息存储,现在不用了。 - KahaDB消息存储(默认):
①基于日志文件,从ActiveMQ5.4开始默认的持久化插件。
②KahaDB是目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模型进行了优化。数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。 - JDBC消息存储:使用JDBC。
- LevelDB消息存储(了解):
①这种文件系统是从ActiveMQ5.8之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库存储形式,但是它提供比KahaDB更快的持久性。
②但它不使用自定义B-Tree实现来索引独写日志,而是使用基于LevelDB的索引。 - JDBC Message Store with ActiveMQ Journal:JDBC加强版。
KahaDB的存储原理:
- KahaDB在消息保存的目录中有4类文件和一个lock,跟ActiveMQ的其他几种文件存储引擎相比,这就非常简洁了。
①db-number.log:
KahaDB存储消息到预定大小的数据纪录文件中,文件名为db-number.log。当数据文件已满时,一个新的文件会随之创建,number数值也会随之递增,它随着消息数量的增多,如没32M一个文件,文件名按照数字进行编号,如db-1.log,db-2.log······。当不再有引用到数据文件中的任何消息时,文件会被删除或者归档。
②db.data:
该文件包含了持久化的BTree索引,索引了消息数据记录中的消息,它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-number。log里面存储消息。
③db.free:
当问当前db.data文件里面哪些页面是空闲的,文件具体内容是所有空闲页的ID
④db.redo:
用来进行消息恢复,如果KahaDB消息存储再强制退出后启动,用于恢复BTree索引。
⑤lock:
文件锁,表示当前kahadb独写权限的broker。
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
JDBC存储消息:
- 添加mysql数据库的驱动包到lib文件夹
- jdbcPersistenceAdapter配置:
①dataSource指定将要引用的持久化数据库的bean名称。
②createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这样每次启动都会去创建数据表了,一股是第一次启动的时候设置为true之后改成false。
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTableOnStartup="true"/>
</persistenceAdapter>
- 数据库连接池配置:
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"I>
<property name="url" value="jdbc:mysql/l自己的数据库IP:3306/activemq?relaxAutoCommit=true">
<property name="username" value="自己的数据库用户名"/>
<property name="password" value="自己的数据库密码""/><property name="maxTotal" value="200""/>
<property name="poolPreparedStatements" value="true"/>
</bean>
- 建库SQL和创表说明:
①建一个名为activemq的数据库
②如果新建数据库ok,上述配置ok,代码运行ok,3张表会自动生成
③如果表没生成,可能需要自己创建
②三张表的说明:
<1>ACTIVEMQ_MSGS
<2>ACTIVEMQ_ACKS
<3>ACTIVEMQ_LOCK
-------------ACTIVEMQ_MSGS-------------
说明:
消息表,缺省表名为ACTIVEMQ MSGS,queue和topic都存在里面,结构如下
数据库字段如下:
ID:自增的数据库主键
CONTAINER:消息的DestinationMSGID_PROD:消息发送者的主键
MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessagelDEXPIRATION:消息的过期时间,存储的是从197O-01-01到现在的毫秒数
MSG:消息本体的Java序列化对象的二进制数据
PRIORITY:优先级,从O-9,数值越大优先级越高
-------------ACTIVEMQ_ACKS-------------
说明:
activemq_acks用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存。
ACTIVEMQ_ACKS表存储持久订阅的信息和最后一个持久订阅接收的消息ID。
数据库字段如下:
CONTAINER:消息的Destination
SUB_DEST:如果是使用Static集群,这个字段会有集群其他系统的信息CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用以区分
SUB_NAME:订阅者名称
SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作LAST_ACKED_ID:记录消费过的消息的ID。
-------------ACTIVEMQ_LOCK-------------
说明:
表activemg_lock在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,其他的只能作为备份等
待MasterBroker不可用,才可能成为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker。
- 验证总结:
①点对点:在点对点类型中当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中当DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中。而且点对点类型中消息一旦被Consumer消费,就从数据中删除。消费前的消息会被存放到数据库,上面的消息被消费后被MQ自动删除。
②发布/订阅:设置了持久订阅数据库里面会保存订阅者的信息,消费者消费所有的数据后。ACTIVEMQ_MSGS数据表的数据并没有消失。持久化topic的消息不管是否被消费,是否有消费者,产生的数据永远都存在,且只存储一条。这个是要注意的,持久化的topic大量数据后可能导致性能下降。这里就像公总号一样,消费者消费完后,消息还会保留。 - 小总结:
①如果是queue在没有消费者消费的情况下会将消息保存到activemq_msgs表中,只要有任意一个消费者消费了,就会删除。
②消费过的消息如果是topic,一般是先启动消费订阅者然后再生产的情况下会将持久订阅者永久保存到qctivemq_acks,而消息则永久保存在activemq_msgs,在acks表中的订阅者有一个last_ack_id对应了activemq_msgs中的id字段,这样就知道订阅者最后收到的消息是哪一条。 - 注意:
①在配置关系型数据库作为ActiveMQ的持久化存储方案时,有坑 数据库jar包注意把对应版本的数据库jar或者你自己使用的非自带的数据库连接池jar包
②createTablesOnStartup属性默认为true,每次启动activemq都会自动创建表,在第一次启动后,应改为false,避免不必要的损失。
③java.lang.IllegalStateException: LifecycleProcessor not initialized确认计算机主机名名称没有下划线
JDBC Message store with ActiveMQ Journal:
- 说明:
①这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库读库。ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。
②举个例子:生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。
③为了高性能,这种方式使用日志文件存储+数据库存储。先将消息持久到日志文件,等待一段时间再将未消费的消息持久到数据库。该方式要比JDBC性能要高。 - 配置:
<persistenceFactory>
<journalPersistenceAdapterFactory journalLogFiles="4” journalLogFileSize="32768" useJournal="true" useQuickJournal="true" dataSource="#mysql-ds" dataDirectory="activemq-data"/>
</persistenceFactory>
- 总结:
以前是实时写入mysql,在使用了journal后,数据会被journal处理,如果在一定时间内journal处理(消费)完了,就不写入mysql,如果没消费完,就写入mysql,起到一个缓存的作用
总结:
- jdbc效率低,kahaDB效率高,jdbc+Journal效率较高。
- 持久化消息主要指的是:MQ所在服务器宕机了消息不会丢试的机制。
- 持久化机制演变的过程:从最初的AMQ Message Store方案到ActiveMQ V4版本退出的High Performance Journal(高性能事务支持)附件,并且同步推出了关于关系型数据库的存储方案。ActiveMQ5.3版本又推出了对KahaDB的支持(5.4版本后被作为默认的持久化方案),后来ActiveMQ 5.8版本开始支持LevelDB,到现在5.9提供了标准的Zookeeper+LevelDB集群化方案。
- ActiveMQ消息持久化机制有:
方案 | 原理 |
---|---|
AMQ | 基于日志文件 |
KahaDB | 基于日志文件,从ActiveMQ5.4开始默认使用 |
JDBC | 基于第三方数据库 |
Replicated LevelDB Store | 从5.9开始提供了LevelDB和Zookeeper的数据复制方法,用于Master-slave方式的首选数据复制方案。 |
ActiveMQ多节点集群
简介:
- 基于zookeeper和LevelDB搭建ActiveMQ集群。集群仅提供主备方式的高可用集群功能,避免单点故障。
引入消息队列之后该如何保证其高可用性。
三种集群方式对比:
- 基于shareFileSystem共享文件系统(KahaDB)
- 基于JDBC
- 基于可复制的LevelDB
官网集群原理图:
- 使用Zookeeper集群注册所有的ActiveMQBroker但只有其中一个Broker可以提供服务,它将被视为Master,其他的Broker处于待机状态被视为Slave。如果Master因故障而不能提供服务,Zookeeper会从Slave中选举出一个Broker充当Master。Slave连接Master并同步他们的存储状态,Slave不接受客户端连接。所有的存储操作都将被复制到连接至Maste的Slaves。如果Master宕机得到了最新更新的Slave会变成Master。故障节点在恢复后会重新加入到集群中并连接Master进入Slave模式。所有需要同步的消息操作都将等待存储状态被复制到其他法定节点的操作完成才能完成。
- 所以,如给你配置了replicas=3,name法定大小是(3/2)+1 =2。Master将会存储更新然后等待(2-1)=1个Slave存储和更新完成,才汇报success,至于为什么是2-1,阳哥的zookeeper讲解过自行复习。有一个ode要作为观察者存在。当一个新的Master被选中,你需要至少保障一个法定mode在线以能够找到拥有最新状态的ode,这个ode才可以成为新的Master。因此,推荐运行至少3个replica nodes以防止一个node失败后服务中断。
zookeeper+replicated-leveldb-store的主从集群简介:
ActiveMQ高级特性
异步投递Async Sends简介:
- 对于一个Slow Consumer,使用同步发送消息可能出现Producer堵塞的情况,慢消费者适合使用异步发送。
- 同步发送与异步发送详解:
①ActiveMQ支持同步,异步两种发送的模式将消息发送到broker,模式的选择对发送延时有巨大的影响。producer能达到怎么样的产出率(产出率=发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著提高发送的性能。
②ActiveMQ默认使用异步发送的模式:除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化的消息,这两种情况都是同步发送的。
③如果你没有使用事务且发送的是持久化的消息,每一次发送都是同步发送的且会阻塞producer知道broker返回一个确认,表示消息已经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端带来了很大的延时。
④很多高性能的应用,允许在失败的情况下有少量的数据丢失。如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化的消息。
⑤异步发送它可以最大化producer端的发送效率。我们通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能;不过这也带来了额外的问题,就是需要消耗更多的Client端内存同时也会导致broker端性能消耗增加;此外它不能有效的确保消息的发送成功。在userAsyncSend=true的情况下客户端需要容忍消息丢失的可能。 - 异步发送配置:
public class Jms_TX_Producer {
// 方式1。3种方式任选一种
private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626?jms.useAsyncSend=true";
private static final String ACTIVEMQ_QUEUE_NAME = "Async";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 方式2
activeMQConnectionFactory.setUseAsyncSend(true);
Connection connection = activeMQConnectionFactory.createConnection();
// 方式3
((ActiveMQConnection)connection).setUseAsyncSend(true);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
try {
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("tx msg--" + i);
producer.send(textMessage);
}
System.out.println("消息发送完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
session.close();
connection.close();
}
}
}
- 异步消息如何确定发送成功?
①异步发送丢失消息的场景是:生产者设置userAsyncSend=true,使用producer.send(msg)持续发送消息。如果消息不阻塞,生产者会认为所有send的消息均被成功发送至MQ。如果MQ突然宕机,此时生产者端内存中尚未被发送至MQ的消息都会丢失。
②所以正确的异步发送方法是需要接收回调的。同步发送和异步发送的区别就在此,同步发送等send不阻塞了就表示一定发送成功了,异步发送需要客户端回执并由客户端再判断一次是否发送成功。
public class Jms_TX_Producer {
private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
private static final String ACTIVEMQ_QUEUE_NAME = "Async";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
activeMQConnectionFactory.setUseAsyncSend(true);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer)session.createProducer(queue);
try {
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("tx msg--" + i);
textMessage.setJMSMessageID(UUID.randomUUID().toString()+"orderAtguigu");
final String msgId = textMessage.getJMSMessageID();
activeMQMessageProducer.send(textMessage, new AsyncCallback() {
public void onSuccess() {
System.out.println("成功发送消息Id:"+msgId);
}
public void onException(JMSException e) {
System.out.println("失败发送消息Id:"+msgId);
}
});
}
System.out.println("消息发送完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
activeMQMessageProducer.close();
session.close();
connection.close();
}
}
}
延迟投递和定时投递简介:
- 四大属性:
Property name | type | description |
---|---|---|
AMQ_SCHEDULED_DELAY | long | 延迟投递的时间 |
AMQ_SCHEDULED_PERIOD | long | 重复投递的时间间隔 |
AMQ_SCHEDULED_REPEAT | int | 重复投递次数 |
AMQ_SCHEDULED_CRON | string | Cron表达式 |
- 配置:要在activemq.xml中配置schedulerSupport属性为true
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true" >
- Java代码里面封装的辅助消息类型:ScheduledMessage
public class Jms_TX_Producer {
private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
private static final String ACTIVEMQ_QUEUE_NAME = "Schedule01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageProducer messageProducer = session.createProducer(queue);
long delay = 10*1000;
long period = 5*1000;
int repeat = 3 ;
try {
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("tx msg--" + i);
// 延迟的时间
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
// 重复投递的时间间隔
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
// 重复投递的次数
textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
// 此处的意思:该条消息,等待10秒,之后每5秒发送一次,重复发送3次。
messageProducer.send(textMessage);
}
System.out.println("消息发送完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
messageProducer.close();
session.close();
connection.close();
}
}
}
消息消费的重试机制:
- 是什么?
①消费者收到消息,之后出现异常了,没有告诉broker确认收到该消息,broker会尝试再将该消息发送给消费者。尝试n次,如果消费者还是没有确认收到该消息,那么该消息将被放到死信队列重,之后broker不会再将该消息发送给消费者。
- 具体哪些情况会引发消息重发?
①Client用了transactions且再session中调用了rollback
②Client用了transactions且再调用commit之前关闭或者没有commit
③Client再CLIENT_ACKNOWLEDGE的传递模式下,session中调用了recover - 请说说消息重发时间间隔和重发次数?
①间隔:1
②次数:6 - 有毒消息Poison ACK:
①一个消息被redelivedred超过默认的最大重发次数(默认6次)时,消费的回个MQ发一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(私信队列)。 - 属性说明:
- 修改配置参数:
public class Jms_TX_Consumer {
private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
private static final String ACTIVEMQ_QUEUE_NAME = "dead01";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 修改默认参数,设置消息消费重试3次
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(3);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("***消费者接收到的消息: " + textMessage.getText());
//session.commit();
}catch (Exception e){
e.printStackTrace();
}
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
死信队列:
- 简介:
①异常消息规避处理的集合,主要处理失败的消息。
②ActiveMQ中引入了“死倍队列”(Dead Letter Queue〉的概念。即一条消息再被重发了多次后(默认为重发6次redeliveryCounter==6),将会被ActiveMQ移入“死信队列”。开发人员可以在这个Queue中查看处理出错的消息,进行人工干预。 - 死信队列控制台:
- 使用:
死信队列的配置(一般采用默认):
①sharedDeadLetterStrategy:
<1>不管是queue还是topic,失败的消息都放到这个队列中。下面修改activemq.xml的配置,可以达到修改队列的名字。
<2>将所有的eadLetter保存在一个共享的队列中,这是ActiveMQ broker端默认的策略。共享队列默认为“ActiveMQ.DLQ”,可以通过“deadLetterQueue”属性来设定。
②individualDeadLetterStrategy:
<1>可以为queue和topic单独指定两个死信队列。还可以为某个话题,单独指定一个死信队列。
③自动删除过期消息:
<1>过期消息是值生产者指定的过期时间,超过这个时间的消息。
<2>有时需要直接删除过期的消息而不需要发送到死队列中,“processExpired”表示是否将过期消息放入死信队列,默认为true。
④存放非持久消息到死信队列中:
<1>默认情况下,Activemq不会把非持久的死消息发送到死信队列中。
<2>processNonPersistent”表示是否将“非持久化”消息放入死信队列,默认为false。
<3>非持久性如果你想把非持久的消息发送到死队列中,需要设置属性processNonPersistent=“true”
消息不被重复消费,幂等性:
- 网络延迟传输中,会造成进行MQ重试中,在重试过程中,可能会造成重复消费。
- 如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
- 如果上面两种情况还不行,准备一个第三服务方来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。
幂等性如何解决,根据messageid去查这个消息是否被消费了。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/7096.html