RabbitMqController.java 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package com.szwl.controller;
  2. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  3. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  4. import com.szwl.model.bo.JsonMessage;
  5. import com.szwl.model.entity.TEquipment;
  6. import com.szwl.service.TAdminService;
  7. import com.szwl.service.TEquipmentService;
  8. import io.swagger.annotations.Api;
  9. import org.apache.commons.lang.StringUtils;
  10. import org.springframework.amqp.core.*;
  11. import org.springframework.amqp.rabbit.core.RabbitAdmin;
  12. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  13. import org.springframework.beans.factory.annotation.Autowired;
  14. import org.springframework.stereotype.Controller;
  15. import org.springframework.web.bind.annotation.GetMapping;
  16. import org.springframework.web.bind.annotation.RequestMapping;
  17. import org.springframework.web.bind.annotation.ResponseBody;
  18. import org.springframework.web.bind.annotation.RestController;
  19. import javax.annotation.Resource;
  20. import java.util.HashMap;
  21. import java.util.List;
  22. import java.util.Map;
  23. @Api(value = "/api/app_mq/rabbitMq", tags = {"MQ"})
  24. @RestController
  25. @RequestMapping("/api/app_mq/rabbitMq")
  26. //@RequestMapping("/rabbitMqController")
  27. public class RabbitMqController {
  28. @Autowired
  29. TEquipmentService tEquipmentService;
  30. @Autowired
  31. TAdminService tAdminService;
  32. // @Autowired
  33. // private RabbitAdmin rabbitAdmin;
  34. @Autowired
  35. private AmqpAdmin amqpAdmin;
  36. // @Autowired
  37. // private RabbitTemplate rabbitTemplate;
  38. /**
  39. *添加交换机
  40. * @param
  41. * @return
  42. */
  43. @GetMapping(value = "/addExchange.htm")
  44. @ResponseBody
  45. public String addExchange(String exchange,String type,Boolean durable){
  46. if(StringUtils.isEmpty(type)){
  47. return "type is null";
  48. }
  49. if(type.equals("Direct")){
  50. amqpAdmin.declareExchange(new DirectExchange(exchange, durable, false));
  51. }
  52. if(type.equals("Topic")){
  53. amqpAdmin.declareExchange(new TopicExchange(exchange, durable, false));
  54. }
  55. if(type.equals("Fanout")){
  56. amqpAdmin.declareExchange(new FanoutExchange(exchange, durable, false));
  57. }
  58. return "success";
  59. // return JsonMessage.success("成功");
  60. }
  61. /**
  62. *添加队列并绑定
  63. * @param
  64. * @return
  65. */
  66. @GetMapping(value = "/addQueue.htm")
  67. @ResponseBody
  68. public String addQueue(String exchange,String queue,String routingKey,Boolean durable){
  69. Map<String,Object> arg = new HashMap<>();
  70. arg.put("x-message-ttl",1800000);
  71. String queue1 = amqpAdmin.declareQueue(new Queue(queue, durable, false, false, arg));
  72. HashMap<String, Object> objectObjectHashMap = new HashMap<>();
  73. amqpAdmin.declareBinding(new Binding(queue,
  74. Binding.DestinationType.QUEUE,
  75. exchange, routingKey, objectObjectHashMap));
  76. LambdaQueryWrapper<TEquipment> query = Wrappers.lambdaQuery();
  77. query.eq(TEquipment::getClientId,routingKey);
  78. List<TEquipment> list = tEquipmentService.list(query);
  79. if(list.size()>0){
  80. TEquipment equipment = list.get(0);
  81. equipment.setEquimentType(exchange);
  82. tEquipmentService.updateById(equipment);
  83. }
  84. // return JsonMessage.success("成功");
  85. return "success";
  86. }
  87. /**
  88. *修改信道
  89. * @param
  90. * @return
  91. */
  92. @GetMapping(value = "/updateChannel.htm")
  93. @ResponseBody
  94. public String updateChannel(String clientId,String type){
  95. LambdaQueryWrapper<TEquipment> query = Wrappers.lambdaQuery();
  96. query.eq(TEquipment::getClientId,clientId);
  97. List<TEquipment> list = tEquipmentService.list(query);
  98. if(list.size()>0){
  99. TEquipment equipment = list.get(0);
  100. equipment.setChannel(type);
  101. tEquipmentService.updateById(equipment);
  102. }
  103. return "success";
  104. }
  105. /**
  106. * 修改机型重新创建MQ队列
  107. * @param
  108. * @return
  109. */
  110. @GetMapping(value = "/updateEquipmentType")
  111. @ResponseBody
  112. public String updateEquipmentType(String clientId, String equipmentType){
  113. if (StringUtils.isNotEmpty(clientId) && StringUtils.isNotEmpty(equipmentType)) {
  114. // 删除MQ队列
  115. amqpAdmin.deleteQueue(clientId);
  116. // 重新创建队列
  117. Map<String,Object> arg = new HashMap<>();
  118. arg.put("x-message-ttl",1800000);
  119. amqpAdmin.declareQueue(new Queue(clientId, true, false, false, arg));
  120. HashMap<String, Object> objectObjectHashMap = new HashMap<>();
  121. amqpAdmin.declareBinding(new Binding(clientId,
  122. Binding.DestinationType.QUEUE,
  123. equipmentType, clientId, objectObjectHashMap));
  124. LambdaQueryWrapper<TEquipment> query = Wrappers.lambdaQuery();
  125. query.eq(TEquipment::getClientId, clientId);
  126. TEquipment equipment = tEquipmentService.getOne(query);
  127. if(equipment != null){
  128. equipment.setEquimentType(equipmentType);
  129. tEquipmentService.updateById(equipment);
  130. return "success";
  131. }
  132. return "找不到设备";
  133. }
  134. return "参数为空";
  135. }
  136. }