springboot如何整合mqtt(mqtt,springboot,开发技术)

时间:2024-04-29 13:52:24 作者 : 石家庄SEO 分类 : 开发技术
  • TAG :

springboot 整合 mqtt

搭建的时候如果你使用的是集群 记得开放以下端口:

springboot如何整合mqtt

好了, 搭建成功下一步就是我们的java程序要与mqtt连接, 这里有两种方式(其实不止两种)进行连接.

一是 直接使用 MQTT Java 客户端库

二是使用 spring integration mqtt也是比较推荐的一种,也是我们主讲这种.

第一步 添加 maven dependency

<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.14</version></dependency>

第二步 添加配置

1 先写好一些基本配置

mqtt:username:test#账号password:123456#密码host-url:tcp://127.0.0.1:1883#mqtt连接tcp地址in-client-id:${random.value}#随机值,使出入站clientID不同out-client-id:${random.value}client-id:${random.int}#客户端Id,不能相同,采用随机数${random.value}default-topic:test/#,topic/+/+/up#默认主题timeout:60#超时时间keepalive:60#保持连接clearSession:true#清除会话(设置为false,断开连接,重连后使用原来的会话保留订阅的主题,能接收离线期间的消息)

2.然后写一个对应的类MqttProperties

importorg.springframework.beans.factory.annotation.Value;importorg.springframework.stereotype.Component;/***MqttProperties**@authorhengzi*@date2022/8/23*/@ComponentpublicclassMqttProperties{/***用户名*/@Value("${mqtt.username}")privateStringusername;/***密码*/@Value("${mqtt.password}")privateStringpassword;/***连接地址*/@Value("${mqtt.host-url}")privateStringhostUrl;/***进-客户Id*/@Value("${mqtt.in-client-id}")privateStringinClientId;/***出-客户Id*/@Value("${mqtt.out-client-id}")privateStringoutClientId;/***客户Id*/@Value("${mqtt.client-id}")privateStringclientId;/***默认连接话题*/@Value("${mqtt.default-topic}")privateStringdefaultTopic;/***超时时间*/@Value("${mqtt.timeout}")privateinttimeout;/***保持连接数*/@Value("${mqtt.keepalive}")privateintkeepalive;/**是否清除session*/@Value("${mqtt.clearSession}")privatebooleanclearSession; //...getterandsetter}

接下来就是配置一些乱七八糟的东西, 这里有很多概念性的东西 比如 管道channel, 适配器 adapter, 入站Inbound, 出站Outbound,等等等等, 看起来是非常头痛的

好吧,那就一个一个来,

首先连接mqtt需要一个客户端, 那么我们就开一个客户端工厂, 这里可以产生很多很多的客户端

@BeanpublicMqttPahoClientFactorymqttPahoClientFactory(){DefaultMqttPahoClientFactoryfactory=newDefaultMqttPahoClientFactory();MqttConnectOptionsoptions=newMqttConnectOptions();options.setServerURIs(mqttProperties.getHostUrl().split(","));options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());factory.setConnectionOptions(options);returnfactory;}

然后再搞两根管子(channel),一个出站,一个入站

//出站消息管道,@BeanpublicMessageChannelmqttOutboundChannel(){returnnewDirectChannel();}//入站消息管道@BeanpublicMessageChannelmqttInboundChannel(){returnnewDirectChannel();}

为了使这些管子能流通 就需要一个适配器(adapter)

//Mqtt管道适配器@BeanpublicMqttPahoMessageDrivenChannelAdapteradapter(MqttPahoClientFactoryfactory){returnnewMqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));}

然后定义消息生产者

//消息生产者@BeanpublicMessageProducermqttInbound(MqttPahoMessageDrivenChannelAdapteradapter){adapter.setCompletionTimeout(5000);adapter.setConverter(newDefaultPahoMessageConverter());//入站投递的通道adapter.setOutputChannel(mqttInboundChannel());adapter.setQos(1);returnadapter;}

那我们收到消息去哪里处理呢,答案是这里:

