|
@@ -45,42 +45,41 @@ public class ResponseMessageHandler implements MqttMessageHandler{
|
|
|
// 1. 解析基础响应
|
|
|
MessageLog messageLog = JSON.parseObject(payload, MessageLog.class);
|
|
|
operationType = messageLog.getOperation();
|
|
|
- if (StringUtils.isNotEmpty(operationType)) {
|
|
|
- // 2. 确定是否为主动上报消息
|
|
|
- Integer direction = messageLog.getDirection();
|
|
|
- if (direction != null && direction == 2) {
|
|
|
- // 3. 主动上报消息,直接添加消息日志
|
|
|
- if(!operationType.equals(OperationType.HEART_BEAT.getCode())) {
|
|
|
- // 心跳不需要保存消息日志
|
|
|
- messageLog.setMsgId(System.currentTimeMillis() + RandomUtil.randomNumbers(10));
|
|
|
- messageLog.setCreatedAt(new Date());
|
|
|
- messageLogService.save(messageLog);
|
|
|
- }
|
|
|
- } else {
|
|
|
- // 3. 响应消息,查询原始消息记录
|
|
|
- String msgId = messageLog.getMsgId();
|
|
|
- LambdaQueryWrapper<MessageLog> query = Wrappers.lambdaQuery();
|
|
|
- query.eq(MessageLog::getMsgId, msgId);
|
|
|
- MessageLog message = messageLogService.getOne(query);
|
|
|
- if (message != null) {
|
|
|
- operationType = message.getOperation();
|
|
|
- // 4. 更新消息日志
|
|
|
- message.setStatusCode(messageLog.getStatusCode());
|
|
|
- message.setResponseContent(messageLog.getResponseContent());
|
|
|
- message.setErrorInfo(messageLog.getErrorInfo());
|
|
|
- messageLogService.updateById(message);
|
|
|
- }
|
|
|
+ // 2. 确定是否为主动上报消息
|
|
|
+ Integer direction = messageLog.getDirection();
|
|
|
+ if (direction != null && direction == 2) {
|
|
|
+ // 3. 主动上报消息,直接添加消息日志
|
|
|
+ if(!operationType.equals(OperationType.HEART.getCode())) {
|
|
|
+ // 心跳不需要保存消息日志
|
|
|
+ messageLog.setMsgId(System.currentTimeMillis() + RandomUtil.randomNumbers(10));
|
|
|
+ messageLog.setCreatedAt(new Date());
|
|
|
+ messageLogService.save(messageLog);
|
|
|
}
|
|
|
-
|
|
|
- // 5. 处理业务逻辑
|
|
|
- ResponseProcessor processor = processorMap.get(operationType);
|
|
|
- if (processor != null) {
|
|
|
- processor.process(messageLog);
|
|
|
- } else {
|
|
|
- log.warn("未找到匹配的处理器,操作类型:{}", operationType);
|
|
|
+ } else {
|
|
|
+ // 3. 响应消息,查询原始消息记录
|
|
|
+ String msgId = messageLog.getMsgId();
|
|
|
+ LambdaQueryWrapper<MessageLog> query = Wrappers.lambdaQuery();
|
|
|
+ query.eq(MessageLog::getMsgId, msgId);
|
|
|
+ MessageLog message = messageLogService.getOne(query);
|
|
|
+ if (message != null) {
|
|
|
+ operationType = message.getOperation();
|
|
|
+ // 4. 更新消息日志
|
|
|
+ message.setStatusCode(messageLog.getStatusCode());
|
|
|
+ message.setResponseContent(messageLog.getResponseContent());
|
|
|
+ message.setErrorInfo(messageLog.getErrorInfo());
|
|
|
+ messageLogService.updateById(message);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // 5. 处理业务逻辑
|
|
|
+ ResponseProcessor processor = processorMap.get(operationType);
|
|
|
+ if (processor != null) {
|
|
|
+ processor.process(messageLog);
|
|
|
+ } else {
|
|
|
+ log.warn("未找到匹配的处理器,操作类型:{}", operationType);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
} catch (Exception e) {
|
|
|
log.error("消息处理失败 topic={}, payload={}", topic, payload, e);
|
|
|
}
|