消息队列Kafka、RabbitMQ

消息队列Kafka、RabbitMQDubbo 消费者发送调用后进入阻塞状态 这个状态表示改线程仍占用内存资源 但是什么动作都不做 如果生产者运行耗时较久 消费者就一直等待 如果消费者利用这个时间 那么可以处理更多请求 业务整体效率

大家好,欢迎来到IT知识分享网。

Dubbo远程调用的性能问题

Dubbo调用在微服务项目中普遍存在

这些Dubbo调用都是同步的

“同步”指:A(消费者)调用B(生产者)的服务A在发起调用后,在B返回之前只能等待

直到B返回结果后A才能运行

消息队列Kafka、RabbitMQ

订单减库

Dubbo消费者发送调用后进入阻塞状态,这个状态表示改线程仍占用内存资源,但是什么动作都不做

如果生产者运行耗时较久,消费者就一直等待,如果消费者利用这个时间,那么可以处理更多请求,业务整体效率

实际情况下,Dubbo有些必要的返回值必须等待,但是不必要等待的服务返回值,我们可以不等待去做别的事情

这种情况下我们就要使用消息队列

什么是消息队列

消息队列(Message Queue)简称MQ

消息队列是采用”异步(两个微服务项目并不需要同时完成请求)”的方式来传递数据完成业务操作流程的业务处理方式

消息队列的特征

消息队列Kafka、RabbitMQ

常见面试题:消息队列的特征

  • 利用异步的特性,提高服务器的运行效率,减少因为远程调用出现的线程等待\阻塞
  • 削峰填谷:在并发峰值超过当前系统处理能力时,我们将没处理的信息保存在消息队列中,在后面出现的较闲的时间中去处理,直到所有数据依次处理完成,能够防止在并发峰值时短时间大量请求而导致的系统不稳定
  • 消息队列的延时:因为是异步执行,请求的发起者并不知道消息何时能处理完,如果业务不能接收这种延迟,就不要使用消息队列

常见消息队列

  • Kafka:性能好\功能弱:适合大数据量,高并发的情况,大数据领域使用较多
  • RabbitMQ:功能强\性能一般:适合发送需求复杂的消息队列,java业务中使用较多
  • RocketMQ:阿里的
  • ActiveMQ:前几年流行的,老项目可能用到

常见面试题:消息队列异常处理

如果我们真的将上面生成订单业务里,减少库存的操作从正常流程中剥离到消息队列

那么如果库存减少过程中发送异常,就不能由Seata接收了,因为异步的处理无法和Seata通信

意思是如果使用了消息队列,队列中处理数据过程发送异常,那么就要用特殊的方法处理问题

处理方式就是手写代码进行回滚,一般情况就是在stock,模块再向order模块发送消息,order模块接收到消息后进行进一步处理

如果order模块进一步处理时又发生异常,我们可以再向一个实现设置好的消息队列中发送消息

这个消息队列没有处理者,我们称之为”死信队列”,一个正常运行的程序,会定期有人处理死信队列信息

Kafka

什么是Kafka

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。Kafka最初是由LinkedIn开发,并随后于2011年初开源。

kafka软件结构

消息队列Kafka、RabbitMQ

Kafka Cluster(Kafka集群)

Partition(分片)

Producer:消息的发送方,也就是消息的来源,Kafka中的生产者

order就是消息的发送方

Consumer:消息的接收方,也是消息的目标,Kafka中的消费者

​ stock就是消息的接收方法

Topic:话题或主题的意思,消息的收发双方要依据同一个话题名称,才不会将信息错发给别人

Record:消息记录,就是生产者和消费者传递的信息内容,保存在指定的Topic中

Kafka的特征与优势

Kafka作为消息队列,它和其他同类产品相比,突出的特点就是性能强大

Kafka将消息队列中的信息保存在硬盘中

Kafka对硬盘的读取规则进行优化后,效率能够接近内存

硬盘的优化规则主要依靠”顺序读写,零拷贝,日志压缩等技术”