@Bean//使用ServiceActivator指定接收消息的管道为mqttInboundChannel,投递到mqttInboundChannel管道中的消息会被该方法接收并执行@ServiceActivator(inputChannel="mqttInboundChannel")publicMessageHandlerhandleMessage(){ //这个mqttMessageHandle其实就是一个MessageHandler的实现类(这个类我放下面)returnmqttMessageHandle; //你也可以这样写//returnnewMessageHandler(){//@Override//publicvoidhandleMessage(Message<?>message)throwsMessagingException{////dosomething//}//};

到这里我们其实已经可以接受到来自mqtt的消息了

接下来配置向mqtt发送消息

配置 出站处理器

//出站处理器@Bean@ServiceActivator(inputChannel="mqttOutboundChannel")publicMessageHandlermqttOutbound(MqttPahoClientFactoryfactory){MqttPahoMessageHandlerhandler=newMqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);handler.setAsync(true);handler.setConverter(newDefaultPahoMessageConverter());handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);returnhandler;}

这个 出站处理器 在我看来就是让别人 (MqttPahoMessageHandler)处理了, 我就不处理了,我只管我要发送什么,至于怎么发送,由MqttPahoMessageHandler来完成

接下来我们定义一个接口即可

importorg.springframework.integration.annotation.MessagingGateway;importorg.springframework.integration.mqtt.support.MqttHeaders;importorg.springframework.messaging.handler.annotation.Header;importorg.springframework.stereotype.Component;/***MqttGateway**@authorhengzi*@date2022/8/23*/@Component@MessagingGateway(defaultRequestChannel="mqttOutboundChannel")publicinterfaceMqttGateway{voidsendToMqtt(@Header(MqttHeaders.TOPIC)Stringtopic,Stringdata);voidsendToMqtt(@Header(MqttHeaders.TOPIC)Stringtopic,@Header(MqttHeaders.QOS)IntegerQos,Stringdata);}

我们直接调用这个接口就可以向mqtt 发送数据

到目前为止,整个配置文件长这样:

