大家好,欢迎来到IT知识分享网。
- 目标
目前项目支持的kafka的版本是0.9.0.1,另外一个项目的kafka支持的版本是2.1.2,怎么用一套kafka代码的client端来适配两个版本的broker了?
- 分析
通过查阅kafka官方发行版本(https://kafka.apache.org/downloads)
以及kafka兼容列表https://spring.io/projects/spring-kafka
kafka版本 |
scala版本 |
发布日期 |
Spring for Apache Kafka版本 |
使用银行 |
0.9.0.1 |
2.11 |
2016年2月19日 |
2.2.7 |
A银行 |
2.1.2 |
2.11 |
2019年2月15日 |
B农商行 |
|
3.2.0(最新版本) |
2.13 |
2022年7月16日 |
Spring for Apache Kafka Version |
Spring Integration for Apache Kafka Version |
kafka-clients |
Spring Boot |
3.0.x (pre release) |
6.0.x |
3.2.0 |
3.0.x |
2.9.x (pre release) |
5.5.x |
3.2.0 |
2.7.x (not managed) |
2.8.x |
5.5.x |
3.0.0 |
2.6.x or 2.7.x (not managed) |
2.7.x |
5.5.x |
2.7.0 – 2.8.1 |
2.4.x or 2.5.x (End of Life) |
2.6.x |
5.3.x or 5.4.x |
2.6.0 – 2.8.1 |
2.3.x or 2.4.x (End of Life) |
2.5.x |
3.3.x |
2.5.1 – 2.8.1 |
2.3.x (End of Life) |
2.4.x |
3.2.x |
2.4.1 |
2.2.x (End of Life) |
2.3.x |
3.2.x |
2.3.1 |
2.2.x (End of Life) |
2.2.x |
3.1.x |
2.0.1, 2.1.x, 2.2.x |
2.1.x (End of Life) |
2.1.x |
3.0.x |
1.0.2 |
2.0.x (End of Life) |
1.3.x |
2.3.x |
0.11.0.x, 1.0.x |
1.5.x (End of Life |
- 搭建测试环境
官网下载kafka的版本解压即可
- 启动zookeeper
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
- 启动kafka
bin\windows\kafka-server-start.bat config\server.properties
- 创建topic.test主题(linux)
bin/kafka-topics.sh –create –topic topic.test –replication-factor 1 –partitions 1 –zookeeper localhost:2181
- whindows下使用下面的语句创建topic.test主题
bin\windows\kafka-topics.bat –create –topic topic.test –replication-factor 1 –partitions 1 –zookeeper localhost:2181
- 查看主题列表
bin\windows\kafka-topics.bat –zookeeper localhost:2181 –list
- 消费者监听
bin\windows\kafka-console-consumer.bat –zookeeper localhost:2181 –topic topic.test
- 生产者发送
bin\windows\kafka-console-producer.bat –broker-list localhost:9092 –topic topic.test
- 测试结果
使用spring-kafka2.8.7连接kafka_2.11-0.9.0.1出现如下错误。
14:17:03.824 logback [kafka-producer-network-thread | producer-1] INFO o.apache.kafka.clients.NetworkClient – [Producer clientId=producer-1] Disconnecting from node -1 due to request timeout.
14:17:03.844 logback [kafka-producer-network-thread | producer-1] INFO o.apache.kafka.clients.NetworkClient – [Producer clientId=producer-1] Cancelled in-flight API_VERSIONS request with correlation id 107 due to node -1 being disconnected (elapsed time since creation: 30007ms, elapsed time since send: 30007ms, request timeout: 30000ms)
14:17:03.845 logback [kafka-producer-network-thread | producer-1] WARN o.apache.kafka.clients.NetworkClient – [Producer clientId=producer-1] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
使用spring-kafka2.8.7连接kafka_2.11-2.1.1,一切正常
1、低版本的Producer可以向高版本、低版本的Broker服务器发送msg;高版本的Producer只能向高版本的Broker服务器发送msg。
2、对于Consumer 来说,高版本的Consumer只能消费高版本Broker服务器消息;低版本只能消费低版本Broker服务器消息。
在Kafka 0.10.2.0之前,Kafka服务器端和客户端版本之间的兼容性是“单向”的,即高版本的broker可以处理低版本client的请求。反过来,低版本的broker不能处理高版本client的请求。由于升级client要远比升级broker简单得多,因此这个限制给很多用户带来了麻烦,甚至有很多人都不愿意去升级broker版本。
自0.10.2.0版本开始,社区对这个问题进行了优化——对于低版本broker + 高版本client(0.10.2.0)的环境而言,现在用户可以运行命令先查看当前broker支持的协议版本,然后再选择broker支持的最高版本封装请求即可。命令格式如下(在client端运行该命令):
针对Spring-kafka的1.0.0.RELEASE版本到1.0.6.RELEASE版本,缺少类,无法允许
Caused by: java.lang.ClassNotFoundException: org.springframework.kafka.support.converter.BatchMessageConverter
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
… 26 common frames omitted
所以条件允许的情况下请直接使用kafka-clients而不是spring-kafka,尤其是低版本的kafka。
- 解决方法
- 配置文件 application.yml
kafka: version: 0.9.0.1 #add by dengkaidi bootstrap-servers: 127.0.0.1:9092 producer: # 发生错误后,消息重发的次数。 retries: 0 #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。 batch-size: 16384 # 设置生产者内存缓冲区的大小。 buffer-memory: # 键的序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。 # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。 # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 acks: 1 linger: 1 consumer: # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D auto-commit-interval: 1 # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录) # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录 auto-offset-reset: latest # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 enable-auto-commit: false # 键的反序列化方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化方式 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer group-id: cs-consumer-queue_local concurrency: 10 auto: offset: reset: latest session: timeout: 6000 listener: # 在侦听器容器中运行的线程数。 concurrency: 2 #listner负责ack,每调用一次,就立即commit ack-mode: manual_immediate missing-topics-fatal: false
- 消费者配置类:KafkaConsumerConfig
package com.example.springboot.config; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class KafkaConsumerConfig { @Value("${kafka.bootstrap-servers}") private String servers; @Value("${kafka.consumer.enable-auto-commit}") private boolean enableAutoCommit; @Value("${kafka.consumer.session.timeout}") private String sessionTimeout; @Value("${kafka.consumer.auto-commit-interval}") private String autoCommitInterval; @Value("${kafka.consumer.group-id}") private String groupId; @Value("${kafka.consumer.auto.offset.reset}") private String autoOffsetReset; @Value("${kafka.consumer.concurrency}") private int concurrency; @Bean public Consumer<String, String> consumer() { return new KafkaConsumer<>(consumerConfigs()); } public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(8); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); return propsMap; } }
- 生产者配置类:KafkaProducerConfig
package com.example.springboot.config; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class KafkaProducerConfig { @Value("${kafka.bootstrap-servers}") private String servers; @Value("${kafka.producer.retries}") private int retries; @Value("${kafka.producer.batch-size}") private int batchSize; @Value("${kafka.producer.linger}") private int linger; @Value("${kafka.producer.buffer-memory}") private int bufferMemory; @Bean public Producer<String, String> producer() { return new KafkaProducer<>(producerConfigs()); } public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, linger); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } }
- spring boot监听器ConsumerListener
package com.example.springboot; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; @Component public class ConsumerListener implements ApplicationListener<ApplicationReadyEvent> { @Autowired private Consumer<String, String> consumer; @Override public void onApplicationEvent(ApplicationReadyEvent event) { while (true) { List<String> topics = new ArrayList<>(); topics.add("topic.test"); consumer.subscribe(topics); for (int i = 0; i < 2; i++) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) { System.out.println("record------->" + record); //consumer.seekToBeginning(new TopicPartition(record.topic(), record.partition())); } } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new RuntimeException(e); } } } }
- Spring boot控制类KafkaController
package com.example.springboot; import com.example.springboot.config.KafkaProducerConfig; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/Kafka") public class KafkaController { @Autowired private Producer<String, String> produce; @GetMapping("/send") public void sendMsg() { // kafkaProducer.send("this is a test kafka topic message"); produce.send(new ProducerRecord<String, String>("topic.test", "World"), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { e.printStackTrace(); } else { System.out.println("metadata------>" + metadata.toString());//org.apache.kafka.clients.producer.RecordMetadata@1d89e2b5 System.out.println("metadata offset------>" + metadata.offset());//1 } } }); } }
- Spring boot启动主类
package com.example.springboot; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ApplicationContext; @SpringBootApplication public class Application { // @Autowired // private Consumer<String, String> consumer; public static void main(String[] args) { ApplicationContext ctx = SpringApplication.run(Application.class, args); System.out.println("Let's inspect the beans provided by Spring Boot:"); String[] beanNames = ctx.getBeanDefinitionNames(); Arrays.sort(beanNames); // for (String beanName : beanNames) // { // System.out.println(beanName); // } // SpringApplication springApplication = new SpringApplication(Application.class); springApplication.addListeners(new ConsumerListener()); } }
- gradle构建脚本build.gradle
plugins { id 'org.springframework.boot' version '2.7.1' id 'io.spring.dependency-management' version '1.0.11.RELEASE' id 'java' } group = 'com.example' version = '0.0.1-SNAPSHOT' sourceCompatibility = '1.8' repositories { maven { url 'https://maven.aliyun.com/repository/public/' } dependencies { implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.apache.kafka:kafka-clients:0.9.0.1' implementation 'com.alibaba:fastjson:2.0.9' } mavenLocal() mavenCentral() } test { useJUnitPlatform() }
也就是使用低版本org.apache.kafka:kafka-clients:0.9.0.1的客户端可以兼容高版本的broker。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/54991.html