Просмотр исходного кода

feat:“优化mqtt监听接口,优化消息逻辑处理"

soobin 8 месяцев назад
Родитель
Сommit
692a0cb903

+ 24 - 2
src/main/java/com/szwl/config/MqttConfig.java

@@ -1,6 +1,6 @@
 package com.szwl.config;
 
-import com.szwl.event.MqttMessageEvent;
+import com.szwl.manager.MqttHandlerManager;
 import com.szwl.service.MqttTopicsService;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.*;
@@ -14,6 +14,9 @@ import org.springframework.context.annotation.Configuration;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
+/**
+ * mqtt配置
+ */
 @Slf4j
 @Configuration
 public class MqttConfig {
@@ -32,6 +35,8 @@ public class MqttConfig {
 
     private MqttClient mqttClient;
 
+    private final MqttHandlerManager handlerManager;
+
     String serverAddress = "";
 
     @Autowired
@@ -40,6 +45,10 @@ public class MqttConfig {
     @Autowired
     private MqttTopicsService mqttTopicsService;
 
+    public MqttConfig(MqttHandlerManager handlerManager) {
+        this.handlerManager = handlerManager;
+    }
+
     @Bean
     public MqttClient mqttClient() throws MqttException, UnknownHostException {
         serverAddress = InetAddress.getLocalHost().getHostAddress();
@@ -59,7 +68,12 @@ public class MqttConfig {
             @Override
             public void messageArrived(String topic, MqttMessage message) {
                 String payload = new String(message.getPayload());
-                eventPublisher.publishEvent(new MqttMessageEvent(this, topic, payload));
+                // 如果是上下线消息
+                if (topic.contains("connected")) {
+                    topic = topic.substring(topic.lastIndexOf("/") + 1);
+                }
+                // 将消息转发到处理逻辑
+                handleIncomingMessage(topic, payload);
             }
 
             @Override
@@ -102,5 +116,13 @@ public class MqttConfig {
             e.printStackTrace();
         }
     }
+
+    /**
+     * 消息处理逻辑
+     */
+    private void handleIncomingMessage(String topic, String payload) {
+        // 后续逻辑会通过策略模式或注解来实现动态处理
+        handlerManager.dispatch(topic, payload);
+    }
 }
 

+ 1 - 1
src/main/java/com/szwl/controller/MqttController.java

@@ -14,7 +14,7 @@ import com.szwl.model.entity.MqttTopics;
 import com.szwl.model.param.MqttMsgParam;
 import com.szwl.service.MqttMsgService;
 import com.szwl.service.MqttTopicsService;
-import com.szwl.service.base.MqttService;
+import com.szwl.service.mqtt.MqttService;
 import com.szwl.util.MqttUtils;
 import io.swagger.annotations.ApiOperation;
 import org.apache.commons.lang.StringUtils;

+ 0 - 26
src/main/java/com/szwl/event/MqttMessageEvent.java

@@ -1,26 +0,0 @@
-package com.szwl.event;
-
-import org.springframework.context.ApplicationEvent;
-
-/**
- * MQTT消息事件类
- * @author: soobin
- */
-public class MqttMessageEvent extends ApplicationEvent {
-    private final String topic;
-    private final String payload;
-
-    public MqttMessageEvent(Object source, String topic, String payload) {
-        super(source);
-        this.topic = topic;
-        this.payload = payload;
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public String getPayload() {
-        return payload;
-    }
-}

+ 0 - 62
src/main/java/com/szwl/event/MqttMessageEventListener.java

@@ -1,62 +0,0 @@
-package com.szwl.event;
-
-import com.alibaba.fastjson.JSONObject;
-import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
-import com.baomidou.mybatisplus.core.toolkit.Wrappers;
-import com.szwl.model.entity.MqttMsg;
-import com.szwl.model.entity.TEquipment;
-import com.szwl.service.MqttMsgService;
-import com.szwl.service.TEquipmentService;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.event.EventListener;
-import org.springframework.stereotype.Component;
-
-import java.util.Date;
-
-@Slf4j
-@Component
-public class MqttMessageEventListener {
-
-    @Autowired
-    private MqttMsgService mqttMsgService;
-
-    @Autowired
-    private TEquipmentService equipmentService;
-
-    /**
-     * 监听主题:/connected,/disconnected,作用:监听设备上线、下线
-     */
-    @EventListener
-    public void connectionMessage(MqttMessageEvent event) {
-        String topic = event.getTopic();
-        String result = topic.substring(topic.lastIndexOf("/") + 1);
-        String payload = event.getPayload();
-        MqttMsg mqttMsg = new MqttMsg();
-        mqttMsg.setCreateDate(new Date());
-        if (result.equals("connected") || result.equals("disconnected")) {
-            JSONObject jsonObject = JSONObject.parseObject(payload);
-            String clientId = jsonObject.getString("clientid");
-            LambdaQueryWrapper<TEquipment> query = Wrappers.lambdaQuery();
-            query.eq(TEquipment::getClientId, clientId);
-            TEquipment equipment = equipmentService.getOne(query);
-            if (equipment != null) {
-                if (result.equals("connected")) {
-                    // 上线
-                    equipment.setEqeStatus(1);
-                } else {
-                    // 下线
-                    equipment.setEqeStatus(0);
-                }
-                equipmentService.updateById(equipment);
-            }
-        } else {
-            log.info("主题:{},消息:{}", topic, payload);
-            mqttMsg.setTopic(topic);
-            mqttMsg.setMessage(payload);
-            mqttMsgService.save(mqttMsg);
-        }
-    }
-
-    // 更多主题的处理逻辑
-}

+ 33 - 0
src/main/java/com/szwl/handle/ConnectedMessageHandler.java

@@ -0,0 +1,33 @@
+package com.szwl.handle;
+
+import com.alibaba.fastjson.JSONObject;
+import com.szwl.model.entity.MqttMsg;
+import com.szwl.service.MqttMsgService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+
+/**
+ * 连接消息处理
+ */
+@Slf4j
+@Component
+public class ConnectedMessageHandler implements MqttMessageHandler{
+
+    @Autowired
+    private MqttMsgService mqttMsgService;
+
+    @Override
+    public void handle(String topic, String payload) {
+        JSONObject jsonObject = JSONObject.parseObject(payload);
+        String clientId = jsonObject.getString("clientid");
+        MqttMsg mqttMsg = new MqttMsg();
+        mqttMsg.setCreateDate(new Date());
+        mqttMsg.setClientId(clientId);
+        mqttMsg.setTopic(topic);
+        mqttMsg.setMessage(payload);
+        mqttMsgService.save(mqttMsg);
+    }
+}

+ 33 - 0
src/main/java/com/szwl/handle/DisconnectedMessageHandler.java

@@ -0,0 +1,33 @@
+package com.szwl.handle;
+
+import com.alibaba.fastjson.JSONObject;
+import com.szwl.model.entity.MqttMsg;
+import com.szwl.service.MqttMsgService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+
+/**
+ * 断开连接处理
+ */
+@Slf4j
+@Component
+public class DisconnectedMessageHandler implements MqttMessageHandler{
+
+    @Autowired
+    private MqttMsgService mqttMsgService;
+
+    @Override
+    public void handle(String topic, String payload) {
+        JSONObject jsonObject = JSONObject.parseObject(payload);
+        String clientId = jsonObject.getString("clientid");
+        MqttMsg mqttMsg = new MqttMsg();
+        mqttMsg.setCreateDate(new Date());
+        mqttMsg.setClientId(clientId);
+        mqttMsg.setTopic(topic);
+        mqttMsg.setMessage(payload);
+        mqttMsgService.save(mqttMsg);
+    }
+}

+ 83 - 0
src/main/java/com/szwl/handle/HeartMessageHandler.java

@@ -0,0 +1,83 @@
+package com.szwl.handle;
+
+import com.alibaba.fastjson.JSON;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.szwl.model.entity.MqttMsg;
+import com.szwl.model.entity.TEquipment;
+import com.szwl.model.entity.TEquipmentDesc;
+import com.szwl.model.jsonParm.HeartParam;
+import com.szwl.service.MqttMsgService;
+import com.szwl.service.TEquipmentDescService;
+import com.szwl.service.TEquipmentService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+
+/**
+ * 心跳消息处理
+ */
+@Slf4j
+@Component
+public class HeartMessageHandler implements MqttMessageHandler{
+
+    @Autowired
+    private MqttMsgService mqttMsgService;
+
+    @Autowired
+    private TEquipmentService equipmentService;
+
+    @Autowired
+    private TEquipmentDescService equipmentDescService;
+
+    @Override
+    public void handle(String topic, String payload) {
+        HeartParam heartParam = JSON.parseObject(payload, HeartParam.class);
+        String clientId = heartParam.getClientId();
+        MqttMsg mqttMsg = new MqttMsg();
+        mqttMsg.setCreateDate(new Date());
+        mqttMsg.setClientId(clientId);
+        mqttMsg.setTopic(topic);
+        mqttMsg.setMessage(payload);
+        mqttMsgService.save(mqttMsg);
+
+        // 查询设备
+        LambdaQueryWrapper<TEquipment> query = Wrappers.lambdaQuery();
+        query.eq(TEquipment::getClientId, clientId);
+        TEquipment equipment = equipmentService.getOne(query);
+        if (equipment != null) {
+            equipment.setCabinetHd(heartParam.getCabinetHd());
+            equipment.setCabinetTm(heartParam.getCabinetTm());
+            equipment.setEqeStatus(heartParam.getEqeStatus());
+            equipment.setFurnaceSp(heartParam.getFurnaceSp());
+            equipment.setFurnaceTm(heartParam.getFurnaceTm());
+            equipment.setGtClientId(heartParam.getGtClientId());
+            equipment.setIsSleep(heartParam.isSleep());
+            equipment.setLastUpdateTime(heartParam.getLastUpdateTime());
+            equipment.setLatitude(heartParam.getLatitude());
+            equipment.setLongitude(heartParam.getLongitude());
+            equipment.setManagerId(heartParam.getManagerId());
+            equipment.setNetWorkingMode(heartParam.getNetWorkingMode());
+            equipment.setVolume(heartParam.getVolume());
+            equipmentService.updateById(equipment);
+            // 更新数据
+            TEquipmentDesc equipmentDesc = equipmentDescService.getById(equipment.getId());
+            if (equipmentDesc != null) {
+                equipmentDesc.setBlueSugar(heartParam.getBlueSugar());
+                equipmentDesc.setCandyGeneratorTm(heartParam.getCandyGeneratorTm());
+                equipmentDesc.setNumberOne(heartParam.getNumberOne());
+                equipmentDesc.setOutsideHd(heartParam.getOutsideHd());
+                equipmentDesc.setOutsideTm(heartParam.getOutsideTm());
+                equipmentDesc.setRedSugar(heartParam.getRedSugar());
+                equipmentDesc.setStick(heartParam.getStick());
+                equipmentDesc.setWater(heartParam.getWater());
+                equipmentDesc.setWasteWater(heartParam.getWasteWater());
+                equipmentDesc.setWhiteSugar(heartParam.getWhiteSugar());
+                equipmentDesc.setYellowSugar(heartParam.getYellowSugar());
+                equipmentDescService.updateById(equipmentDesc);
+            }
+        }
+    }
+}

+ 6 - 0
src/main/java/com/szwl/handle/MqttMessageHandler.java

@@ -0,0 +1,6 @@
+package com.szwl.handle;
+
+public interface MqttMessageHandler {
+    void handle(String topic, String payload);
+}
+

+ 13 - 0
src/main/java/com/szwl/handle/ParametersMessageHandler.java

@@ -0,0 +1,13 @@
+package com.szwl.handle;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class ParametersMessageHandler implements MqttMessageHandler{
+    @Override
+    public void handle(String topic, String payload) {
+        log.info("topic:{},payload:{}", topic, payload);
+    }
+}

+ 45 - 0
src/main/java/com/szwl/manager/MqttHandlerManager.java

@@ -0,0 +1,45 @@
+package com.szwl.manager;
+
+import com.szwl.handle.*;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * 策略管理器
+ */
+@Component
+public class MqttHandlerManager {
+
+    private final Map<String, MqttMessageHandler> handlerMap = new HashMap<>();
+
+    public MqttHandlerManager(List<MqttMessageHandler> handlers) {
+        // 手动映射主题到处理器
+        handlerMap.put("connected", findHandler(handlers, ConnectedMessageHandler.class));
+        handlerMap.put("disconnected", findHandler(handlers, DisconnectedMessageHandler.class));
+        handlerMap.put("heart", findHandler(handlers, HeartMessageHandler.class));
+        handlerMap.put("parameters", findHandler(handlers, ParametersMessageHandler.class));
+    }
+
+    public void dispatch(String topic, String payload) {
+        Optional<Map.Entry<String, MqttMessageHandler>> handlerEntry = handlerMap.entrySet().stream()
+                .filter(entry -> topic.matches(entry.getKey())) // 通过正则匹配主题
+                .findFirst();
+
+        if (handlerEntry.isPresent()) {
+            handlerEntry.get().getValue().handle(topic, payload);
+        } else {
+            System.out.printf("No handler found for topic [%s]%n", topic);
+        }
+    }
+
+    private MqttMessageHandler findHandler(List<MqttMessageHandler> handlers, Class<?> clazz) {
+        return handlers.stream()
+                .filter(handler -> clazz.isAssignableFrom(handler.getClass()))
+                .findFirst()
+                .orElseThrow(() -> new IllegalArgumentException("No handler found for: " + clazz.getName()));
+    }
+}

+ 87 - 0
src/main/java/com/szwl/model/jsonParm/HeartParam.java

@@ -0,0 +1,87 @@
+package com.szwl.model.jsonParm;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+
+import java.util.Date;
+
+@Data
+public class HeartParam {
+
+    @JsonProperty("blueSugar")
+    private String blueSugar;
+
+    @JsonProperty("cabinetHd")
+    private String cabinetHd;
+
+    @JsonProperty("cabinetTm")
+    private String cabinetTm;
+
+    @JsonProperty("candyGeneratorTm")
+    private String candyGeneratorTm;
+
+    @JsonProperty("clientId")
+    private String clientId;
+
+    @JsonProperty("eqeStatus")
+    private Integer eqeStatus;
+
+    @JsonProperty("furnaceSp")
+    private String furnaceSp;
+
+    @JsonProperty("furnaceTm")
+    private String furnaceTm;
+
+    @JsonProperty("gtClientId")
+    private String gtClientId;
+
+    @JsonProperty("is_sleep")
+    private boolean isSleep;
+
+    @JsonProperty("lastUpdateTime")
+    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
+    private Date lastUpdateTime;
+
+    @JsonProperty("latitude")
+    private double latitude;
+
+    @JsonProperty("longitude")
+    private double longitude;
+
+    @JsonProperty("managerId")
+    private String managerId;
+
+    @JsonProperty("netWorkingMode")
+    private String netWorkingMode;
+
+    @JsonProperty("numberOne")
+    private String numberOne;
+
+    @JsonProperty("outsideHd")
+    private String outsideHd;
+
+    @JsonProperty("outsideTm")
+    private String outsideTm;
+
+    @JsonProperty("redSugar")
+    private String redSugar;
+
+    @JsonProperty("stick")
+    private String stick;
+
+    @JsonProperty("volume")
+    private String volume;
+
+    @JsonProperty("wasteWater")
+    private String wasteWater;
+
+    @JsonProperty("water")
+    private String water;
+
+    @JsonProperty("whiteSugar")
+    private String whiteSugar;
+
+    @JsonProperty("yellowSugar")
+    private String yellowSugar;
+}

+ 1 - 1
src/main/java/com/szwl/service/base/MqttService.java

@@ -1,4 +1,4 @@
-package com.szwl.service.base;
+package com.szwl.service.mqtt;
 
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.MqttClient;