大家好,欢迎来到IT知识分享网。
SpringBoot集成ActiveMQ
2017年01月07日 17:32:58 帅天下 阅读数:11466 标签: ActiveMQ消息中间件jmsspringboot更多
个人分类: ActiveMQ
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/songhaifengshuaige/article/details/54176520
在实际项目中,很多时候需要消息中间件来进行分布式系统间的通信。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能。
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。本章主要介绍ActiveMQ 相关概念以及应用(springboot集成实现、消息的广播以及消息持久化配置)。具体每个功能实现将在后续的专栏里面实践整理。
本章概要
1、概念
2、应用
2.1、springboot与activemq集成&消息广播
2.2、使用外部ActiveMQ服务
2.3、消息持久化设置
2.4、生产环境密码设置
概念
1、消息机制主要有三种
1.1、点对点(p2p):包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
P2P的特点
- 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
- 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
- 接收者在成功接收消息之后需向队列应答成功
如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式。
1.2、订阅/发布:包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
Pub/Sub的特点
- 每个消息可以有多个消费者
- 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
- 为了消费消息,订阅者必须保持运行的状态。
为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。
1.3、应答模式:和前面两种方式比较起来,request-response的通信方式很常见,但是不是默认提供的一种模式。在前面的两种模式中都是一方负责发送消息而另外一方负责处理。而我们实际中的很多应用相当于一种一应一答的过程,需要双方都能给对方发送消息。于是请求-应答的这种通信方式也很重要。它也应用的很普遍。 请求-应答方式并不是JMS规范系统默认提供的一种通信方式,而是通过在现有通信方式的基础上稍微运用一点技巧实现的。
2、JMS消息基本组件
2.1、ConnectionFactory
创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。
2.2、Destination
Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。
所以,Destination实际上就是两种类型的对象:Queue、Topic可以通过JNDI来查找Destination。
2.3、Connection
Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。
2.4、Session
Session是操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。
2.5、消息的生产者
消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。
2.6、消息消费者
消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。
2.7、MessageListener
消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。
应用
在实际项目中,通过生产者和消费者会是两个独立的应用工程, 也正是如此通过消息队列实现了解耦和广播,考虑到仅仅案例使用,本案例将两者放置在一个工程应用。
1、springboot与activemq集成&消息广播
1.1、添加依赖:
<!– activeMq support –>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!– activeMq end –>
1.2、首先我们定义点对点消息队列和一个主题:
package com.shf.springboot.activeMQ.config;
import javax.jms.Queue;
import javax.jms.Topic;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
/**
* 定义队列||主题
* @author song
*
*/
@Configuration
@EnableJms
public class ActiveMQConfiguration {
/**
* 定义点对点队列
* @return
*/
@Bean
public Queue queue() {
return new ActiveMQQueue(“sample.queue”);
}
/**
* 定义一个主题
* @return
*/
@Bean
public Topic topic() {
return new ActiveMQTopic(“sample.topic”);
}
}
1.3、定义一个生产者,定时5S中生产发出一次数据,通过springboot提供的JmsMessagingTemplate实现send动作,并生产p2p与topic主题消息各一个:
package com.shf.springboot.activeMQ.producer;
import javax.jms.Queue;
import javax.jms.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.Scheduled;
/**
* 定义生产者
* @author song
*
*/
@Configuration
public class Producer {
private static final Logger logger=LoggerFactory.getLogger(Producer.class);
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
@Autowired
private Topic topic;
/**
* 每5S执行一次
*/
@Scheduled(fixedRate=5000,initialDelay=3000)
public void send() {
//发送队列消息
this.jmsMessagingTemplate.convertAndSend(this.queue, “生产者辛苦生产的点对点消息成果”);
System.out.println(“生产者:辛苦生产的点对点消息成果”);
//发送订阅消息
this.jmsMessagingTemplate.convertAndSend(this.topic, “生产者辛苦生产的订阅/发布消息成果”);
System.out.println(“生产者:辛苦生产的订阅/发布消息成果”);
}
}
1.4、定义一个消费者,能够3个消费者:
package com.shf.springboot.activeMQ.consumer;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* 定义消费者
* @author song
*
*/
@Component
public class Consumer {
@JmsListener(destination = “sample.queue”)
public void receiveQueue(String text) {
System.out.println(“消费者:来源于生产者的消息:”+text);
}
@JmsListener(destination = “sample.topic”)
public void receiveSub1(String text) {
System.out.println(“消费者:Consumer1=”+text);
}
@JmsListener(destination = “sample.topic”)
public void receiveSub2(String text) {
System.out.println(“消费者:Consumer2=”+text);
}
}
小结:
@JmsListener注解能够定义对消息主题的监听,并通过destination参数决定监听来源。
1.5、启动服务,查看控制台:
丢了些什么,消费者并没有消费订阅发布类型的消息,这是由于springboot默认采用的是p2p模式进行消息的监听。
1.6、在application.propreties添加如下配置,对订阅发布类型消息监听:
#default point to point
spring.jms.pub-sub-domain=true
1.7、查看控制台:
此时消费者没有消费点对点类型消息,目前默认仅支持接受一个类型的消息。如何将两者兼容消费将在后续的实践中尝试。
特别注意:我们通过添加spring-boot-starter-activemq依赖即可默认采用内嵌的activeMQ,在生产环境下个人认为尽量还是采用外部服务,提高扩展性和维护性。
2、使用外部ActiveMQ服务
2.1、首先至官网下载最新版本http://activemq.apache.org/activemq-5142-release.html:
2.2、启动服务:
目前是win64系统,故直接在D:\apache-activemq-5.14.2\bin\win64目录下启动activemq.bat
2.3、通过如下地址进入控制台查看服务运行情况:
2.4、通过如下方式修改控制台端口&TCP连接端口:
2.4.1、调整连接端口,找到如下文件
修改<transportConnectors>节点下内容即可,本次我们修改为61626,故同步调整application.properties中地址spring.activemq.broker-url=tcp://localhost:61626;
2.4.2、调整监控系统端口,找到jetty.xml文件,修改如下节点:
2.4.3、调整监控登录用户名密码,找到jetty-realm.properties文件按照如下格式修改即可:
2.5、通过请求管理控制台,可以看到目前我们队列相关信息:
由于目前配置的是发布订阅消费模式,故topic下数据消费正常,而p2p模式数据并没有被消费,而p2p能够保证数据一定被消费且仅为一次,故我们切换消费模式,仅消费P2P消息,查看控制效果:
以上不全,在来看activeMQ的控制台:
已经完全被消费。
小结:我们有多个消费者情况下,通过P2P模式保证了每个消息仅被消费一次,在下游消费系统能够消费的时候进行逻辑处理,那么就很容易实现错峰流控、可靠传递。
3、消息持久化设置
3.1、activeMQ的消息持久化可以有多种方式,系统默认采用的是kahaDB方式,该方式属于本地文件存储,同时支持事务。但实际使用过程中,我们可能更加习惯采用关系型数据库的方式存储,下面我们一mysql为例。
3.2、首先调整activemq.xml文件中的配置:
<persistenceAdapter>
<!– default local file model –>
<!–<kahaDB directory=”${activemq.data}/kahadb”/>–>
<!– createTablesOnStartup:default value is true; if true then every time start will create table;so first time we set it be true;others be false –>
<jdbcPersistenceAdapter dataDirectory=”${activemq.base}/data” dataSource=”#activemq-ds” createTablesOnStartup=”false” />
</persistenceAdapter>
<!– add DruidDataSource pool –>
<bean id=”activemq-ds” class=”com.alibaba.druid.pool.DruidDataSource” destroy-method=”close”>
<property name=”driverClassName” value=”com.mysql.jdbc.Driver”/>
<property name=”url” value=”jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true”/>
<property name=”username” value=”root”/>
<property name=”password” value=”root”/>
<property name=”maxActive” value=”200″/>
<property name=”poolPreparedStatements” value=”true”/>
</bean>
特别注意:
3.2.1、createTablesOnStartup:在首次配置启动服务时,我们需要配置为true,启动过程中将默认帮我们创建数据库表;启动后我们将其改为false,后续启动更不会重新创建。
3.2.2、本次我们采用的是mysql数据库和Druid数据库连接池,我们需要在apache-activemq-5.14.2\lib目录下添加如下jar包。
3.3、启动服务,我们将在数据库中看到新建的相关表:
3.4、下面我们启动服务,查询activemq_msgs消息记录表,主要分以下几个场景:
3.4.1:启动服务测试P2P类型消息,并不启用消费者,此时所有的消息将被记录在activemq_msgs表中;
3.4.2:启动服务继续测试P2P类型消息,本次启用消费者,此时消费者会一次性接收到之前保存在activemq_msgs表中的消息,查询activemq_msgs表发现没有数据。说明在生产者生产数据存入activemq_msgs中后,消费者一旦消费则activemq_msgs中数据则删除,从而保证了仅会被消费一次,同时也保证了可靠性。
3.4.3:启动服务测试发布订阅类型消息,首先发现无论是否有开启状态的订阅者,数据库并没有此类型消息。通过对原生代码的练习发现,需要有持久化订阅者后数据库将会保存发布订阅类型消息(否则无),且能够保证一定被每个消费者消费。消费后将在activemq_acks查看到被消费的情况。普通订阅者仅仅在线时才能消费消息。
4、生产环境密码设置
4.1、首先我们在activemq.xml找到<systemUsage>,在其上方变价如下插件
<!– set connection user authentication–>
<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username=”${activemq.username}” password=”${activemq.password}” groups=”users,admins”/>
</users>
</simpleAuthenticationPlugin>
</plugins>
4.2、打开**\apache-activemq-5.14.2\conf中credentials.properties文件进行密码设置
activemq.username=admin
activemq.password=123456
guest.password=123456
4.3、在springboot工程中仅需要在application.properties文件中添加一下配置即可
spring.activemq.user=admin
spring.activemq.password=123456
待实现
1、如何同时实现发布订阅与点对点类型消息并同时被消费;
2、实现应答模式消息处理;
3、分布式事务的实现(MQ本地事务、MQ与MySQL单应用分布式事务、多应用分布式部署下最终一致性事务);
参考资料
1、ActiveMQ的使用与遇到的相关坑:SpringBoot集成ActiveMQ
3、消息队列设计精要:SpringBoot集成ActiveMQ
附录
activemq_msgs用于存储消息,Queue和Topic都存储在这个表中:
ID:自增的数据库主键
Container:消息的Destination
MSGID_PROD:消息发送者客户端的主键
MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数
MSG:消息本体的Java序列化对象的二进制数据
PRIORITY:优先级,从0-9,数值越大优先级越高
activemq_acks用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存:
主要的数据库字段如下:
CONTAINER:消息的Destination
SUB_DEST:如果是使用Static集群,这个字段会有集群其他系统的信息
CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用以区分
SUB_NAME:订阅者名称
SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作
LAST_ACKED_ID:记录消费过的消息的ID。
activemq_lock在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,
其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。
这个表用于记录哪个Broker是当前的Master Broker。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/7110.html