// MqttConfig.java (4.8 KB)
  package com.szwl.config;

  import com.szwl.manager.MqttHandlerManager;
  import com.szwl.model.entity.MqttMsg;
  import com.szwl.service.MqttMsgService;
  import com.szwl.service.MqttTopicsService;
  import java.net.InetAddress;
  import java.net.UnknownHostException;
  import java.nio.charset.StandardCharsets;
  import java.util.Date;
  import lombok.extern.slf4j.Slf4j;
  import org.eclipse.paho.client.mqttv3.*;
  import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.beans.factory.annotation.Value;
  import org.springframework.context.ApplicationEventPublisher;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;
  import org.springframework.context.annotation.Lazy;
  18. /**
  19. * mqtt配置
  20. */
  21. @Slf4j
  22. @Configuration
  23. public class MqttConfig {
  24. @Value("${mqtt.url}")
  25. private String url;
  26. @Value("${mqtt.clientId}")
  27. private String clientId;
  28. @Value("${mqtt.username}")
  29. private String username;
  30. @Value("${mqtt.password}")
  31. private String password;
  32. private MqttClient mqttClient;
  33. private final MqttHandlerManager handlerManager;
  34. String serverAddress = "";
  35. @Autowired
  36. private ApplicationEventPublisher eventPublisher;
  37. @Autowired
  38. private MqttTopicsService mqttTopicsService;
  39. @Autowired
  40. private MqttMsgService mqttMsgService;
  41. public MqttConfig(MqttHandlerManager handlerManager) {
  42. this.handlerManager = handlerManager;
  43. }
  44. @Bean
  45. @Lazy
  46. public MqttClient mqttClient() throws MqttException, UnknownHostException {
  47. serverAddress = InetAddress.getLocalHost().getHostAddress();
  48. MemoryPersistence persistence = new MemoryPersistence();
  49. // 客户端ID
  50. clientId = clientId + "-" + serverAddress;
  51. mqttClient = new MqttClient(url, clientId, persistence);
  52. // 设置回调
  53. mqttClient.setCallback(new MqttCallback() {
  54. @Override
  55. public void connectionLost(Throwable cause) {
  56. log.info("MQTT连接断开:{}", cause.getMessage());
  57. log.info("MQTT连接断开,正在重连...");
  58. connectMqtt(clientId);
  59. }
  60. @Override
  61. public void messageArrived(String topic, MqttMessage message) {
  62. try {
  63. String payload = new String(message.getPayload());
  64. // 如果是上下线消息
  65. if (topic.contains("connected")) {
  66. topic = topic.substring(topic.lastIndexOf("/") + 1);
  67. }
  68. // 将消息转发到处理逻辑
  69. handleIncomingMessage(topic, payload);
  70. // 如果不是上下线消息
  71. if (!topic.contains("connected")) {
  72. MqttMsg mqttMsg = new MqttMsg();
  73. mqttMsg.setCreateDate(new Date());
  74. mqttMsg.setTopic(topic);
  75. mqttMsg.setMessage(payload);
  76. mqttMsgService.save(mqttMsg);
  77. }
  78. } catch (Exception e) {
  79. log.info("--处理消息失败:{}", e.getMessage());
  80. }
  81. }
  82. @Override
  83. public void deliveryComplete(IMqttDeliveryToken token) {
  84. // Handle delivery completion if needed
  85. }
  86. });
  87. connectMqtt(clientId);
  88. return mqttClient;
  89. }
  90. /**
  91. * 连接MQTT
  92. */
  93. public void connectMqtt(String clientId) {
  94. // 持久化
  95. try {
  96. MqttConnectOptions options = new MqttConnectOptions();
  97. options.setUserName(username);
  98. options.setPassword(password.toCharArray());
  99. // 使会话保持
  100. options.setCleanSession(false);
  101. options.setKeepAliveInterval(60);
  102. options.setAutomaticReconnect(true);
  103. options.setConnectionTimeout(15);
  104. // 建立连接
  105. mqttClient.connect(options);
  106. // 订阅主题
  107. if (serverAddress.equals("10.0.0.153")) {
  108. mqttTopicsService.list().forEach(topic -> {
  109. try {
  110. mqttClient.subscribe(topic.getTopic(), topic.getQos());
  111. } catch (MqttException e) {
  112. throw new RuntimeException(e);
  113. }
  114. });
  115. }
  116. log.info("MQTT客户端已连接,客户端ID:{}", clientId);
  117. } catch (MqttException e) {
  118. log.info("MQTT客户端连接失败");
  119. e.printStackTrace();
  120. }
  121. }
  122. /**
  123. * 消息处理逻辑
  124. */
  125. private void handleIncomingMessage(String topic, String payload) {
  126. // 后续逻辑会通过策略模式或注解来实现动态处理
  127. handlerManager.dispatch(topic, payload);
  128. }
  129. }