SpringBoot集成ActiveMQ[通俗易懂]

SpringBoot集成ActiveMQ[通俗易懂]SpringBoot集成ActiveMQ2017年01月07日17:32:58 帅天下 阅读数:11466 标签: ActiveMQ消息中间件jmsspringboot更多个人分类: ActiveMQ版权声明:本文为博主原创文章,未经博主允许不得转载。https://blog.csdn.net/songhaifengshuaige/article/details/54176520…

大家好,欢迎来到IT知识分享网。

SpringBoot集成ActiveMQ

2017年01月07日 17:32:58 帅天下 阅读数:11466 标签: ActiveMQ消息中间件jmsspringboot更多

个人分类: ActiveMQ

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/songhaifengshuaige/article/details/54176520

在实际项目中,很多时候需要消息中间件来进行分布式系统间的通信。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能。

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。本章主要介绍ActiveMQ 相关概念以及应用(springboot集成实现、消息的广播以及消息持久化配置)。具体每个功能实现将在后续的专栏里面实践整理。

 

本章概要

1、概念

2、应用

2.1、springboot与activemq集成&消息广播

2.2、使用外部ActiveMQ服务

2.3、消息持久化设置

2.4、生产环境密码设置

 

概念


1、消息机制主要有三种

1.1、点对点(p2p):包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

P2P的特点

  • 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
  • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
  • 接收者在成功接收消息之后需向队列应答成功

如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式。

SpringBoot集成ActiveMQ[通俗易懂]

SpringBoot集成ActiveMQ[通俗易懂]

1.2、订阅/发布:包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

Pub/Sub的特点

  • 每个消息可以有多个消费者
  • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
  • 为了消费消息,订阅者必须保持运行的状态。

为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。

SpringBoot集成ActiveMQ[通俗易懂]

SpringBoot集成ActiveMQ[通俗易懂]

1.3、应答模式:和前面两种方式比较起来,request-response的通信方式很常见,但是不是默认提供的一种模式。在前面的两种模式中都是一方负责发送消息而另外一方负责处理。而我们实际中的很多应用相当于一种一应一答的过程,需要双方都能给对方发送消息。于是请求-应答的这种通信方式也很重要。它也应用的很普遍。      请求-应答方式并不是JMS规范系统默认提供的一种通信方式,而是通过在现有通信方式的基础上稍微运用一点技巧实现的。

SpringBoot集成ActiveMQ[通俗易懂]

SpringBoot集成ActiveMQ[通俗易懂]

2、JMS消息基本组件

2.1、ConnectionFactory

创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。

2.2、Destination

Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。

所以,Destination实际上就是两种类型的对象:Queue、Topic可以通过JNDI来查找Destination。

2.3、Connection

Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。

2.4、Session

Session是操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。

2.5、消息的生产者

消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。

2.6、消息消费者

消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。

2.7、MessageListener

消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。

 

应用


在实际项目中,通过生产者和消费者会是两个独立的应用工程, 也正是如此通过消息队列实现了解耦和广播,考虑到仅仅案例使用,本案例将两者放置在一个工程应用。

1、springboot与activemq集成&消息广播

1.1、添加依赖:

<!– activeMq support –>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-activemq</artifactId>

</dependency>

<!– activeMq end –>

1.2、首先我们定义点对点消息队列和一个主题:

package com.shf.springboot.activeMQ.config;

 

import javax.jms.Queue;

import javax.jms.Topic;

 

import org.apache.activemq.command.ActiveMQQueue;

import org.apache.activemq.command.ActiveMQTopic;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.jms.annotation.EnableJms;

 

/**

* 定义队列||主题

* @author song

*

*/

@Configuration

@EnableJms

public class ActiveMQConfiguration {

 

/**

* 定义点对点队列

* @return

*/

@Bean

public Queue queue() {

return new ActiveMQQueue(“sample.queue”);

}

/**

* 定义一个主题

* @return

*/

@Bean

public Topic topic() {

return new ActiveMQTopic(“sample.topic”);

}

}

1.3、定义一个生产者,定时5S中生产发出一次数据,通过springboot提供的JmsMessagingTemplate实现send动作,并生产p2p与topic主题消息各一个:

package com.shf.springboot.activeMQ.producer;

 

import javax.jms.Queue;

import javax.jms.Topic;

 

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Configuration;

import org.springframework.jms.core.JmsMessagingTemplate;

import org.springframework.jms.core.JmsTemplate;

import org.springframework.scheduling.annotation.Scheduled;

 

/**

* 定义生产者

* @author song

*

*/

@Configuration

public class Producer {

private static final Logger logger=LoggerFactory.getLogger(Producer.class);

@Autowired

private JmsMessagingTemplate jmsMessagingTemplate;

 

@Autowired

private Queue queue;

 

@Autowired

private Topic topic;

/**

* 每5S执行一次

*/

@Scheduled(fixedRate=5000,initialDelay=3000)

public void send() {

//发送队列消息

this.jmsMessagingTemplate.convertAndSend(this.queue, “生产者辛苦生产的点对点消息成果”);

System.out.println(“生产者:辛苦生产的点对点消息成果”);

//发送订阅消息

this.jmsMessagingTemplate.convertAndSend(this.topic, “生产者辛苦生产的订阅/发布消息成果”);

System.out.println(“生产者:辛苦生产的订阅/发布消息成果”);

}

}