Kafka处理队列中数据的默认设置:

  • Kafka队列信息能够一直向硬盘中保存(理论上没有大小限制)
  • Kafka默认队列中的信息保存7天,可以配置这个时间,缩短这个时间可以减少Kafka的磁盘消耗

Kafka的安装和配置

最好将我们kafka软件的解压位置设置在一个根目录

然后路径不要有空格和中文

消息队列Kafka、RabbitMQ

文件位置

我们要创建一个空目录用于保存Kafka运行过程中产生的数据

本次创建名称为data的空目录

下面开始配置启动信息

先到G:\kafka\config下配置有文件zookeeper.properties

找到dataDir属性修改如下

dataDir=G:/data

注意G盘和data文件夹名称,匹配自己电脑的真实路径和文件夹名称

还要修改server.properties配置文件

log.dirs=G:/data

修改注意事项和上面相同

Zookeeper简介

我们在启动kafka前必须先启动Zookeeper

zoo:动物园

keeper:园长

可以引申为管理动物的人

早期,每个服务器系统中的软件在安装后运行都需要一些配置

那么软件多了,配置会比较复杂

我们使用Zookeeper之后,可以创建一个新的管理各种软件配置的文件管理系统

在Zookeeper中,可以修改服务器系统中的所有软件配置

长此以往,很多软件就删除了自己写配置文件的功能,而直接从Zookeeper中获取

Kafka就是需要将配置编写在Zookeeper中的软件之一

Kafka的启动

启动Zookeeper

进入路径G:\kafka\bin\windows

输入cmd进入dos命令行

G:\kafka\bin\windows>zookeeper-server-start.bat ..\..\config\zookeeper.properties

启动kafka

总体方式一样,输入不同指令

G:\kafka\bin\windows>kafka-server-start.bat ..\..\config\server.properties

附录

Mac系统启动Kafka服务命令(参考):

# 进入Kafka文件夹 cd Documents/kafka_2.13-2.4.1/bin/ # 动Zookeeper服务 ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties # 启动Kafka服务 ./kafka-server-start.sh -daemon ../config/server.properties 

Mac系统关闭Kafka服务命令(参考):

# 关闭Kafka服务 ./kafka-server-stop.sh # 启动Zookeeper服务 ./zookeeper-server-stop.sh

在启动kafka时有一个常见错误

wmic不是内部或外部命令

这样的提示,需要安装wmic命令,安装方式参考百度查找

Kafka使用Demo

不要关闭Zookeeper和Kafka的dos窗口

我们再csmall项目中编写一个简单的Demo学习Kafka的使用

在csmall-cart-webapi模块中

添加依赖

<dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>

修改yml文件配置

spring: kafka: # 定义kafka的位置 bootstrap-servers: localhost:9092 # 话题的分组名称,是必须配置的 # 为了区分当前项目和其他项目使用的,防止了不同项目相同话题的干扰或错乱 # 本质是在话题名称前添加项目名称为前缀来防止的 consumer: group-id: csmall

SpringBoot启动类中添加注解

@SpringBootApplication @EnableDubbo // 启动kafka的功能 @EnableKafka // 为了测试kafka,我们可以周期性的发送消息到消息队列 // 使用SpringBoot自带的调度工具即可 @EnableScheduling public class CsmallCartWebapiApplication { public static void main(String[] args) { SpringApplication.run(CsmallCartWebapiApplication.class, args); } }

下面我们就可以实现周期性的向kafka发送消息并接收的操作了

编写消息的发送

cart-webapi包下创建kafka包

包中创建Producer类来发送消息

