Przeglądaj źródła

feat:"增加MQTT订单上传功能"

soobin 1 miesiąc temu
rodzic
commit
a1b05794c0

+ 17 - 0
src/main/java/com/szwl/feign/bean/OrderFeign.java

@@ -0,0 +1,17 @@
+package com.szwl.feign.bean;
+
+import com.szwl.model.bean.CoinOrderVO;
+import com.szwl.model.bo.ResponseModel;
+import org.springframework.cloud.openfeign.FeignClient;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+
+
+@FeignClient(name = "order-server")
+//@FeignClient(name = "order-server", url = "http://127.0.0.1:49012/")
+public interface OrderFeign {
+
+    @PostMapping("/tCoinOrder/uploadCoinOrder")
+    ResponseModel<String> uploadCoinOrder(@RequestBody CoinOrderVO coinOrderVO);
+
+}

+ 2 - 1
src/main/java/com/szwl/handle/response/LogMessageProcessor.java

@@ -49,7 +49,8 @@ public class LogMessageProcessor implements ResponseProcessor {
                 sendExecutor.execute(() -> {
                     // 推送消息
                     try {
-                        equipmentService.sendRemoteMessage(clientId, OperationType.LOG, message, equipment.getAdminId());
+//                        equipmentService.sendRemoteMessage(clientId, OperationType.LOG, message, equipment.getAdminId());
+                        equipmentService.responseSendMessage(messageLog, message, equipment.getAdminId());
                     } catch (Exception e) {
                         e.printStackTrace();
                         log.info("--发送消息失败:{}", e.getMessage());

+ 45 - 2
src/main/java/com/szwl/handle/response/UploadOrderProcessor.java

@@ -1,22 +1,65 @@
 package com.szwl.handle.response;
 
+import com.alibaba.fastjson.JSON;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import com.szwl.constant.OperationType;
+import com.szwl.feign.bean.OrderFeign;
+import com.szwl.model.bean.CoinOrderVO;
+import com.szwl.model.bo.R;
 import com.szwl.model.entity.MessageLog;
+import com.szwl.model.entity.TEquipment;
+import com.szwl.model.utils.PushUtils;
+import com.szwl.service.TEquipmentService;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
+@Slf4j
 @Component
-public class UploadOrderProcessor implements ResponseProcessor{
+public class UploadOrderProcessor implements ResponseProcessor {
 
+    @Resource
+    private OrderFeign orderFeign;
+
+    @Resource
+    private TEquipmentService equipmentService;
+
+    private final ExecutorService sendExecutor =
+            Executors.newSingleThreadExecutor(); // 独立发送线程
 
 
     @Override
     public void process(MessageLog messageLog) {
         Integer statusCode = messageLog.getStatusCode();
         if (statusCode == 200) {
-            String dataContent = messageLog.getDataContent();
+            Integer direction = messageLog.getDirection();
+            String clientId = messageLog.getClientId();
+            String responseContent = messageLog.getResponseContent();
 
+            // 查询设备
+            LambdaQueryWrapper<TEquipment> query = Wrappers.lambdaQuery();
+            query.eq(TEquipment::getClientId, clientId);
+            TEquipment equipment = equipmentService.getOne(query);
+            if (equipment != null) {
+                // 转换为对象
+                try {
+                    CoinOrderVO coinOrderVO = JSON.parseObject(responseContent, CoinOrderVO.class);
+                    String result = R.getDataIfSuccess(orderFeign.uploadCoinOrder(coinOrderVO));
+                    if ("success".equals(result)) {
+                        // 上传成功,通知设备端
+                        String message = PushUtils.buildJson(OperationType.UPLOAD_ORDER.getCode(), coinOrderVO.getSn()).toString();
+                        sendExecutor.execute(() -> {
+                            equipmentService.responseSendMessage(messageLog, message, equipment.getAdminId());
+                        });
+                    }
+                } catch (Exception e) {
+                    log.info("--解析失败:{}", e.getMessage());
+                }
+            }
         }
     }
 

+ 7 - 1
src/main/java/com/szwl/service/TEquipmentService.java

@@ -1,6 +1,7 @@
 package com.szwl.service;
 
 import com.szwl.constant.OperationType;
+import com.szwl.model.entity.MessageLog;
 import com.szwl.model.entity.TEquipment;
 import com.szwl.model.query.StatisticsParam;
 import com.szwl.service.base.MyIService;
@@ -22,13 +23,18 @@ public interface TEquipmentService extends MyIService<TEquipment> {
     String sentMessage(String clientId,String json);
 
     /**
-     * 发送远程操作消息
+     * 主动发送远程操作消息
      * @param clientId
      * @param operationType
      * @param message
      */
     void sendRemoteMessage(String clientId, OperationType operationType, String message, Long adminId);
 
+    /**
+     * 响应安卓消息
+     */
+    void responseSendMessage(MessageLog messageLog, String message, Long adminId);
+
     String findMachineTotalNum(StatisticsParam param);
 
     String findMachineUseNum(StatisticsParam param);

+ 15 - 0
src/main/java/com/szwl/service/impl/TEquipmentServiceImpl.java

@@ -135,6 +135,21 @@ public class TEquipmentServiceImpl extends ServiceImpl<TEquipmentMapper, TEquipm
         mqttService.sendMessage(clientId, data.toJSONString(), 0);
     }
 
+    @Override
+    public void responseSendMessage(MessageLog messageLog, String message, Long adminId) {
+        messageLog.setDataContent(message);
+        messageLog.setAdminId(adminId);
+        messageLogService.updateById(messageLog);
+        // 参数
+        JSONObject data = new JSONObject();
+        // 请求流水号
+        data.put("msgId", messageLog.getMsgId());
+        data.put("clientId", messageLog.getClientId());
+        data.put("timestamp", messageLog.getTimestamp());
+        data.put("dataContent", messageLog.getDataContent());
+        mqttService.sendMessage(messageLog.getClientId(), data.toJSONString(), 0);
+    }
+
 
     @Override
     public String findMachineTotalNum(StatisticsParam param) {