1.4、定义一个消费者,能够3个消费者:

package com.shf.springboot.activeMQ.consumer;

 

import org.springframework.jms.annotation.JmsListener;

import org.springframework.stereotype.Component;

 

/**

* 定义消费者

* @author song

*

*/

@Component

public class Consumer {

 

@JmsListener(destination = “sample.queue”)

public void receiveQueue(String text) {

System.out.println(“消费者:来源于生产者的消息:”+text);

}

 

@JmsListener(destination = “sample.topic”)

public void receiveSub1(String text) {

System.out.println(“消费者:Consumer1=”+text);

}

@JmsListener(destination = “sample.topic”)

public void receiveSub2(String text) {

System.out.println(“消费者:Consumer2=”+text);

}

}

小结:

@JmsListener注解能够定义对消息主题的监听,并通过destination参数决定监听来源。

 

1.5、启动服务,查看控制台:

SpringBoot集成ActiveMQ[通俗易懂]

SpringBoot集成ActiveMQ[通俗易懂]

SpringBoot集成ActiveMQ[通俗易懂]

SpringBoot集成ActiveMQ[通俗易懂]

丢了些什么,消费者并没有消费订阅发布类型的消息,这是由于springboot默认采用的是p2p模式进行消息的监听。

 

1.6、在application.propreties添加如下配置,对订阅发布类型消息监听:

#default point to point

spring.jms.pub-sub-domain=true

 

1.7、查看控制台:

SpringBoot集成ActiveMQ[通俗易懂]

SpringBoot集成ActiveMQ[通俗易懂]

此时消费者没有消费点对点类型消息,目前默认仅支持接受一个类型的消息。如何将两者兼容消费将在后续的实践中尝试。

 

特别注意:我们通过添加spring-boot-starter-activemq依赖即可默认采用内嵌的activeMQ,在生产环境下个人认为尽量还是采用外部服务,提高扩展性和维护性。

 

2、使用外部ActiveMQ服务

2.1、首先至官网下载最新版本http://activemq.apache.org/activemq-5142-release.html

SpringBoot集成ActiveMQ[通俗易懂]

SpringBoot集成ActiveMQ[通俗易懂]

2.2、启动服务:

目前是win64系统,故直接在D:\apache-activemq-5.14.2\bin\win64目录下启动activemq.bat

2.3、通过如下地址进入控制台查看服务运行情况:

SpringBoot集成ActiveMQ[通俗易懂]

SpringBoot集成ActiveMQ[通俗易懂]

2.4、通过如下方式修改控制台端口&TCP连接端口:

2.4.1、调整连接端口,找到如下文件

SpringBoot集成ActiveMQ[通俗易懂]

SpringBoot集成ActiveMQ[通俗易懂]

修改<transportConnectors>节点下内容即可,本次我们修改为61626,故同步调整application.properties中地址spring.activemq.broker-url=tcp://localhost:61626;

2.4.2、调整监控系统端口,找到jetty.xml文件,修改如下节点:

SpringBoot集成ActiveMQ[通俗易懂]SpringBoot集成ActiveMQ[通俗易懂]

2.4.3、调整监控登录用户名密码,找到jetty-realm.properties文件按照如下格式修改即可:

SpringBoot集成ActiveMQ[通俗易懂]

SpringBoot集成ActiveMQ[通俗易懂]

2.5、通过请求管理控制台,可以看到目前我们队列相关信息:

SpringBoot集成ActiveMQ[通俗易懂]

SpringBoot集成ActiveMQ[通俗易懂]

SpringBoot集成ActiveMQ[通俗易懂]

SpringBoot集成ActiveMQ[通俗易懂]

由于目前配置的是发布订阅消费模式,故topic下数据消费正常,而p2p模式数据并没有被消费,而p2p能够保证数据一定被消费且仅为一次,故我们切换消费模式,仅消费P2P消息,查看控制效果:

SpringBoot集成ActiveMQ[通俗易懂]

SpringBoot集成ActiveMQ[通俗易懂]

以上不全,在来看activeMQ的控制台:

SpringBoot集成ActiveMQ[通俗易懂]

SpringBoot集成ActiveMQ[通俗易懂]

已经完全被消费。

小结:我们有多个消费者情况下,通过P2P模式保证了每个消息仅被消费一次,在下游消费系统能够消费的时候进行逻辑处理,那么就很容易实现错峰流控、可靠传递。

 

3、消息持久化设置

3.1、activeMQ的消息持久化可以有多种方式,系统默认采用的是kahaDB方式,该方式属于本地文件存储,同时支持事务。但实际使用过程中,我们可能更加习惯采用关系型数据库的方式存储,下面我们一mysql为例。