// 我们需要周期性的向Kafka发送消息 // 需要将具备SpringBoot调度功能的类保存到Spring容器才行 @Component public class Producer { // 能够实现将消息发送到Kafka的对象 // 只要Kafka配置正确,这个对象会自动保存到Spring容器中,我们直接装配即可 // KafkaTemplate<[话题名称的类型],[传递消息的类型]> @Autowired private KafkaTemplate<String,String> kafkaTemplate; // 每隔10秒向Kafka发送信息 int i=1; // fixedRate是周期运行,单位毫秒 10000ms就是10秒 @Scheduled(fixedRate = 10000) // 这个方法只要启动SpringBoot项目就会按上面设置的时间运行 public void sendMessage(){ // 实例化一个Cart类型对象,用于发送消息 Cart cart=new Cart(); cart.setId(i++); cart.setCommodityCode("PC100"); cart.setPrice(RandomUtils.nextInt(100)+200); cart.setCount(RandomUtils.nextInt(5)+1); cart.setUserId("UU100"); // 将cart对象转换为json格式字符串 Gson gson=new Gson(); // 执行转换 String json=gson.toJson(cart); System.out.println("本次发送的消息为:"+json); // 执行发送 // send([话题名称],[发送的消息]),需要遵循上面kafkaTemplate声明的泛型类型 kafkaTemplate.send("myCart",json); } }

创建一个叫Consumer的类来接收消息

// 因为Kafka接收消息是自动的,所以这个类也必须交由Spring容器管理0 @Component public class Consumer { // SpringKafka框架接收Kafka中的消息使用监听机制 // SpringKafka框架提供一个监听器,专门负责关注指定的话题名称 // 只要该话题名称中有消息,会自动获取该消息,并调用下面方法 @KafkaListener(topics = "myCart") // 上面注解和下面方法关联,方法的参数就是接收到的消息 public void received(ConsumerRecord<String,String> record){ // 方法参数类型必须是ConsumerRecord // ConsumerRecord<[话题名称类型],[消息类型]> // 获取消息内容 String json=record.value(); // 要想在java中使用,需要转换为java对象 Gson gson=new Gson(); // 将json转换为java对象,需要提供转换目标类型的反射 Cart cart=gson.fromJson(json,Cart.class); System.out.println("接收到对象为:"+cart); } }

RabbitMQ

什么是RabbitMQ

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。 AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。 RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ特征

1.可靠性(Reliability) RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

2.灵活的路由(Flexible Routing) 在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

3.消息集群(Clustering)多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker

4.高可用(Highly Available Queues) 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

5.多种协议(Multi-protocol) RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。

6.多语言客户端(Many Clients) RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。

7.管理界面(Management UI) RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

8.跟踪机制(Tracing) 如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

9.插件机制(Plugin System) RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

下载软件

RabbitMQ是Erlang语言开发的,所以要先安装Erlang语言的运行环境

下载Erlang的官方路径

https://erlang.org/download/otp_versions_tree.html

消息队列Kafka、RabbitMQ

安装的话就是双击

不要安装在中文路径和有空格的路径下!!!

下载RabbitMQ

https://www.rabbitmq.com/install-windows.html

消息队列Kafka、RabbitMQ

下载地址

安装也是双击即可

不要安装在中文路径和有空格的路径下!!!

RabbitMQ的结构

消息队列Kafka、RabbitMQ

结构图

和Kafka不同,Kafka是使用话题名称来收发信息,结构简单

RabbitMQ是使用交换机\路由key指定要发送消息的队列

消息的发送者发送消息时,需要指定交换机和路由key名称

消息的接收方接收消息时,只需要指定队列的名称

在编写代码上,相比于Kafka,每个业务要编写一个配置类

这个配置类中要绑定交换机和路由key的关系,以及路由Key和队列的关系

配置Erlang的环境变量

要想运行RabbitMQ必须保证系统有Erlang的环境变量

配置Erlang环境变量

把安装Erlang的bin目录配置在环境变量Path的属性中

消息队列Kafka、RabbitMQ

环境变量

启动RabbitMQ

找到RabbitMQ的安装目录

可能是:

G:\pgm\rabbit\rabbitmq_server-3.10.1\sbin

具体路径根据自己的情况寻找

地址栏运行cmd

输入启动指令如下

G:\pgm\rabbit\rabbitmq_server-3.10.1\sbin>rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins enable rabbitmq_management

结果如下

