package com.szwl.config; import com.szwl.manager.MqttHandlerManager; import com.szwl.model.entity.MqttMsg; import com.szwl.service.MqttMsgService; import com.szwl.service.MqttTopicsService; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Lazy; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Date; /** * mqtt配置 */ @Slf4j @Configuration public class MqttConfig { @Value("${mqtt.url}") private String url; @Value("${mqtt.clientId}") private String clientId; @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; private MqttClient mqttClient; private final MqttHandlerManager handlerManager; String serverAddress = ""; @Autowired private ApplicationEventPublisher eventPublisher; @Autowired private MqttTopicsService mqttTopicsService; @Autowired private MqttMsgService mqttMsgService; public MqttConfig(MqttHandlerManager handlerManager) { this.handlerManager = handlerManager; } @Bean @Lazy public MqttClient mqttClient() throws MqttException, UnknownHostException { serverAddress = InetAddress.getLocalHost().getHostAddress(); MemoryPersistence persistence = new MemoryPersistence(); // 客户端ID clientId = clientId + "-" + serverAddress; mqttClient = new MqttClient(url, clientId, persistence); // 设置回调 mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { log.info("MQTT连接断开:{}", cause.getMessage()); log.info("MQTT连接断开,正在重连..."); connectMqtt(clientId); } @Override public void messageArrived(String topic, MqttMessage message) { try { String payload = new String(message.getPayload()); // 如果是上下线消息 if (topic.contains("connected")) { topic = topic.substring(topic.lastIndexOf("/") + 1); } // 将消息转发到处理逻辑 handleIncomingMessage(topic, payload); // 如果不是上下线消息 if (!topic.contains("connected")) { MqttMsg mqttMsg = new MqttMsg(); mqttMsg.setCreateDate(new Date()); mqttMsg.setTopic(topic); mqttMsg.setMessage(payload); mqttMsgService.save(mqttMsg); } } catch (Exception e) { log.info("--处理消息失败:{}", e.getMessage()); } } @Override public void deliveryComplete(IMqttDeliveryToken token) { // Handle delivery completion if needed } }); connectMqtt(clientId); return mqttClient; } /** * 连接MQTT */ public void connectMqtt(String clientId) { // 持久化 try { MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(username); options.setPassword(password.toCharArray()); // 使会话保持 options.setCleanSession(false); options.setKeepAliveInterval(60); options.setAutomaticReconnect(true); options.setConnectionTimeout(15); // 建立连接 mqttClient.connect(options); // 订阅主题 if (serverAddress.equals("10.0.0.153")) { mqttTopicsService.list().forEach(topic -> { try { mqttClient.subscribe(topic.getTopic(), topic.getQos()); } catch (MqttException e) { throw new RuntimeException(e); } }); } log.info("MQTT客户端已连接,客户端ID:{}", clientId); } catch (MqttException e) { log.info("MQTT客户端连接失败"); e.printStackTrace(); } } /** * 消息处理逻辑 */ private void handleIncomingMessage(String topic, String payload) { // 后续逻辑会通过策略模式或注解来实现动态处理 handlerManager.dispatch(topic, payload); } }