importorg.eclipse.paho.client.mqttv3.MqttConnectOptions;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.integration.annotation.ServiceActivator;importorg.springframework.integration.channel.DirectChannel;importorg.springframework.integration.core.MessageProducer;importorg.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;importorg.springframework.integration.mqtt.core.MqttPahoClientFactory;importorg.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;importorg.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;importorg.springframework.integration.mqtt.support.DefaultPahoMessageConverter;importorg.springframework.messaging.Message;importorg.springframework.messaging.MessageChannel;importorg.springframework.messaging.MessageHandler;importorg.springframework.messaging.MessagingException;/***MqttConfig**@authorhengzi*@date2022/8/23*/@ConfigurationpublicclassMqttConfig{/***以下属性将在配置文件中读取**/@AutowiredprivateMqttPropertiesmqttProperties;//Mqtt客户端工厂@BeanpublicMqttPahoClientFactorymqttPahoClientFactory(){DefaultMqttPahoClientFactoryfactory=newDefaultMqttPahoClientFactory();MqttConnectOptionsoptions=newMqttConnectOptions();options.setServerURIs(mqttProperties.getHostUrl().split(","));options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());factory.setConnectionOptions(options);returnfactory;}//Mqtt管道适配器@BeanpublicMqttPahoMessageDrivenChannelAdapteradapter(MqttPahoClientFactoryfactory){returnnewMqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));}//消息生产者@BeanpublicMessageProducermqttInbound(MqttPahoMessageDrivenChannelAdapteradapter){adapter.setCompletionTimeout(5000);adapter.setConverter(newDefaultPahoMessageConverter());//入站投递的通道adapter.setOutputChannel(mqttInboundChannel());adapter.setQos(1);returnadapter;}//出站处理器@Bean@ServiceActivator(inputChannel="mqttOutboundChannel")publicMessageHandlermqttOutbound(MqttPahoClientFactoryfactory){MqttPahoMessageHandlerhandler=newMqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);handler.setAsync(true);handler.setConverter(newDefaultPahoMessageConverter());handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);returnhandler;}@Bean//使用ServiceActivator指定接收消息的管道为mqttInboundChannel,投递到mqttInboundChannel管道中的消息会被该方法接收并执行@ServiceActivator(inputChannel="mqttInboundChannel")publicMessageHandlerhandleMessage(){returnmqttMessageHandle;}//出站消息管道,@BeanpublicMessageChannelmqttOutboundChannel(){returnnewDirectChannel();}//入站消息管道@BeanpublicMessageChannelmqttInboundChannel(){returnnewDirectChannel();}}

处理消息的 MqttMessageHandle

@ComponentpublicclassMqttMessageHandleimplementsMessageHandler{@OverridepublicvoidhandleMessage(Message<?>message)throwsMessagingException{}}

在进一步了解之后,发现可以优化的地方,比如channel 的类型是有很多种的, 这里使用的DirectChannel,是Spring Integration默认的消息通道,它将消息发送给为一个订阅者,然后阻碍发送直到消息被接收,传输方式都是同步的方式,都是由一个线程来运行的.

这里我们可以将入站channel改成 ExecutorChannel一个可以使用多线程的channel

@BeanpublicThreadPoolTaskExecutormqttThreadPoolTaskExecutor(){ThreadPoolTaskExecutorexecutor=newThreadPoolTaskExecutor();//最大可创建的线程数intmaxPoolSize=200;executor.setMaxPoolSize(maxPoolSize);//核心线程池大小intcorePoolSize=50;executor.setCorePoolSize(corePoolSize);//队列最大长度intqueueCapacity=1000;executor.setQueueCapacity(queueCapacity);//线程池维护线程所允许的空闲时间intkeepAliveSeconds=300;executor.setKeepAliveSeconds(keepAliveSeconds);//线程池对拒绝任务(无线程可用)的处理策略executor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());returnexecutor;}//入站消息管道@BeanpublicMessageChannelmqttInboundChannel(){//用线程池returnnewExecutorChannel(mqttThreadPoolTaskExecutor());}

到这里其实可以运行了.

但是这样配置其实还是有点多, 有点乱, 于是我查找官网, f发现一种更简单的配置方法 叫 Java DSL

我们参考官网,稍微改一下,使用 DSL的方式进行配置:

importorg.eclipse.paho.client.mqttv3.MqttConnectOptions;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.integration.channel.ExecutorChannel;importorg.springframework.integration.dsl.IntegrationFlow;importorg.springframework.integration.dsl.IntegrationFlows;importorg.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;importorg.springframework.integration.mqtt.core.MqttPahoClientFactory;importorg.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;importorg.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;importorg.springframework.integration.mqtt.support.DefaultPahoMessageConverter;importorg.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;importjava.util.concurrent.ThreadPoolExecutor;/***MqttConfigV2**@authorhengzi*@date2022/8/24*/@ConfigurationpublicclassMqttConfigV2{@AutowiredprivateMqttPropertiesmqttProperties;@AutowiredprivateMqttMessageHandlemqttMessageHandle;//Mqtt客户端工厂所有客户端从这里产生@BeanpublicMqttPahoClientFactorymqttPahoClientFactory(){DefaultMqttPahoClientFactoryfactory=newDefaultMqttPahoClientFactory();MqttConnectOptionsoptions=newMqttConnectOptions();options.setServerURIs(mqttProperties.getHostUrl().split(","));options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());factory.setConnectionOptions(options);returnfactory;}//Mqtt管道适配器@BeanpublicMqttPahoMessageDrivenChannelAdapteradapter(MqttPahoClientFactoryfactory){returnnewMqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));}//消息生产者(接收,处理来自mqtt的消息)@BeanpublicIntegrationFlowmqttInbound(MqttPahoMessageDrivenChannelAdapteradapter){adapter.setCompletionTimeout(5000);adapter.setQos(1);returnIntegrationFlows.from(adapter).channel(newExecutorChannel(mqttThreadPoolTaskExecutor())).handle(mqttMessageHandle).get();}@BeanpublicThreadPoolTaskExecutormqttThreadPoolTaskExecutor(){ThreadPoolTaskExecutorexecutor=newThreadPoolTaskExecutor();//最大可创建的线程数intmaxPoolSize=200;executor.setMaxPoolSize(maxPoolSize);//核心线程池大小intcorePoolSize=50;executor.setCorePoolSize(corePoolSize);//队列最大长度intqueueCapacity=1000;executor.setQueueCapacity(queueCapacity);//线程池维护线程所允许的空闲时间intkeepAliveSeconds=300;executor.setKeepAliveSeconds(keepAliveSeconds);//线程池对拒绝任务(无线程可用)的处理策略executor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());returnexecutor;}//出站处理器(向mqtt发送消息)@BeanpublicIntegrationFlowmqttOutboundFlow(MqttPahoClientFactoryfactory){MqttPahoMessageHandlerhandler=newMqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);handler.setAsync(true);handler.setConverter(newDefaultPahoMessageConverter());handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);returnIntegrationFlows.from("mqttOutboundChannel").handle(handler).get();}}

这样看起来真的简单多了, 头也没那么大了, 我要是早知道多好.

好了以上就是配置相关的, 到这里其实是已经完成springboot 与 mqtt 的整合了.

但其实我一直有个想法, 就是我们接收的消息 都是在 handleMessage这个方法里面执行的,

 @OverridepublicvoidhandleMessage(Message<?>message)throwsMessagingException{}

所以我就有了一个想法, 能不能根据 我订阅的主题,在不同的方法执行, 对于这个问题,其实你用if ... else ...也能实现, 但很明显,如果我订阅的主题很多的话, 那写起来就很头痛了.

对于这个问题,有两种思路, 一个是添加Spring Integration的路由 router,根据不同topic路由到不同的channel, 这个我也知道能不能实现, 我这里就不讨论了.

第二种是, 我也不知道名字改如何叫, 我是参考了 spring@Controller的设计, 暂且叫他注解模式.

众所周知,我们的接口都是在类上加 @Controller这个注解, 就代表这个类是 http 接口, 再在方法加上 @RequestMapping就能实现不同的 url 调用不同的方法.

参数这个设计 我们在类上面加 @MqttService就代表这个类是专门处理mqtt消息的服务类
同时 在这个类的方法上 加上 @MqttTopic就代表 这个主题由这个方法处理.

OK, 理论有了,接下来就是 实践.

先定义 两个注解

importorg.springframework.core.annotation.AliasFor;importorg.springframework.stereotype.Component;importjava.lang.annotation.*;@Documented@Target({ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)@Componentpublic@interfaceMqttService{@AliasFor(annotation=Component.class)Stringvalue()default"";}

加上 @Component注解 spring就会扫描, 并注册到IOC容器里

importjava.lang.annotation.ElementType;importjava.lang.annotation.Retention;importjava.lang.annotation.RetentionPolicy;importjava.lang.annotation.Target;@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)public@interfaceMqttTopic{/***主题名字*/Stringvalue()default"";}

参考 @RequestMapping我们使用起来应该是这样的:

importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.messaging.Message;/***MqttTopicHandle**@authorhengzi*@date2022/8/24*/@MqttServicepublicclassMqttTopicHandle{publicstaticfinalLoggerlog=LoggerFactory.getLogger(MqttTopicHandle.class); //这里的#号是通配符@MqttTopic("test/#")publicvoidtest(Message<?>message){log.info("test="+message.getPayload());} //这里的+号是通配符@MqttTopic("topic/+/+/up")publicvoidup(Message<?>message){log.info("up="+message.getPayload());} //注意你必须先订阅@MqttTopic("topic/1/2/down")publicvoiddown(Message<?>message){log.info("down="+message.getPayload());}}

OK 接下来就是实现这样的使用

分析 :

当我们收到消息时, 我们从IOC容器中 找到所有 带 @MqttService注解的类

然后 遍历这些类, 找到带有 @MqttTopic的方法

接着 把 @MqttTopic注解的的值 与 接受到的topic 进行对比

如果一致则执行这个方法

废话少说, 上代码

importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.messaging.Message;importorg.springframework.messaging.MessageHandler;importorg.springframework.messaging.MessagingException;importorg.springframework.stereotype.Component;importjava.lang.reflect.InvocationTargetException;importjava.lang.reflect.Method;importjava.util.Map;/***MessageHandleService**@authorhengzi*@date2022/8/24*/@ComponentpublicclassMqttMessageHandleimplementsMessageHandler{publicstaticfinalLoggerlog=LoggerFactory.getLogger(MqttMessageHandle.class);//包含@MqttService注解的类(Component)publicstaticMap<String,Object>mqttServices;/***所有mqtt到达的消息都会在这里处理*要注意这个方法是在线程池里面运行的*@parammessagemessage*/@OverridepublicvoidhandleMessage(Message<?>message)throwsMessagingException{getMqttTopicService(message);}publicMap<String,Object>getMqttServices(){if(mqttServices==null){mqttServices=SpringUtils.getBeansByAnnotation(MqttService.class);}returnmqttServices;}publicvoidgetMqttTopicService(Message<?>message){//在这里我们根据不同的主题分发不同的消息StringreceivedTopic=message.getHeaders().get("mqtt_receivedTopic",String.class);if(receivedTopic==null||"".equals(receivedTopic)){return;}for(Map.Entry<String,Object>entry:getMqttServices().entrySet()){ //把所有带有@MqttService的类遍历Class<?>clazz=entry.getValue().getClass();//获取他所有方法Method[]methods=clazz.getDeclaredMethods();for(Methodmethod:methods){if(method.isAnnotationPresent(MqttTopic.class)){ //如果这个方法有这个注解MqttTopichandleTopic=method.getAnnotation(MqttTopic.class);if(isMatch(receivedTopic,handleTopic.value())){ //并且这个topic匹配成功try{method.invoke(SpringUtils.getBean(clazz),message);return;}catch(IllegalAccessExceptione){e.printStackTrace();log.error("代理炸了");}catch(InvocationTargetExceptione){log.error("执行{}方法出现错误",handleTopic.value(),e);}}}}}}/***mqtt订阅的主题与我实际的主题是否匹配*@paramtopic是实际的主题*@parampattern是我订阅的主题可以是通配符模式*@return是否匹配*/publicstaticbooleanisMatch(Stringtopic,Stringpattern){if((topic==null)||(pattern==null)){returnfalse;}if(topic.equals(pattern)){//完全相等是肯定匹配的returntrue;}if("#".equals(pattern)){//#号代表所有主题肯定匹配的returntrue;}String[]splitTopic=topic.split("/");String[]splitPattern=pattern.split("/");booleanmatch=true;//如果包含#则只需要判断#前面的for(inti=0;i<splitPattern.length;i++){if(!"#".equals(splitPattern[i])){//不是#号正常判断if(i>=splitTopic.length){//此时长度不相等不匹配match=false;break;}if(!splitTopic[i].equals(splitPattern[i])&&!"+".equals(splitPattern[i])){//不相等且不等于+match=false;break;}}else{//是#号肯定匹配的break;}}returnmatch;}}

工具类 SpringUtils

importorg.springframework.aop.framework.AopContext;importorg.springframework.beans.BeansException;importorg.springframework.beans.factory.NoSuchBeanDefinitionException;importorg.springframework.beans.factory.config.BeanFactoryPostProcessor;importorg.springframework.beans.factory.config.ConfigurableListableBeanFactory;importorg.springframework.context.ApplicationContext;importorg.springframework.context.ApplicationContextAware;importorg.springframework.stereotype.Component;importjava.util.Map;/***spring工具类方便在非spring管理环境中获取bean**/@ComponentpublicfinalclassSpringUtilsimplementsBeanFactoryPostProcessor,ApplicationContextAware{/**Spring应用上下文环境*/privatestaticConfigurableListableBeanFactorybeanFactory;privatestaticApplicationContextapplicationContext;publicstaticMap<String,Object>getBeansByAnnotation(ClassclsName)throwsBeansException{returnbeanFactory.getBeansWithAnnotation(clsName);}@OverridepublicvoidpostProcessBeanFactory(ConfigurableListableBeanFactorybeanFactory)throwsBeansException{SpringUtils.beanFactory=beanFactory;}@OverridepublicvoidsetApplicationContext(ApplicationContextapplicationContext)throwsBeansException{SpringUtils.applicationContext=applicationContext;}/***获取对象**@paramname*@returnObject一个以所给名字注册的bean的实例*@throwsorg.springframework.beans.BeansException**/@SuppressWarnings("unchecked")publicstatic<T>TgetBean(Stringname)throwsBeansException{return(T)beanFactory.getBean(name);}/***获取类型为requiredType的对象**@paramclz*@return*@throwsorg.springframework.beans.BeansException**/publicstatic<T>TgetBean(Class<T>clz)throwsBeansException{Tresult=(T)beanFactory.getBean(clz);returnresult;}/***如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true**@paramname*@returnboolean*/publicstaticbooleancontainsBean(Stringname){returnbeanFactory.containsBean(name);}/***判断以给定名字注册的bean定义是一个singleton还是一个prototype。如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)**@paramname*@returnboolean*@throwsorg.springframework.beans.factory.NoSuchBeanDefinitionException**/publicstaticbooleanisSingleton(Stringname)throwsNoSuchBeanDefinitionException{returnbeanFactory.isSingleton(name);}/***@paramname*@returnClass注册对象的类型*@throwsorg.springframework.beans.factory.NoSuchBeanDefinitionException**/publicstaticClass<?>getType(Stringname)throwsNoSuchBeanDefinitionException{returnbeanFactory.getType(name);}/***如果给定的bean名字在bean定义中有别名,则返回这些别名**@paramname*@return*@throwsorg.springframework.beans.factory.NoSuchBeanDefinitionException**/publicstaticString[]getAliases(Stringname)throwsNoSuchBeanDefinitionException{returnbeanFactory.getAliases(name);}/***获取aop代理对象**@paraminvoker*@return*/@SuppressWarnings("unchecked")publicstatic<T>TgetAopProxy(Tinvoker){return(T)AopContext.currentProxy();}/***获取当前的环境配置,无配置返回null**@return当前的环境配置*/publicstaticString[]getActiveProfiles(){returnapplicationContext.getEnvironment().getActiveProfiles();}}

OK, 大功告成. 终于舒服了, 终于不用写if...else...了, 个人感觉这样处理起来会更加优雅. 写代码最重要是什么, 是优雅~

以上!

参考文章:

  • 使用 Spring integration 在Springboot中集成Mqtt

  • Spring Integration(一)概述

附:

动态添加主题方式:

importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;importorg.springframework.stereotype.Service;importjava.util.Arrays;/***MqttService**@authorhengzi*@date2022/8/25*/@ServicepublicclassMqttService{@AutowiredprivateMqttPahoMessageDrivenChannelAdapteradapter;publicvoidaddTopic(Stringtopic){addTopic(topic,1);}publicvoidaddTopic(Stringtopic,intqos){String[]topics=adapter.getTopic();if(!Arrays.asList(topics).contains(topic)){adapter.addTopic(topic,qos);}}publicvoidremoveTopic(Stringtopic){adapter.removeTopic(topic);}}

直接调用就行

 </div> <div class="zixun-tj-product adv-bottom"></div> </div> </div> <div class="prve-next-news">
本文:springboot如何整合mqtt的详细内容,希望对您有所帮助,信息来源于网络。
上一篇:Java8中的方法与构造器怎么引用下一篇:

2 人围观 / 0 条评论 ↓快速评论↓

(必须)

(必须,保密)

阿狸1 阿狸2 阿狸3 阿狸4 阿狸5 阿狸6 阿狸7 阿狸8 阿狸9 阿狸10 阿狸11 阿狸12 阿狸13 阿狸14 阿狸15 阿狸16 阿狸17 阿狸18