消息队列Kafka、RabbitMQ

命令结果

运行完成后

可以在Window任务管理器中的服务选项卡里找到RabbitMQ的服务(Ctrl+Shift+ESC)

另外的验证方法:

打开浏览器访问http://localhost:15672

登录界面用户名密码

guest

guest

登录成功后看到RabbitMQ运行的状态

如果启动失败,需要重新安装

参考路径如下

https://baijiahao.baidu.com/s?id=&wfr=spider&for=pc

利用RabbitMQ完成消息的收发

csmall-stock-webapi项目中测试RabbitMQ

可以利用之前我们使用Quartz实现的每隔一段时间输出当前日期信息的方法改为发送消息

添加依赖

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

yml文件配置

spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: /

交换机\路由Key\队列的配置类

RabbitMQ要求我们再java代码级别设置交换机\路由Key\队列的关系

我们再quartz包下,创建config包

包中创建配置信息类

// SpringBoot整合RabbitMQ之后 // 这些配置信息要保存在Spring容器中,所以这些配置也要交给SpringBoot管理 @Configuration public class RabbitMQConfig { // 声明需要使用的交换机\路由Key\队列的名称 public static final String STOCK_EX="stock_ex"; public static final String STOCK_ROUT="stock_rout"; public static final String STOCK_QUEUE="stock_queue"; // 声明交换机,需要几个声明几个,这里就一个 // 方法中实例化交换机对象,确定名称,保存到Spring容器 @Bean public DirectExchange stockDirectExchange(){ return new DirectExchange(STOCK_EX); } // 声明队列,需要几个声明几个,这里就一个 // 方法中实例化队列对象,确定名称,保存到Spring容器 @Bean public Queue stockQueue(){ return new Queue(STOCK_QUEUE); } // 声明路由Key(交换机和队列的关系),需要几个声明几个,这里就一个 // 方法中实例化路由Key对象,确定名称,保存到Spring容器 @Bean public Binding stockBinding(){ return BindingBuilder.bind(stockQueue()).to(stockDirectExchange()) .with(STOCK_ROUT); } }

RabbitMQ发送消息

我们再QuartzJob类中输出时间的代码后继续编写代码

实现RabbitMQ消息的发送

public class QuartzJob implements Job { // RabbitTemplate就是amqp框架提供的发送消息的对象 @Autowired private RabbitTemplate rabbitTemplate; @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { //输出当前时间 System.out.println("--------"+ LocalDateTime.now() +"-------"); // 先简单的发送一个字符串 rabbitTemplate.convertAndSend(RabbitMQConfig.STOCK_EX, RabbitMQConfig.STOCK_ROUT,"接收到减少库存的消息"); } }

我们可以通过修改QuartzConfig类中的Cron表达式修改调用的周期

CronScheduleBuilder cronScheduleBuilder= CronScheduleBuilder.cronSchedule("0/10 * * * * ?");

接收RabbitMQ的消息

quartz包下再创建一个新的类用于接收信息

RabbitMQConsumer代码如下

// 这个对象也是需要交由Spring容器管理的,才能实现监听Spring容器中保存的队列的效果 @Component // 和Kafka不同的是Kafka在一个方法上声明监听器 // 而RabbitMQ是在类上声明,监听具体的队列名称 @RabbitListener(queues = {RabbitMQConfig.STOCK_QUEUE}) public class RabbitMQConsumer { // 监听了类,但是运行代码的一定是个方法 // 框架要求这个类中只允许一个方法包含下面这个注解 // 表示这个方法是处理消息的方法 // 方法的参数就是消息的值 @RabbitHandler public void process(String str){ System.out.println("接收到的消息为:"+str); } }

启动Nacos\RabbitMQ\Seata

启动stock-webapi

根据Cron表达式,消息会在0/10/20/30/40/50秒数时运行

测试成功表示一切正常

学习记录,如有侵权请联系删除

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/97788.html

(0)
上一篇 2024-10-06 20:15
下一篇 2024-12-15 08:45

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信