Integrating Kafka 2.2.0 with Spring Boot 2.x (kafka, springboot, development)

Date: 2024-05-02 15:08:16 | Author: 石家庄SEO | Category: Development

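    Introduction

    Kafka has evolved quickly in recent years, and it is being used more and more often in enterprise systems. Integrating Kafka into Spring Boot is fairly simple, but you need to watch the versions you use and Kafka's basic configuration. This part calls for care if you want to avoid the pitfalls.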

    Environment

    Spring Boot version 2.1.4

    Kafka version 2.2.0

    JDK 1.8

    Writing the code

    1. Basic pom dependencies

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.4.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.example</groupId>
        <artifactId>demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>kafkademo</name>
        <description>Demo project for Spring Boot</description>
        <properties>
            <java.version>1.8</java.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>2.2.0.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
                <version>2.7</version>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>

    2. Basic configuration

    spring.kafka.bootstrap-servers=2.1.1.1:9092
    spring.kafka.consumer.group-id=test-consumer-group
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    #logging.level.root=debug
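
    To make the configuration less of a black box, the following sketch lists the raw Kafka client keys that the Spring Boot properties above correspond to. It uses only the JDK and does not talk to a broker. The class and method names (KafkaClientProps, producerProps, consumerProps) are hypothetical and exist only for illustration. Spring Boot also applies defaults of its own that are not shown here.

```java
import java.util.Properties;

// Illustration only: hypothetical helper that spells out the native Kafka
// client keys behind the spring.kafka.* properties used in this article.
class KafkaClientProps {

    private static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";
    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    // spring.kafka.bootstrap-servers and spring.kafka.producer.* settings
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer", STRING_SERIALIZER);
        props.setProperty("value.serializer", STRING_SERIALIZER);
        return props;
    }

    // spring.kafka.bootstrap-servers and spring.kafka.consumer.* settings
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer", STRING_DESERIALIZER);
        props.setProperty("value.deserializer", STRING_DESERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        // Same values as in the configuration file above.
        System.out.println(producerProps("2.1.1.1:9092"));
        System.out.println(consumerProps("2.1.1.1:9092", "test-consumer-group"));
    }
}
```

    Serializers belong to the producer and deserializers to the consumer, which is why the two sets of keys differ even though both share the same bootstrap address.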

    3. Entity class

    package com.example.demo.model;

    import java.util.Date;

    public class Messages {

        private Long id;
        private String msg;
        private Date sendTime;

        public Long getId() {
            return id;
        }

        public void setId(Long id) {
            this.id = id;
        }

        public String getMsg() {
            return msg;
        }

        public void setMsg(String msg) {
            this.msg = msg;
        }

        public Date getSendTime() {
            return sendTime;
        }

        public void setSendTime(Date sendTime) {
            this.sendTime = sendTime;
        }
    }

    4. Producer

    package com.example.demo.service;

    import com.example.demo.model.Messages;
    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Service;
    import org.springframework.util.concurrent.ListenableFuture;

    import java.util.Date;

    @Service
    public class KafkaSender {

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        private Gson gson = new GsonBuilder().create();

        // Builds a sample message and sends it to "newtopic" as a JSON string.
        public void send() {
            Messages message = new Messages();
            message.setId(System.currentTimeMillis());
            message.setMsg("123");
            message.setSendTime(new Date());
            // send() is asynchronous; the returned future is not inspected here.
            ListenableFuture<SendResult<String, String>> test0 =
                    kafkaTemplate.send("newtopic", gson.toJson(message));
        }
    }
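
    The producer above stores the future returned by kafkaTemplate.send(...) but never looks at it, so a failed send goes unnoticed. Spring's ListenableFuture accepts callbacks through addCallback and can be exposed as a JDK CompletableFuture through completable(). The sketch below shows the callback idea with a plain CompletableFuture so that it runs without a broker. fakeSend is a hypothetical stand-in for the real send call, and the rule that an empty payload fails is an assumption made only to exercise the error path.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Illustration only: no Kafka or Spring classes are used here.
class SendCallbackSketch {

    // Hypothetical stand-in for an asynchronous send.
    // Assumption for the demo: an empty payload makes the send fail.
    static CompletableFuture<String> fakeSend(String topic, String payload) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (payload == null || payload.isEmpty()) {
            future.completeExceptionally(new IllegalArgumentException("empty payload"));
        } else {
            future.complete(topic + ":" + payload);
        }
        return future;
    }

    // Attaches a completion handler and returns a one-line outcome description.
    // fakeSend completes its future before returning, so the handler has
    // already run by the time outcome.get() is called.
    static String sendAndReport(String topic, String payload) {
        AtomicReference<String> outcome = new AtomicReference<>();
        fakeSend(topic, payload).whenComplete((result, error) -> {
            if (error != null) {
                outcome.set("failed: " + error.getMessage());
            } else {
                outcome.set("sent: " + result);
            }
        });
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(sendAndReport("newtopic", "{\"msg\":\"123\"}"));
        System.out.println(sendAndReport("newtopic", ""));
    }
}
```

    In a real application the handler would typically log the failure or retry instead of building a string.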

    5. Consumer

    package com.example.demo.service;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;

    import java.util.Optional;

    @Service
    public class KafkaReceiver {

        // Invoked for every record that arrives on "newtopic".
        @KafkaListener(topics = {"newtopic"})
        public void listen(ConsumerRecord<?, ?> record) {
            // The record value may be null, so wrap it before use.
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            if (kafkaMessage.isPresent()) {
                Object message = kafkaMessage.get();
                System.out.println("record=" + record);
                System.out.println("message=" + message);
            }
        }
    }

    6. Testing

    Simulate a message producer in the startup method and send messages to Kafka.

    package com.example.demo;

    import com.example.demo.service.KafkaSender;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;

    @SpringBootApplication
    public class KafkademoApplication {

        public static void main(String[] args) {
            ConfigurableApplicationContext context =
                    SpringApplication.run(KafkademoApplication.class, args);
            KafkaSender sender = context.getBean(KafkaSender.class);
            // Send 1000 messages, pausing 300 ms between sends.
            for (int i = 0; i < 1000; i++) {
                sender.send();
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
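
    The loop above paces the sends by putting the main thread to sleep. One possible alternative, sketched here with only the JDK, hands the pacing to a ScheduledExecutorService. PacedSender and runPaced are hypothetical names, and the Runnable stands in for a call such as sender.send(). This is a sketch, not part of the original project.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only: runs a task a fixed number of times at a fixed rate.
class PacedSender {

    // Runs task `count` times, one attempt every periodMillis (must be > 0),
    // and returns the number of attempts made.
    static int runPaced(Runnable task, int count, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger attempts = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(count);
        scheduler.scheduleAtFixedRate(() -> {
            if (attempts.get() >= count) {
                return; // all requested attempts have already been made
            }
            try {
                task.run();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel every later run,
                // so report it and keep going.
                e.printStackTrace();
            } finally {
                attempts.incrementAndGet();
                done.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(); // block until the last attempt has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        } finally {
            scheduler.shutdownNow();
        }
        return attempts.get();
    }

    public static void main(String[] args) {
        // Stand-in for sender.send(): three attempts, 300 ms apart.
        runPaced(() -> System.out.println("send"), 3, 300);
    }
}
```

    Catching the exception inside the scheduled task matters because scheduleAtFixedRate stops scheduling a task once one of its runs throws.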

    Results

    [Image: Integrating Kafka 2.2.0 with Spring Boot 2.x]

    Consuming the messages directly from the command line

    [Image: Integrating Kafka 2.2.0 with Spring Boot 2.x]

    Problems encountered

    The producer times out when connecting to Kafka

    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119)

    Solution:

    In Kafka's server.properties, change the setting shown below, replacing the original default value with the ip + port form, and then restart Kafka.

    [Image: Integrating Kafka 2.2.0 with Spring Boot 2.x]
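
    When chasing a connection timeout like the one above, it can help to first confirm that the broker address is reachable at the TCP level from the machine running the application. The following JDK-only sketch does just that. BrokerReachability and canConnect are hypothetical names, and the host and port in main are the values from the configuration in step 2.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Illustration only: a plain TCP check, not a Kafka protocol handshake.
class BrokerReachability {

    // Returns true when a TCP connection to host:port succeeds within timeoutMillis.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Address taken from spring.kafka.bootstrap-servers in this article.
        System.out.println("reachable=" + canConnect("2.1.1.1", 9092, 3000));
    }
}
```

    A successful connection only shows that the port is open. The address the broker advertises back to clients can still be wrong, which is what the server.properties change addresses.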
