|
@@ -0,0 +1,113 @@
|
|
|
+package cn.minbb.iot.config;
|
|
|
+
|
|
|
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.context.annotation.Bean;
|
|
|
+import org.springframework.context.annotation.Configuration;
|
|
|
+import org.springframework.integration.annotation.IntegrationComponentScan;
|
|
|
+import org.springframework.integration.annotation.ServiceActivator;
|
|
|
+import org.springframework.integration.channel.DirectChannel;
|
|
|
+import org.springframework.integration.core.MessageProducer;
|
|
|
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
|
|
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
|
|
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
|
|
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
|
|
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
|
|
+import org.springframework.messaging.MessageChannel;
|
|
|
+import org.springframework.messaging.MessageHandler;
|
|
|
+
|
|
|
+/**
|
|
|
+ * MQTT消息推送配置
|
|
|
+ */
|
|
|
+@Configuration
|
|
|
+@IntegrationComponentScan
|
|
|
+public class MqttSenderConfig {
|
|
|
+
|
|
|
+ private Logger logger = LoggerFactory.getLogger(MqttSenderConfig.class);
|
|
|
+
|
|
|
+ @Value("${spring.mqtt.username}")
|
|
|
+ private String username;
|
|
|
+
|
|
|
+ @Value("${spring.mqtt.password}")
|
|
|
+ private String password;
|
|
|
+
|
|
|
+ @Value("${spring.mqtt.url}")
|
|
|
+ private String hostUrl;
|
|
|
+
|
|
|
+ @Value("${spring.mqtt.client.id}")
|
|
|
+ private String clientId;
|
|
|
+
|
|
|
+ @Value("${spring.mqtt.default.topic}")
|
|
|
+ private String defaultTopic;
|
|
|
+
|
|
|
+ @Value("${spring.mqtt.completionTimeout}")
|
|
|
+ private int completionTimeout;
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public MqttConnectOptions getMqttConnectOptions() {
|
|
|
+ MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
|
|
|
+ mqttConnectOptions.setUserName(username);
|
|
|
+ mqttConnectOptions.setPassword(password.toCharArray());
|
|
|
+ mqttConnectOptions.setServerURIs(new String[]{hostUrl});
|
|
|
+ mqttConnectOptions.setKeepAliveInterval(2);
|
|
|
+ return mqttConnectOptions;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public MqttPahoClientFactory mqttClientFactory() {
|
|
|
+ DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
|
|
|
+ factory.setConnectionOptions(getMqttConnectOptions());
|
|
|
+ return factory;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ @ServiceActivator(inputChannel = "mqttOutboundChannel")
|
|
|
+ public MessageHandler mqttOutbound() {
|
|
|
+ MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
|
|
|
+ messageHandler.setAsync(true);
|
|
|
+ messageHandler.setDefaultTopic(defaultTopic);
|
|
|
+ return messageHandler;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public MessageChannel mqttOutboundChannel() {
|
|
|
+ return new DirectChannel();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 接收通道
|
|
|
+ */
|
|
|
+ @Bean
|
|
|
+ public MessageChannel mqttInputChannel() {
|
|
|
+ return new DirectChannel();
|
|
|
+ }
|
|
|
+
|
|
|
+ // 配置 client,监听 topic
|
|
|
+ @Bean
|
|
|
+ public MessageProducer inbound() {
|
|
|
+ MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
|
|
|
+ clientId + "_inbound", mqttClientFactory(),
|
|
|
+ defaultTopic,
|
|
|
+ Const.MQTT_TOPIC_ALL, Const.MQTT_TOPIC_CAR);
|
|
|
+ adapter.setCompletionTimeout(completionTimeout);
|
|
|
+ adapter.setConverter(new DefaultPahoMessageConverter());
|
|
|
+ adapter.setQos(1);
|
|
|
+ adapter.setOutputChannel(mqttInputChannel());
|
|
|
+ return adapter;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 通过通道获取数据
|
|
|
+ @Bean
|
|
|
+ @ServiceActivator(inputChannel = "mqttInputChannel")
|
|
|
+ public MessageHandler handler() {
|
|
|
+ return message -> {
|
|
|
+ Object object = message.getHeaders().get("mqtt_receivedTopic");
|
|
|
+ String topic = null != object ? object.toString() : "";
|
|
|
+ String type = topic.substring(topic.lastIndexOf("/") + 1, topic.length());
|
|
|
+ String order = message.getPayload().toString();
|
|
|
+ logger.info("收到消息 = 主题 = {} 信息 = {}", topic, order);
|
|
|
+ };
|
|
|
+ }
|
|
|
+}
|