消息中间件ActiveMQ[通俗易懂]

消息中间件ActiveMQ[通俗易懂]JMS是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口

大家好,欢迎来到IT知识分享网。

消息中间件


消息队列 是指利用 高效可靠 消息传递机制 进行与平台无关的 数据交流,并基于 数据通信 来进行分布式系统的集成。

特点(作用)

  • 应用解耦
  • 异步通信
  • 流量削峰
  • (海量)日志处理
  • 消息通讯
  • ……

应用场景

根据消息队列的特点,可以衍生出很多场景,或者说很多场景都能用到。下面举几个例子:

1)异步通信:注册时的短信、邮件通知,减少响应时间;

2)应用解耦:信息发送者和消息接受者无需耦合,比如调用第三方;

3)流量削峰:例如秒杀系统;

ActiveMQ


官网:https://activemq.apache.org/

简介:

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持 JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,翻译为Java消息服务。

特点:

  1. 支持来自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各种跨语言客户端和协议
  2. 完全支持JMS客户端和Message Broker中的企业集成模式
  3. 支持许多高级功能,如消息组,虚拟目标,通配符和复合目标
  4. 完全支持JMS 1.1和J2EE 1.4,支持瞬态,持久,事务和XA消息
  5. Spring支持,以便ActiveMQ可以轻松嵌入到Spring应用程序中,并使用Spring的XML配置机制进行配置
  6. 专为高性能集群,客户端 – 服务器,基于对等的通信而设计的
  7. CXF和Axis支持,以便ActiveMQ可以轻松地放入这些Web服务堆栈中以提供可靠的消息传递
  8. 可以用作内存JMS提供程序,非常适合单元测试JMS支持可插拔传输协议,例如in-VM,TCP,SSL,NIO,UDP,多播,JGroups和JXTA传输
  9. 使用JDBC和高性能日志支持非常快速的持久性

Docker安装


[root@xxx ~]# docker pull activemq

消息中间件ActiveMQ[通俗易懂]

[root@xxx~]# docker run –name=’activemq’ \

-itd \
> -p 8161:8161 \
> -p 61616:61616 \
> -e ACTIVEMQ_ADMIN_LOGIN=admin \
> -e ACTIVEMQ_ADMIN_PASSWORD=123456 \
> –restart=always \
> -v /usr/soft/activemq:/data/activemq \
> -v /usr/soft/activemq/log:/var/log/activemq \
> webcenter/activemq

  • 61616是 activemq 的容器使用端口
  • 8161是 web 页面管理端口
  • /usr/soft/activemq 是将activeMQ运行文件挂载到该目录
  • /usr/soft/activemq/log是将activeMQ运行日志挂载到该目录
  • -e ACTIVEMQ_ADMIN_LOGIN=admin 指定登录名
  • -e ACTIVEMQ_ADMIN_PASSWORD=123456 登录密码
消息中间件ActiveMQ[通俗易懂]

访问ActiveMQ前台

默认前台端口8161

默认端口61616提供JMS服务

消息中间件ActiveMQ[通俗易懂]

访问ActiveMQ前台界面

Queues

消息中间件ActiveMQ[通俗易懂]

Queues

  • Name:消息队列的名称。
  • Number Of Pending Messages:未被消费的消息数目。
  • Number Of Consumers:消费者的数量。
  • Messages Enqueued:进入队列的消息 ;进入队列的总消息数目,包括已经被消费的和未被消费的。 这个数量只增不减。
  • Messages Dequeued:出了队列的消息,可以理解为是被消费掉的消息数量。在Queues里它和进入队列的总数量相等(因为一个消息只会被成功消费一次),如果暂时不等是因为消费者还没来得及消费。

通信方式


队列模式

其实就是分食模式。

比如生产方发送了10条消息到activeMQ服务器, 而此时有多个消费方,那么这些消费方就会瓜分这10条消息,一条消息只会被一个消费方得到。

主题模式

就是订阅模式。

比如生产方发了10条消息,而此时有多个消费方,那么多个消费方都能得到这10条消息,就如同订阅公众号那样。

区别

