springboot集成mqtt的实践开发
时间:2021-01-27 10:38:13|栏目:JAVA代码|点击: 次
序
MQTT(Message Queuing Telemetry Transport)是基于二进制消息的发布/订阅编程模式的消息协议,非常适合需要低功耗和网络带宽有限的IoT场景。这里简单介绍一下如何在springboot中集成。
maven
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
配置client factory
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs("tcp://demo:1883");
// factory.setUserName("guest");
// factory.setPassword("guest");
return factory;
}
配置consumer
@Bean
public IntegrationFlow mqttInFlow() {
return IntegrationFlows.from(mqttInbound())
.transform(p -> p + ", received from MQTT")
.handle(logger())
.get();
}
private LoggingHandler logger() {
LoggingHandler loggingHandler = new LoggingHandler("INFO");
loggingHandler.setLoggerName("siSample");
return loggingHandler;
}
@Bean
public MessageProducerSupport mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer",
mqttClientFactory(), "siSampleTopic");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
return adapter;
}
配置producer
@Bean
public IntegrationFlow mqttOutFlow() {
//console input
// return IntegrationFlows.from(CharacterStreamReadingMessageSource.stdin(),
// e -> e.poller(Pollers.fixedDelay(1000)))
// .transform(p -> p + " sent to MQTT")
// .handle(mqttOutbound())
// .get();
return IntegrationFlows.from(outChannel())
.handle(mqttOutbound())
.get();
}
@Bean
public MessageChannel outChannel() {
return new DirectChannel();
}
@Bean
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("siSamplePublisher", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("siSampleTopic");
return messageHandler;
}
配置MessagingGateway
@MessagingGateway(defaultRequestChannel = "outChannel")
public interface MsgWriter {
void write(String note);
}
这样就大功告成了
doc
上一篇:使用IDEA启动项目遇见ClassNotFoundException的解决方案
栏 目:JAVA代码
下一篇:基于Rest的API解决方案(jersey与swagger集成)
本文地址:http://www.codeinn.net/misctech/51921.html


阅读排行
- 1Java Swing组件BoxLayout布局用法示例
- 2java中-jar 与nohup的对比
- 3Java邮件发送程序(可以同时发给多个地址、可以带附件)
- 4Caused by: java.lang.ClassNotFoundException: org.objectweb.asm.Type异常
- 5Java中自定义异常详解及实例代码
- 6深入理解Java中的克隆
- 7java读取excel文件的两种方法
- 8解析SpringSecurity+JWT认证流程实现
- 9spring boot里增加表单验证hibernate-validator并在freemarker模板里显示错误信息(推荐)
- 10深入解析java虚拟机




