123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- package com.szwl.config;
- import com.szwl.manager.MqttHandlerManager;
- import com.szwl.model.entity.MqttMsg;
- import com.szwl.service.MqttMsgService;
- import com.szwl.service.MqttTopicsService;
- import lombok.extern.slf4j.Slf4j;
- import org.eclipse.paho.client.mqttv3.*;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.ApplicationEventPublisher;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Lazy;
- import java.net.InetAddress;
- import java.net.UnknownHostException;
- import java.util.Date;
- /**
- * mqtt配置
- */
- @Slf4j
- @Configuration
- public class MqttConfig {
- @Value("${mqtt.url}")
- private String url;
- @Value("${mqtt.clientId}")
- private String clientId;
- @Value("${mqtt.username}")
- private String username;
- @Value("${mqtt.password}")
- private String password;
- private MqttClient mqttClient;
- private final MqttHandlerManager handlerManager;
- String serverAddress = "";
- @Autowired
- private ApplicationEventPublisher eventPublisher;
- @Autowired
- private MqttTopicsService mqttTopicsService;
- @Autowired
- private MqttMsgService mqttMsgService;
- public MqttConfig(MqttHandlerManager handlerManager) {
- this.handlerManager = handlerManager;
- }
- @Bean
- @Lazy
- public MqttClient mqttClient() throws MqttException, UnknownHostException {
- serverAddress = InetAddress.getLocalHost().getHostAddress();
- MemoryPersistence persistence = new MemoryPersistence();
- // 客户端ID
- clientId = clientId + "-" + serverAddress;
- mqttClient = new MqttClient(url, clientId, persistence);
- // 设置回调
- mqttClient.setCallback(new MqttCallback() {
- @Override
- public void connectionLost(Throwable cause) {
- log.info("MQTT连接断开:{}", cause.getMessage());
- log.info("MQTT连接断开,正在重连...");
- connectMqtt(clientId);
- }
- @Override
- public void messageArrived(String topic, MqttMessage message) {
- try {
- String payload = new String(message.getPayload());
- // 如果是上下线消息
- if (topic.contains("connected")) {
- topic = topic.substring(topic.lastIndexOf("/") + 1);
- }
- // 将消息转发到处理逻辑
- handleIncomingMessage(topic, payload);
- // 如果不是上下线消息
- if (!topic.contains("connected")) {
- MqttMsg mqttMsg = new MqttMsg();
- mqttMsg.setCreateDate(new Date());
- mqttMsg.setTopic(topic);
- mqttMsg.setMessage(payload);
- mqttMsgService.save(mqttMsg);
- }
- } catch (Exception e) {
- log.info("--处理消息失败:{}", e.getMessage());
- }
- }
- @Override
- public void deliveryComplete(IMqttDeliveryToken token) {
- // Handle delivery completion if needed
- }
- });
- connectMqtt(clientId);
- return mqttClient;
- }
- /**
- * 连接MQTT
- */
- public void connectMqtt(String clientId) {
- // 持久化
- try {
- MqttConnectOptions options = new MqttConnectOptions();
- options.setUserName(username);
- options.setPassword(password.toCharArray());
- // 使会话保持
- options.setCleanSession(false);
- options.setKeepAliveInterval(60);
- options.setAutomaticReconnect(true);
- options.setConnectionTimeout(15);
- // 建立连接
- mqttClient.connect(options);
- // 订阅主题
- if (serverAddress.equals("10.0.0.153")) {
- mqttTopicsService.list().forEach(topic -> {
- try {
- mqttClient.subscribe(topic.getTopic(), topic.getQos());
- } catch (MqttException e) {
- throw new RuntimeException(e);
- }
- });
- }
- log.info("MQTT客户端已连接,客户端ID:{}", clientId);
- } catch (MqttException e) {
- log.info("MQTT客户端连接失败");
- e.printStackTrace();
- }
- }
- /**
- * 消息处理逻辑
- */
- private void handleIncomingMessage(String topic, String payload) {
- // 后续逻辑会通过策略模式或注解来实现动态处理
- handlerManager.dispatch(topic, payload);
- }
- }
|