点对点(point-to-point,简称PTP)Queue消息传递模型:

在该消息传递模型下,一个消息生产者向消息服务器端一个特定的队列发送消息,一个消费者从该队列中读取消息。在这种模型下,消息生产者知道消息消费者的队列并直接将消息发送到消息消费者的队列。这种模型的特点是能够保证数据安全

发布/订阅(publish/subscribe,简称pub/sub)Topic消息传递模型:

在该消息传递模型下,一个消息发布者向一个特定的消息主题发布消息,0或多个对此消息主题感兴趣的并且处于活动状态的消息订阅者或者建立了持久订阅的消息订阅者才可以接收到所发布的消息。可能造成数据丢失

集成SpringBoot


项目目录结构

消息中间件ActiveMQ[通俗易懂]

项目目录结构

依赖

<dependency>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--连接池依赖-->
<dependency>
				<groupId>org.apache.activemq</groupId>
				<artifactId>activemq-pool</artifactId>
				<version>5.14.5</version>
</dependency>

IT知识分享网

Producer

目录结构

消息中间件ActiveMQ[通俗易懂]

目录结构

配置文件application.yml

IT知识分享网server: 
		port: 8080
spring: 
		activemq:
    		user: admin 
   		 	password: 123456
				broker-url: tcp://47.98.56.177:61616 
				pool: # 开启连接池
    				enabled: true
						max-connections: 10

queueName: publish.queue 
topicName: publish.topic

ActiveMQConfig.class

import org.apache.activemq.ActiveMQConnectionFactory; 
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value; 
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import javax.jms.Queue; 
import javax.jms.Topic;

@Configuration
public class ActiveMQConfig { 
  
  @Value("${queueName}") 
  private String queueName;
  
  @Value("${topicName}") 
  private String topicName;
  
  @Value("${spring.activemq.user}")
  private String usrName;
  
  @Value("${spring.activemq.password}") 
  private String password;
  
  @Value("${spring.activemq.broker-url}") 
  private String brokerUrl;
  
  @Bean
  public Queue queue(){
    return new ActiveMQQueue(queueName);
  }
  @Bean
  public Topic topic(){
    return new ActiveMQTopic(topicName);
  }
  @Bean
  public ActiveMQConnectionFactory connectionFactory() {
    return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
  }
  @Bean
  public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory){
    DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
    bean.setConnectionFactory(connectionFactory); return bean;
  }

@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory){
  DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
  //设置为发布订阅方式, 默认情况下使用的生产消费者方式
  bean.setPubSubDomain(true); bean.setConnectionFactory(connectionFactory); return bean;
  }
}

PublishController.class

IT知识分享网import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.jms.Queue; 
import javax.jms.Topic;
@RestController @RequestMapping("/publish") 
public class PublishController {
  @Autowired
  private JmsMessagingTemplate jms;
  
  @Autowired
  private Queue queue;
  
  @Autowired
  private Topic topic;
  
  @RequestMapping("/queue") public String queue(){
    for (int i = 0; i < 10 ; i++){ 
      jms.convertAndSend(queue, "queue"+i);
    }
    return "queue 发送成功";
  }
  
  @JmsListener(destination = "out.queue") 
  public void consumerMsg(String msg){
    System.out.println(msg);
  }
  
  @RequestMapping("/topic") 
  public String topic(){
    for (int i = 0; i < 10 ; i++){
      jms.convertAndSend(topic, "topic"+i);
    }
    return "topic 发送成功";
  }
}

Comsumer

目录结构

消息中间件ActiveMQ[通俗易懂]

Consumer目录结构

配置文件和配置类与Producer一样

QueueListener.class

@Component
public class QueueListener {
  
  @JmsListener(destination = "publish.queue", containerFactory = "jmsListenerContainerQueue")
  @SendTo("out.queue")
  public String receive(String text){
    System.out.println("QueueListener: consumer-b 收到一条信息: " + text); return "consumer-a received : " + text;
  }
  
}

TopicListenner.class

@Component
public class TopicListener {
  @JmsListener(destination = "publish.topic", containerFactory = "jmsListenerContainerTopic")
  public void receive(String text){
    System.out.println("TopicListener: consumer-b 收到一条信息: " + text);
  }
}