3.2、首先调整activemq.xml文件中的配置:

<persistenceAdapter>

<!– default local file model –>

<!–<kahaDB directory=”${activemq.data}/kahadb”/>–>

<!– createTablesOnStartup:default value is true; if true then every time start will create table;so first time we set it be true;others be false –>

<jdbcPersistenceAdapter dataDirectory=”${activemq.base}/data” dataSource=”#activemq-ds” createTablesOnStartup=”false” />

</persistenceAdapter>

<!– add DruidDataSource pool –>

<bean id=”activemq-ds” class=”com.alibaba.druid.pool.DruidDataSource” destroy-method=”close”>

<property name=”driverClassName” value=”com.mysql.jdbc.Driver”/>

<property name=”url” value=”jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true”/>

<property name=”username” value=”root”/>

<property name=”password” value=”root”/>

<property name=”maxActive” value=”200″/>

<property name=”poolPreparedStatements” value=”true”/>

</bean>

 

特别注意:

3.2.1、createTablesOnStartup:在首次配置启动服务时,我们需要配置为true,启动过程中将默认帮我们创建数据库表;启动后我们将其改为false,后续启动更不会重新创建。

3.2.2、本次我们采用的是mysql数据库和Druid数据库连接池,我们需要在apache-activemq-5.14.2\lib目录下添加如下jar包。

SpringBoot集成ActiveMQ[通俗易懂]

SpringBoot集成ActiveMQ[通俗易懂]

3.3、启动服务,我们将在数据库中看到新建的相关表:

SpringBoot集成ActiveMQ[通俗易懂]

SpringBoot集成ActiveMQ[通俗易懂]

3.4、下面我们启动服务,查询activemq_msgs消息记录表,主要分以下几个场景:

3.4.1:启动服务测试P2P类型消息,并不启用消费者,此时所有的消息将被记录在activemq_msgs表中;

3.4.2:启动服务继续测试P2P类型消息,本次启用消费者,此时消费者会一次性接收到之前保存在activemq_msgs表中的消息,查询activemq_msgs表发现没有数据。说明在生产者生产数据存入activemq_msgs中后,消费者一旦消费则activemq_msgs中数据则删除,从而保证了仅会被消费一次,同时也保证了可靠性。

3.4.3:启动服务测试发布订阅类型消息,首先发现无论是否有开启状态的订阅者,数据库并没有此类型消息。通过对原生代码的练习发现,需要有持久化订阅者后数据库将会保存发布订阅类型消息(否则无),且能够保证一定被每个消费者消费。消费后将在activemq_acks查看到被消费的情况。普通订阅者仅仅在线时才能消费消息。

 

4、生产环境密码设置

4.1、首先我们在activemq.xml找到<systemUsage>,在其上方变价如下插件

<!– set connection user authentication–>

<plugins>

<simpleAuthenticationPlugin>

<users>

<authenticationUser username=”${activemq.username}” password=”${activemq.password}” groups=”users,admins”/>

</users>

</simpleAuthenticationPlugin>

</plugins>

4.2、打开**\apache-activemq-5.14.2\conf中credentials.properties文件进行密码设置

activemq.username=admin

activemq.password=123456

guest.password=123456

4.3、在springboot工程中仅需要在application.properties文件中添加一下配置即可

spring.activemq.user=admin

spring.activemq.password=123456

 

待实现


1、如何同时实现发布订阅与点对点类型消息并同时被消费;

2、实现应答模式消息处理;

3、分布式事务的实现(MQ本地事务、MQ与MySQL单应用分布式事务、多应用分布式部署下最终一致性事务);

 

参考资料


1、ActiveMQ的使用与遇到的相关坑:SpringBoot集成ActiveMQ

2、大型网站架构系列-消息队列:http://mp.weixin.qq.com/s?__biz=MjM5NzMyMjAwMA==&mid=403966545&idx=1&sn=34fde46b427f58b4736fb640a5fa8568&mpshare=1&scene=23&srcid=1216SaLUBxg16Gh85bRjJ5NX#rd

3、消息队列设计精要:SpringBoot集成ActiveMQ

 

附录


activemq_msgs用于存储消息,Queue和Topic都存储在这个表中:

ID:自增的数据库主键

Container:消息的Destination

MSGID_PROD:消息发送者客户端的主键

MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID

EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数

MSG:消息本体的Java序列化对象的二进制数据

PRIORITY:优先级,从0-9,数值越大优先级越高

activemq_acks用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存:

主要的数据库字段如下:

CONTAINER:消息的Destination

SUB_DEST:如果是使用Static集群,这个字段会有集群其他系统的信息

CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用以区分

SUB_NAME:订阅者名称

SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作

LAST_ACKED_ID:记录消费过的消息的ID。

activemq_lock在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,

其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。

这个表用于记录哪个Broker是当前的Master Broker。

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/7110.html

(0)
上一篇 2023-01-03 09:52
下一篇 2023-01-03 09:52

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

关注微信