Browse Source

feat:“添加MQTT"

soobin 10 months ago
parent
commit
2af7aa5980

+ 6 - 0
pom.xml

@@ -301,6 +301,12 @@
 <!--            <version>1.10.77</version>-->
 <!--        </dependency>-->
 
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>1.2.5</version>
+        </dependency>
+        <!-- MQTT相关配置 -->
     </dependencies>
 
     <build>

+ 2 - 1
src/main/java/com/szwl/aspect/MyWebMvcConfigurer.java

@@ -53,7 +53,8 @@ public class MyWebMvcConfigurer extends WebMvcConfigurationSupport {
                 "/tLocationCheck/**",
                 "/tTest/**",
                 "/tHotUpdate/**",
-                "/tAlarmClean/**"
+                "/tAlarmClean/**",
+                "/mqtt/**"
         };
         registry.addInterceptor(headTokenInterceptor)
                 .addPathPatterns("/**")

+ 110 - 0
src/main/java/com/szwl/config/MqttConfig.java

@@ -0,0 +1,110 @@
+package com.szwl.config;
+
+import com.szwl.event.MqttMessageEvent;
+import com.szwl.service.MqttTopicsService;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.core.StringRedisTemplate;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+@Slf4j
+@Configuration
+public class MqttConfig {
+
+    @Value("${mqtt.url}")
+    private String url;
+
+    @Value("${mqtt.clientId}")
+    private String clientId;
+
+    @Value("${mqtt.username}")
+    private String username;
+
+    @Value("${mqtt.password}")
+    private String password;
+
+    private MqttClient mqttClient;
+
+    String serverAddress = "";
+
+    @Autowired
+    private ApplicationEventPublisher eventPublisher;
+
+    @Autowired
+    private MqttTopicsService mqttTopicsService;
+
+    @Autowired
+    private StringRedisTemplate redisTemplate;
+
+    @Bean
+    public MqttClient mqttClient() throws MqttException, UnknownHostException {
+        serverAddress = InetAddress.getLocalHost().getHostAddress();
+        MemoryPersistence persistence = new MemoryPersistence();
+        // 客户端ID
+        clientId = clientId + "-" + serverAddress;
+        mqttClient = new MqttClient(url, clientId, persistence);
+        // 设置回调
+        mqttClient.setCallback(new MqttCallback() {
+            @Override
+            public void connectionLost(Throwable cause) {
+                log.info("MQTT连接断开:{}", cause.getMessage());
+                log.info("MQTT连接断开,正在重连...");
+                connectMqtt(clientId);
+            }
+
+            @Override
+            public void messageArrived(String topic, MqttMessage message) {
+                String payload = new String(message.getPayload());
+                eventPublisher.publishEvent(new MqttMessageEvent(this, topic, payload));
+            }
+
+            @Override
+            public void deliveryComplete(IMqttDeliveryToken token) {
+                // Handle delivery completion if needed
+            }
+
+        });
+        connectMqtt(clientId);
+        return mqttClient;
+    }
+
+    /**
+     * 连接MQTT
+     */
+    public void connectMqtt(String clientId) {
+        //  持久化
+        try {
+            MqttConnectOptions options = new MqttConnectOptions();
+            options.setUserName(username);
+            options.setPassword(password.toCharArray());
+            // 使会话保持
+            options.setCleanSession(false);
+            options.setKeepAliveInterval(60);
+            // 建立连接
+            mqttClient.connect(options);
+            // 订阅主题
+            if (serverAddress.equals("10.0.0.153")) {
+                mqttTopicsService.list().forEach(topic -> {
+                    try {
+                        mqttClient.subscribe(topic.getTopic(), topic.getQos());
+                    } catch (MqttException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            }
+            log.info("MQTT客户端已连接,客户端ID:{}", clientId);
+        } catch (MqttException e) {
+            log.info("MQTT客户端连接失败");
+            e.printStackTrace();
+        }
+    }
+}
+

+ 100 - 0
src/main/java/com/szwl/controller/MqttController.java

@@ -0,0 +1,100 @@
+package com.szwl.controller;
+
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.szwl.model.bo.R;
+import com.szwl.model.bo.ResponseModel;
+import com.szwl.model.dto.MqttMsgVo;
+import com.szwl.model.entity.MqttMsg;
+import com.szwl.model.entity.MqttTopics;
+import com.szwl.model.param.MqttMsgParam;
+import com.szwl.service.MqttMsgService;
+import com.szwl.service.MqttTopicsService;
+import com.szwl.service.base.MqttService;
+import io.swagger.annotations.ApiOperation;
+import org.apache.commons.lang.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.*;
+
+import java.util.Date;
+
+/**
+ * <p>
+ *  前端控制器
+ * </p>
+ *
+ * @author wuhs
+ * @since 2024-10-17
+ */
+@RestController
+@RequestMapping("/mqtt")
+public class MqttController {
+
+
+    @Autowired
+    private MqttService mqttService;
+
+    @Autowired
+    private MqttTopicsService mqttTopicsService;
+
+    @Autowired
+    private MqttMsgService mqttMsgService;
+
+    @ApiOperation("发送消息")
+    @PostMapping("/sendMsg")
+    public ResponseModel<?> sendMsg(@RequestBody MqttMsgVo mqttMsgVo){
+        String topic = mqttMsgVo.getTopic();
+        String message = mqttMsgVo.getMessage();
+        Integer qos = mqttMsgVo.getQos();
+        mqttService.sendMessage(topic, message, qos);
+        return R.ok();
+    }
+
+    @ApiOperation("添加订阅主题")
+    @PostMapping("/addTopic")
+    public ResponseModel<?> addTopic(@RequestBody MqttTopics mqttTopics){
+        mqttTopics.setCreateDate(new Date());
+        mqttTopicsService.save(mqttTopics);
+        mqttService.subscribe(mqttTopics.getTopic(), mqttTopics.getQos());
+        return R.ok(mqttTopics.getTopic());
+    }
+
+    @ApiOperation("删除订阅主题")
+    @PostMapping("/delTopic")
+    public ResponseModel<?> delTopic(@RequestBody MqttTopics mqtt) {
+        LambdaQueryWrapper<MqttTopics> query = Wrappers.lambdaQuery();
+        query.eq(MqttTopics::getTopic, mqtt.getTopic());
+        MqttTopics mqttTopic = mqttTopicsService.getOne(query);
+        if(mqttTopic != null) {
+            mqttTopicsService.removeById(mqttTopic);
+            mqttService.unsubscribe(mqttTopic.getTopic());
+            return R.ok();
+        } else {
+            return R.fail("主题不存在");
+        }
+    }
+
+    @ApiOperation("获取订阅列表")
+    @GetMapping("/getTopicList")
+    public ResponseModel<?> getTopicList(){
+        return R.ok(mqttTopicsService.list());
+    }
+
+    @ApiOperation("获取消息列表")
+    @PostMapping("/getMsgList")
+    public ResponseModel<IPage<?>> getMsgList(@RequestBody MqttMsgParam param){
+        LambdaQueryWrapper<MqttMsg> query = Wrappers.lambdaQuery();
+        if (StringUtils.isNotEmpty(param.getStartTime()) && StringUtils.isNotEmpty(param.getEndTime())) {
+            query.between(MqttMsg::getCreateDate, param.getStartTime(), param.getEndTime());
+        }
+        query.orderByDesc(MqttMsg::getCreateDate);
+        Page<MqttMsg> page = new Page<>(param.getCurrent(), param.getSize(), true);
+        Page<MqttMsg> recordPage = mqttMsgService.page(page, query);
+        return R.ok(recordPage);
+    }
+
+}
+

+ 26 - 0
src/main/java/com/szwl/event/MqttMessageEvent.java

@@ -0,0 +1,26 @@
+package com.szwl.event;
+
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * MQTT消息事件类
+ * @author: soobin
+ */
+public class MqttMessageEvent extends ApplicationEvent {
+    private final String topic;
+    private final String payload;
+
+    public MqttMessageEvent(Object source, String topic, String payload) {
+        super(source);
+        this.topic = topic;
+        this.payload = payload;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public String getPayload() {
+        return payload;
+    }
+}

+ 51 - 0
src/main/java/com/szwl/event/MqttMessageEventListener.java

@@ -0,0 +1,51 @@
+package com.szwl.event;
+
+import com.alibaba.fastjson.JSONObject;
+import com.szwl.model.entity.MqttMsg;
+import com.szwl.service.MqttMsgService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+
+@Slf4j
+@Component
+public class MqttMessageEventListener {
+
+    @Autowired
+    private MqttMsgService mqttMsgService;
+
+    /**
+     * 监听主题:/connected,/disconnected,作用:监听设备上线、下线
+     */
+    @EventListener
+    public void connectionMessage(MqttMessageEvent event) {
+        String topic = event.getTopic();
+        String result = topic.substring(topic.lastIndexOf("/") + 1);
+        String payload = event.getPayload();
+        MqttMsg mqttMsg = new MqttMsg();
+        mqttMsg.setCreateDate(new Date());
+        if (result.equals("connected")) {
+            mqttMsg.setTopic(result);
+            mqttMsg.setMessage("设备上线");
+            JSONObject jsonObject = JSONObject.parseObject(payload);
+            String clientId = jsonObject.getString("clientid");
+            mqttMsg.setClientId(clientId);
+        } else if (result.equals("disconnected")) {
+            mqttMsg.setTopic(result);
+            mqttMsg.setMessage("设备离线");
+            JSONObject jsonObject = JSONObject.parseObject(payload);
+            String clientId = jsonObject.getString("clientid");
+            mqttMsg.setClientId(clientId);
+        } else {
+            log.info("主题:{},消息:{}", topic, payload);
+            mqttMsg.setTopic(topic);
+            mqttMsg.setMessage(payload);
+        }
+        mqttMsgService.save(mqttMsg);
+    }
+
+    // 更多主题的处理逻辑
+}

+ 16 - 0
src/main/java/com/szwl/mapper/MqttMsgMapper.java

@@ -0,0 +1,16 @@
+package com.szwl.mapper;
+
+import com.szwl.model.entity.MqttMsg;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+/**
+ * <p>
+ *  Mapper 接口
+ * </p>
+ *
+ * @author wuhs
+ * @since 2024-10-17
+ */
+public interface MqttMsgMapper extends BaseMapper<MqttMsg> {
+
+}

+ 16 - 0
src/main/java/com/szwl/mapper/MqttTopicsMapper.java

@@ -0,0 +1,16 @@
+package com.szwl.mapper;
+
+import com.szwl.model.entity.MqttTopics;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+/**
+ * <p>
+ *  Mapper 接口
+ * </p>
+ *
+ * @author wuhs
+ * @since 2024-10-17
+ */
+public interface MqttTopicsMapper extends BaseMapper<MqttTopics> {
+
+}

+ 19 - 0
src/main/java/com/szwl/mapper/xml/MqttMsgMapper.xml

@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.szwl.mapper.MqttMsgMapper">
+
+    <!-- 通用查询映射结果 -->
+    <resultMap id="BaseResultMap" type="com.szwl.model.entity.MqttMsg">
+        <id column="id" property="id" />
+        <result column="create_date" property="createDate" />
+        <result column="topic" property="topic" />
+        <result column="message" property="message" />
+        <result column="client_id" property="clientId" />
+    </resultMap>
+
+    <!-- 通用查询结果列 -->
+    <sql id="Base_Column_List">
+        id, create_date, topic, message, client_id
+    </sql>
+
+</mapper>

+ 19 - 0
src/main/java/com/szwl/mapper/xml/MqttTopicsMapper.xml

@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.szwl.mapper.MqttTopicsMapper">
+
+    <!-- 通用查询映射结果 -->
+    <resultMap id="BaseResultMap" type="com.szwl.model.entity.MqttTopics">
+        <id column="id" property="id" />
+        <result column="create_date" property="createDate" />
+        <result column="topic" property="topic" />
+        <result column="qos" property="qos" />
+        <result column="topic_desc" property="topicDesc" />
+    </resultMap>
+
+    <!-- 通用查询结果列 -->
+    <sql id="Base_Column_List">
+        id, create_date, topic, qos, topic_desc
+    </sql>
+
+</mapper>

+ 17 - 0
src/main/java/com/szwl/model/dto/MqttMsgVo.java

@@ -0,0 +1,17 @@
+package com.szwl.model.dto;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+@Data
+public class MqttMsgVo {
+
+    @ApiModelProperty(value = "主题")
+    private String topic;
+
+    @ApiModelProperty(value = "消息")
+    private String message;
+
+    @ApiModelProperty(value = "等级")
+    private Integer qos;
+}

+ 42 - 0
src/main/java/com/szwl/model/entity/MqttMsg.java

@@ -0,0 +1,42 @@
+package com.szwl.model.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import java.util.Date;
+import com.baomidou.mybatisplus.annotation.TableId;
+import java.io.Serializable;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+/**
+ * <p>
+ * 
+ * </p>
+ *
+ * @author wuhs
+ * @since 2024-10-17
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+@ApiModel(value="MqttMsg对象", description="")
+public class MqttMsg implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @TableId(value = "id", type = IdType.AUTO)
+    private Long id;
+
+    private Date createDate;
+
+    @ApiModelProperty(value = "主题")
+    private String topic;
+
+    @ApiModelProperty(value = "消息")
+    private String message;
+
+    @ApiModelProperty(value = "设备编号")
+    private String clientId;
+
+
+}

+ 42 - 0
src/main/java/com/szwl/model/entity/MqttTopics.java

@@ -0,0 +1,42 @@
+package com.szwl.model.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import java.util.Date;
+import com.baomidou.mybatisplus.annotation.TableId;
+import java.io.Serializable;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+/**
+ * <p>
+ * 
+ * </p>
+ *
+ * @author wuhs
+ * @since 2024-10-17
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+@ApiModel(value="MqttTopics对象", description="")
+public class MqttTopics implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @TableId(value = "id", type = IdType.AUTO)
+    private Long id;
+
+    private Date createDate;
+
+    @ApiModelProperty(value = "主题名")
+    private String topic;
+
+    @ApiModelProperty(value = "消息等级")
+    private Integer qos;
+
+    @ApiModelProperty(value = "主题描述")
+    private String topicDesc;
+
+
+}

+ 16 - 0
src/main/java/com/szwl/model/param/MqttMsgParam.java

@@ -0,0 +1,16 @@
+package com.szwl.model.param;
+
+import com.szwl.model.entity.MqttMsg;
+import lombok.Data;
+
+@Data
+public class MqttMsgParam extends MqttMsg {
+
+    private long current;
+
+    private long size;
+
+    private String startTime;
+
+    private String endTime;
+}

+ 16 - 0
src/main/java/com/szwl/service/MqttMsgService.java

@@ -0,0 +1,16 @@
+package com.szwl.service;
+
+import com.szwl.model.entity.MqttMsg;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+/**
+ * <p>
+ *  服务类
+ * </p>
+ *
+ * @author wuhs
+ * @since 2024-10-17
+ */
+public interface MqttMsgService extends IService<MqttMsg> {
+
+}

+ 16 - 0
src/main/java/com/szwl/service/MqttTopicsService.java

@@ -0,0 +1,16 @@
+package com.szwl.service;
+
+import com.szwl.model.entity.MqttTopics;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+/**
+ * <p>
+ *  服务类
+ * </p>
+ *
+ * @author wuhs
+ * @since 2024-10-17
+ */
+public interface MqttTopicsService extends IService<MqttTopics> {
+
+}

+ 52 - 0
src/main/java/com/szwl/service/base/MqttService.java

@@ -0,0 +1,52 @@
+package com.szwl.service.base;
+
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+public class MqttService {
+
+    @Autowired
+    private MqttClient mqttClient;
+
+    /**
+     * 发送消息
+     * @param topic
+     * @param message
+     */
+    public void sendMessage(String topic, String message, int qos) {
+        try {
+            mqttClient.publish(topic, message.getBytes(), qos, false);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 订阅主题
+     * @param topic
+     * @param qos
+     */
+    public void subscribe(String topic, int qos) {
+        try {
+            mqttClient.subscribe(topic, qos);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 取消订阅主题
+     */
+    public void unsubscribe(String topic) {
+        try {
+            mqttClient.unsubscribe(topic);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+}

+ 20 - 0
src/main/java/com/szwl/service/impl/MqttMsgServiceImpl.java

@@ -0,0 +1,20 @@
+package com.szwl.service.impl;
+
+import com.szwl.model.entity.MqttMsg;
+import com.szwl.mapper.MqttMsgMapper;
+import com.szwl.service.MqttMsgService;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import org.springframework.stereotype.Service;
+
+/**
+ * <p>
+ *  服务实现类
+ * </p>
+ *
+ * @author wuhs
+ * @since 2024-10-17
+ */
+@Service
+public class MqttMsgServiceImpl extends ServiceImpl<MqttMsgMapper, MqttMsg> implements MqttMsgService {
+
+}

+ 20 - 0
src/main/java/com/szwl/service/impl/MqttTopicsServiceImpl.java

@@ -0,0 +1,20 @@
+package com.szwl.service.impl;
+
+import com.szwl.model.entity.MqttTopics;
+import com.szwl.mapper.MqttTopicsMapper;
+import com.szwl.service.MqttTopicsService;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import org.springframework.stereotype.Service;
+
+/**
+ * <p>
+ *  服务实现类
+ * </p>
+ *
+ * @author wuhs
+ * @since 2024-10-17
+ */
+@Service
+public class MqttTopicsServiceImpl extends ServiceImpl<MqttTopicsMapper, MqttTopics> implements MqttTopicsService {
+
+}

+ 17 - 4
src/main/resources/bootstrap.yml

@@ -31,17 +31,17 @@ management:
 oauth:
   wx:
     #测试帐号
-#    appid: wx6959f112e9ffbfa3
-#    appsecret: 32f6fdf12bc25361110e1786f386eaab
+    #    appid: wx6959f112e9ffbfa3
+    #    appsecret: 32f6fdf12bc25361110e1786f386eaab
     #正式账户
     appid: wxcd5b1b2636c9f611
     appsecret: e2854aa99f8279f33b4f065b2ffb75b1
 
   callback:
     #个人测试
-#    http: http://d.freehk.svipss.top
+    #    http: http://d.freehk.svipss.top
     #系统测试
-#    http: http://szwltest.sunzee.com.cn
+    #    http: http://szwltest.sunzee.com.cn
     #正式服务
     http: http://szwlh.sunzee.com.cn
 
@@ -83,6 +83,12 @@ eureka:
 #  instance:
 #    prefer-ip-address: true
 #    ip-address: 112.96.106.247
+# mqtt配置-测试环境
+mqtt:
+  url: tcp://120.25.151.99:1883
+  clientId: SUNZEE
+  username: sunzeeSuper
+  password: sunzee@020
 
 ---
 ##正式环境
@@ -104,6 +110,13 @@ eureka:
   instance:
     prefer-ip-address: true
 
+#mqtt配置-正式环境
+mqtt:
+  url: tcp://112.74.33.65:1883
+  clientId: SUNZEE
+  username: sunzeeSuper
+  password: sunzee@020
+