运行测试

Topic模式

消息中间件ActiveMQ[通俗易懂]

Topic模式测试

消息中间件ActiveMQ[通俗易懂]

测试信息1

消息中间件ActiveMQ[通俗易懂]

测试信息2

结论:两个消费者都接收到了相同多的消息

消息中间件ActiveMQ[通俗易懂]

结果统计

Queue模式

消息中间件ActiveMQ[通俗易懂]

queue模式测试

消息中间件ActiveMQ[通俗易懂]

queue测试结果1

结论:两个消费者平分了消息

消息中间件ActiveMQ[通俗易懂]

消息持久化


持久化机制

为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制,即:若MQ挂了,消息不会消失的机制 。

ActiveMQ的消息持久化机制JDBC、AMQ、KahaDB、LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。

在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或远程数据库等再试图将消息发送给接收者,成功则将消息从存储中删除,失败则继续尝试发送。消息中心启动后首先要检查指定的存储位置,若有未发送成功的消息,则需要把消息发送出去。

AMQ Message Store

AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特定,消息存储在一个个文件中,文件的默认大小为32M,当一个存储文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段这个文件被删除,AMQ适用于ActiveMQ5.3之前的版本。

KahaDB消息存储(默认)

KahaDB是基于日志文件的持久性数据库,是自ActiveMQ 5.4以来的默认存储机制,可用于任何场景,提高了性能和恢能力,它是基于文件的本地数据库存储形式。它已针对快速持久性进行了优化。 KahaDB使用较少的文件描述符,并提供比其前身AMQ消息存储更快的恢复。

消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址:(文件在apache- activemq\data\kahadb下)

消息中间件ActiveMQ[通俗易懂]

  1. db-1.log(主要存数据)是KahaDB存储消息到预定义大小的数据记录文件,文件命名为db-1.log,当数据文件已满时,一个新的文件会随之创建,number数值也会随之递增,它随之消息数量的增多,如每32M一个文件,文件名按照数字进行编号,如sb-1.log……当不再有引用到数据文件中的任何消息时,文件会被删除或归档。
  2. db.data文件包含了持久化的BTree索引,索引了消息数据记录中的消息,它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-.log里面存储的消息。
  3. db.free文件表示当前db.data文件哪些页面是空闲的,文件具体内容是所有空闲页的ID。
  4. db.redo文件是用来进行消息恢复,如果KahaDB消息存储在强制退出后启动,用于恢复BTree索引。
  5. lock文件表示当前获得KahaDB读写权限的broker。

使用

要使用KahaDB作为代理的持久性适配器,请按如下方式配置ActiveMQ(ActiveMQ.xml文件)(示例):

<broker brokerName="broker">
  <persistenceAdapter>
  		<kahaDB directory="activemq-data" journalMaxFileLength="32mb"/>
  </persistenceAdapter>
</broker>

LevelDB(适用ActiveMQ 5.8及更高版本)

该文件系统是从ActiveMQ5.8之后引进的,它和KahaDB很相似,也是基于文件的本地数据库存储形式,但是它提供比KahaDB更快的持久性,但它不再使用自定义B-Tree实现来索引预写日志,而是使用基于 LevelDB的索引。其索引具有几个不错的属性:

  • 快速更新(无需进行随机磁盘更新)
  • 并发读取
  • 使用硬链接快速索引快照
  • KahaDB和LevelDB存储都必须定期执行垃圾收集周期,以确定可以删除哪些日志文件。KahaDB由于增加了存储的数据量并且在收集发生时可能导致读/写停顿,因此可能非常慢。LevelDB存储使用更快的算法来确定何时收集日志文件并避免这些停顿。

可以将ActiveMQ配置为使用LevelDB作为其持久性适配器(ActiveMQ.xml文件) – 如下所示

<broker brokerName="broker" ... >
		...
		<persistenceAdapter>
		<levelDB directory="activemq-data"/>
		</persistenceAdapter>
		...
</broker>

JDBC消息存储

