How to Integrate Kafka 2.2.0 with Spring Boot 2.x
Introduction
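Kafka has been evolving quickly in recent years, and it is used more and more often in enterprises. Integrating Kafka into Spring Boot is fairly simple, but pay attention to the versions in use and to Kafka's basic configuration. This part needs care to avoid common pitfalls.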
Basic environment
Spring Boot version: 2.1.4
Kafka version: 2.2.0
JDK: 1.8
Writing the code
1. Basic pom dependencies

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.4.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.example</groupId>
        <artifactId>demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>kafkademo</name>
        <description>Demo project for Spring Boot</description>

        <properties>
            <java.version>1.8</java.version>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>2.2.0.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
                <version>2.7</version>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>
2. Basic configuration

    spring.kafka.bootstrap-servers=2.1.1.1:9092
    spring.kafka.consumer.group-id=test-consumer-group
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    #logging.level.root=debug
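As a rough illustration of what these settings mean, each spring.kafka.* key above corresponds to a native Kafka client property (for example, spring.kafka.consumer.group-id becomes group.id). The sketch below spells out that correspondence using only the JDK. The class name KafkaPropertyMapping and its helper methods are hypothetical, introduced only for illustration; Spring Boot performs this binding internally and does not expose such a class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only: shows which native Kafka client
// property each spring.kafka.* key from the configuration above maps to.
public class KafkaPropertyMapping {

    /** Consumer settings keyed by the native Kafka client property names. */
    public static Map<String, String> consumerConfig(Map<String, String> springProps) {
        Map<String, String> config = new LinkedHashMap<>();
        copy(springProps, "spring.kafka.bootstrap-servers", config, "bootstrap.servers");
        copy(springProps, "spring.kafka.consumer.group-id", config, "group.id");
        copy(springProps, "spring.kafka.consumer.key-deserializer", config, "key.deserializer");
        copy(springProps, "spring.kafka.consumer.value-deserializer", config, "value.deserializer");
        return config;
    }

    /** Producer settings keyed by the native Kafka client property names. */
    public static Map<String, String> producerConfig(Map<String, String> springProps) {
        Map<String, String> config = new LinkedHashMap<>();
        copy(springProps, "spring.kafka.bootstrap-servers", config, "bootstrap.servers");
        copy(springProps, "spring.kafka.producer.key-serializer", config, "key.serializer");
        copy(springProps, "spring.kafka.producer.value-serializer", config, "value.serializer");
        return config;
    }

    // Copies a value under a new key, skipping keys that are not set.
    private static void copy(Map<String, String> from, String fromKey,
                             Map<String, String> to, String toKey) {
        String value = from.get(fromKey);
        if (value != null) {
            to.put(toKey, value);
        }
    }

    public static void main(String[] args) {
        String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
        String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";

        Map<String, String> springProps = new LinkedHashMap<>();
        springProps.put("spring.kafka.bootstrap-servers", "2.1.1.1:9092");
        springProps.put("spring.kafka.consumer.group-id", "test-consumer-group");
        springProps.put("spring.kafka.consumer.key-deserializer", stringDeserializer);
        springProps.put("spring.kafka.consumer.value-deserializer", stringDeserializer);
        springProps.put("spring.kafka.producer.key-serializer", stringSerializer);
        springProps.put("spring.kafka.producer.value-serializer", stringSerializer);

        System.out.println("consumer: " + consumerConfig(springProps));
        System.out.println("producer: " + producerConfig(springProps));
    }
}
```

Note that group.id appears only on the consumer side, while bootstrap.servers is shared by the producer and the consumer.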
3. Entity class

    package com.example.demo.model;

    import java.util.Date;

    // Simple message payload that is serialized to JSON before sending.
    public class Messages {

        private Long id;
        private String msg;
        private Date sendTime;

        public Long getId() {
            return id;
        }

        public void setId(Long id) {
            this.id = id;
        }

        public String getMsg() {
            return msg;
        }

        public void setMsg(String msg) {
            this.msg = msg;
        }

        public Date getSendTime() {
            return sendTime;
        }

        public void setSendTime(Date sendTime) {
            this.sendTime = sendTime;
        }
    }
4. Producer

    package com.example.demo.service;

    import com.example.demo.model.Messages;
    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Service;
    import org.springframework.util.concurrent.ListenableFuture;

    import java.util.Date;
    import java.util.UUID;

    @Service
    public class KafkaSender {

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        private Gson gson = new GsonBuilder().create();

        public void send() {
            // Build a sample message and use the current timestamp as its id.
            Messages message = new Messages();
            message.setId(System.currentTimeMillis());
            message.setMsg("123");
            message.setSendTime(new Date());
            // Serialize to JSON and send asynchronously to the topic "newtopic".
            ListenableFuture<SendResult<String, String>> test0 =
                    kafkaTemplate.send("newtopic", gson.toJson(message));
        }
    }
5. Consumer

    package com.example.demo.service;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;

    import java.util.Optional;

    @Service
    public class KafkaReceiver {

        // Invoked for every record received from the topic "newtopic".
        @KafkaListener(topics = {"newtopic"})
        public void listen(ConsumerRecord<?, ?> record) {
            // Guard against records that carry a null value.
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            if (kafkaMessage.isPresent()) {
                Object message = kafkaMessage.get();
                System.out.println("record=" + record);
                System.out.println("message=" + message);
            }
        }
    }
6. Testing
Simulate a message producer in the startup method and send messages to Kafka.

    package com.example.demo;

    import com.example.demo.service.KafkaSender;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;

    @SpringBootApplication
    public class KafkademoApplication {

        public static void main(String[] args) {
            ConfigurableApplicationContext context =
                    SpringApplication.run(KafkademoApplication.class, args);
            KafkaSender sender = context.getBean(KafkaSender.class);
            // Send 1000 messages, pausing 300 ms between sends.
            for (int i = 0; i < 1000; i++) {
                sender.send();
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
Results
Consume the messages directly from the command line.
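Problems encountered
The producer times out when connecting to Kafka:

    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119)

Solution:
Modify the following setting in Kafka's server.properties, replacing the original default value with the ip:port form, then restart Kafka.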
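When diagnosing this kind of timeout, it may help to first confirm that the broker's host and port are reachable from the application machine at all. The following is a minimal sketch under that assumption, using only the JDK. The class name BrokerReachability and the 3000 ms timeout are hypothetical choices for illustration, and the default address is simply the one from the configuration above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic helper, for illustration only: checks plain TCP
// reachability of a host and port. It does not speak the Kafka protocol.
public class BrokerReachability {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults mirror spring.kafka.bootstrap-servers from the configuration.
        String host = args.length > 0 ? args[0] : "2.1.1.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

A successful check only shows that the port accepts TCP connections. A Kafka client also relies on the addresses the broker reports back in its metadata, so a timeout can still occur when the port itself is reachable.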