1、将原来的kshadb的持久化数据的方式更改为jdbc:

<persistenceAdapter>
		<!--<kahaDB directory="${activemq.data}/kahadb"/>-->
		<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/>
</persistenceAdapter>

注意:dataSource指定将要引用的持久化数据库的bean名称,createTablesOnStartup表示是否在启动时创建数据表,默认值为true,这样每次启动都会创建数据表,一般是第一次启动的时候设置为true之后改为false

2、添加mysql数据库的驱动jar包到activwmq/lib的文件夹下

注意:若使用第三方连接池或是连接器(例如c3p0、druid),应同时将其jar包添加到lib文件夹下

丢消息怎么办

  • 解决方案:用持久化消息可以使用对数据进行持久化JDBC,AMQ(日志文件),KahaDB和LevelDB,或者非持久化消息及时处理不要堆积,或者启动事务,启动事务后,commit()方法会负责任的等待服务器的返回,也就不会关闭连接导致消息丢失了。

持久化消息非常慢

  • 默认的情况下,非持久化的消息是异步发送的,持久化的消息是同步发送的,遇到慢一点的硬盘,发送消息的速度是无法忍受的。但是在开启事务的情况下,消息都是异步发送的,效率会有2个数量级的提升。所以在发送持久化消息时,请务必开启事务模式。其实发送非持久化消息时也建议开启事务,因为根本不会影响性能。

服务挂掉

  • 这得从ActiveMQ的储存机制说起。在通常的情况下,非持久化消息是存储在内存中的,持久化消息是存储在文件中的,它们的最大限制在配置文件的节点中配置。但是,在非持久化消息堆积到一定程度,内存告急的时候,ActiveMQ会将内存中的非持久化消息写入临时文件中,以腾出内存。虽然都保存到了文件里,但它和持久化消息的区别是,重启后持久化消息会从文件中恢复,非持久化的临时文件会直接删除

ActiveMQ【JMS的同步与异步】发送消息的方式有哪些

同步方式

两个通信应用服务之间必须要进行同步,两个服务之间必须都是正常运行的。发送程序和接收程序都必须一直处于运行状态,并且随时做好相互通信的准备。

发送程序首先向接收程序发起一个请求,称之为发送消息,发送程序紧接着就会堵塞当前自身的进程,不与其他应用进行任何的通信以及交互,等待接收程序的响应,待发送消息得到接收程序的返回消息之后会继续向下运行,进行下一步的业务处理。

异步方式

两个通信应用之间可以不用同时在线等待,任何一方只需各自处理自己的业务,比如发送方发送消息以后不用登录接收方的响应,可以接着处理其他的任务。也就是说发送方和接收方都是相互独立存在的,发送方只管方,接收方只能接收,无须去等待对方的响应。

Java中JMS就是典型的异步消息处理机制,JMS消息有两种类型:点对点、发布/订阅

重复消费和数据丢失


如何保证消息不被重复消费?

保证消息不被重复消费的关键是保证消息队列的幂等性,这个问题针对业务场景来答分以下几点:

1.比如,你拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。

2.再比如,你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。

3.如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

如何解决丢数据的问题?

1.生产者丢数据

生产者的消息没有投递到MQ中怎么办?从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。

transaction机制就是说,发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())。

然而缺点就是吞吐量下降了。因此,按照博主的经验,生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。

2.消息队列丢数据

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。

那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步

①、将queue的持久化标识durable设置为true,则代表是一个持久的队列

②、发送消息的时候将deliveryMode=2

这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入mirrored-queue即镜像队列,但也不能保证消息百分百不丢失(整个集群都挂掉)

3.消费者丢数据

启用手动确认模式可以解决这个问题

①自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断地被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试。

②手动确认模式,如果消费者来不及处理就死掉时,没有响应ack时会重复发送一条信息给其他消费者;如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常;如果对异常进行了捕获,但是没有在finally里ack,也会一直重复发送消息(重试机制)。

③不确认模式,acknowledge=”none” 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/7069.html

(0)
上一篇 2023-01-03 09:53
下一篇 2023-01-03 09:53

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

关注微信