EsTCoinOrderService.java 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package com.szwl.service.es;
  2. import cn.com.crbank.ommo.esclient.EsBaseService;
  3. import cn.com.crbank.ommo.exception.MyException;
  4. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  5. import com.baomidou.mybatisplus.core.metadata.IPage;
  6. import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
  7. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  8. import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
  9. import com.szwl.model.entity.TCoinOrder;
  10. import com.szwl.model.query.TCoinOrderParam;
  11. import com.szwl.model.utils.DateUtils;
  12. import com.szwl.service.TCoinOrderService;
  13. import lombok.extern.slf4j.Slf4j;
  14. //import org.apache.commons.collections4.CollectionUtils;
  15. import org.elasticsearch.client.RequestOptions;
  16. import org.elasticsearch.client.indices.GetIndexRequest;
  17. import org.springframework.beans.factory.annotation.Autowired;
  18. import org.springframework.stereotype.Service;
  19. import java.text.ParseException;
  20. import java.text.SimpleDateFormat;
  21. import java.util.Calendar;
  22. import java.util.Date;
  23. import java.util.List;
  24. @Slf4j
  25. @Service
  26. public class EsTCoinOrderService extends EsBaseService<TCoinOrder, TCoinOrderParam> {
  27. private static boolean InitEsTableStatus = false; // 是否正常完成 es 初始化
  28. public static final int MAX_ROW = 10000;
  29. @Autowired
  30. TCoinOrderService tCoinOrderService;
  31. @Override
  32. public String getTableName() {
  33. return "es_t_coin_order";
  34. }
  35. @Override
  36. public TCoinOrder getInstanceOfEntity() {
  37. return new TCoinOrder();
  38. }
  39. @Override
  40. public void setInitTableStatus(boolean flag) {
  41. InitEsTableStatus = flag;
  42. }
  43. @Override
  44. public boolean getInitTableStatus() {
  45. return InitEsTableStatus;
  46. }
  47. @Override
  48. public String getEntityPrimaryKey(TCoinOrder tCoinOrder) {
  49. return String.valueOf(tCoinOrder.getId());
  50. }
  51. @Override
  52. public TCoinOrder setEntityPrimaryKey(TCoinOrder tCoinOrder, String value) {
  53. return null;
  54. }
  55. // @Override
  56. // public TCoinOrder setEntityPrimaryKey(TCoinOrder tCoinOrder, String value) {
  57. // return tCoinOrder.setId(Long.parseLong(value));
  58. // }
  59. @Override
  60. public void initTableFun() {
  61. try{
  62. String tableName = getTableName();
  63. GetIndexRequest request = new GetIndexRequest(tableName);
  64. // DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(this.getTableName());
  65. // restHighLevelClient.indices().delete(deleteIndexRequest,RequestOptions.DEFAULT);
  66. boolean isExists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
  67. if (!isExists) {
  68. log.info("es 索引 开始创建"+tableName);
  69. createTable();
  70. // 初始化旧流水
  71. int num = 0;
  72. while (true) {
  73. int limit = MAX_ROW;
  74. // int offset= num * MAX_ROW;
  75. int offset= num;
  76. // TCoinOrderExample example = new TCoinOrderExample();
  77. // example.setLimit(limit);
  78. // example.setOffset(offset);
  79. // List<TCoinOrder> list = tCoinOrderService.selectByOption(example);
  80. LambdaQueryWrapper<TCoinOrder> query = Wrappers.lambdaQuery();
  81. SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  82. String time = "2022-06-01 00:00:00";
  83. Date date = ft.parse(time);
  84. query.gt(TCoinOrder::getCreateDate,date);
  85. Page<TCoinOrder> page = new Page<>(offset, limit, true);
  86. IPage<TCoinOrder> iPage = tCoinOrderService.page(page, query);
  87. List<TCoinOrder> list = iPage.getRecords();
  88. insertBatch(list);
  89. num++;
  90. if(list.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
  91. break;
  92. }
  93. }
  94. } else {
  95. log.info("es 索引 "+tableName+" 已存在不再创建");
  96. }
  97. InitEsTableStatus = true;
  98. }catch (Exception e){
  99. log.error("ElasticsearchRunner InitEsTCoinOrderThread 发生错误:{}" , e);
  100. throw new MyException("ElasticsearchRunner InitEsTCoinOrderThread 发生错误:" + e.getMessage());
  101. }
  102. }
  103. /**
  104. * 根据时间 重新同步es
  105. * @param
  106. * @return
  107. */
  108. public void updateEsByDate(String startTime, String endTime) {
  109. try {
  110. startTime = startTime.replace("/", "-");
  111. endTime = endTime.replace("/", "-");
  112. Date start = DateUtils.parseDate(startTime, DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss, new Date());
  113. Date end = DateUtils.parseDate(endTime, DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss, new Date());
  114. // 查询es 现有的数据,删除
  115. TCoinOrderParam param = new TCoinOrderParam();
  116. param.setCreateDate_start(start);
  117. param.setCreateDate_end(end);
  118. List<TCoinOrder> list_es = this.selectEntityByEqualToOption(param);
  119. for (TCoinOrder entity : list_es) {
  120. this.deleteTableById(String.valueOf(entity.getId()));
  121. }
  122. // 插入 新的
  123. // List<TCoinOrder> list = tCoinOrderService.selectForEs(param);
  124. LambdaQueryWrapper<TCoinOrder> query = Wrappers.lambdaQuery();
  125. query.gt(TCoinOrder::getCreateDate,start);
  126. query.lt(TCoinOrder::getCreateDate,end);
  127. List<TCoinOrder> list = tCoinOrderService.list(query);
  128. if (CollectionUtils.isNotEmpty(list)) {
  129. insertBatch(list);
  130. }
  131. } catch (Exception e) {
  132. log.error(this.getTableName()+" updateEsByDate 发生错误:{}", e);
  133. throw new MyException(this.getTableName()+" updateEsByDate 发生错误:" + e.getMessage());
  134. }
  135. }
  136. public void tongbuByHour() throws ParseException {
  137. String startDate = null;
  138. String endDate = null;
  139. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  140. Calendar calendar = Calendar.getInstance();
  141. calendar.setTime(new Date());
  142. endDate = sdf.format(calendar.getTime());
  143. calendar.add(calendar.HOUR_OF_DAY, -1);
  144. startDate = sdf.format(calendar.getTime());
  145. Date start = sdf.parse(startDate);
  146. Date end = sdf.parse(endDate);
  147. // 查询es 现有的数据
  148. // TCoinOrderExample example = new TCoinOrderExample();
  149. // TCoinOrderExample.Criteria criteria = example.createCriteria();
  150. // criteria.andCreateDateGreaterThanOrEqualTo(start);
  151. // criteria.andCreateDateLessThanOrEqualTo(end);
  152. // criteria.andEsEqualTo("1");
  153. // List<TCoinOrder> list_es = tCoinOrderService.selectByOption(example);
  154. LambdaQueryWrapper<TCoinOrder> query = Wrappers.lambdaQuery();
  155. query.gt(TCoinOrder::getCreateDate,start);
  156. query.lt(TCoinOrder::getCreateDate,end);
  157. List<TCoinOrder> list_es = tCoinOrderService.list(query);
  158. if (CollectionUtils.isNotEmpty(list_es)) {
  159. // insertBatch(list_es);
  160. if(list_es.size()>0){
  161. for(TCoinOrder coinOrder : list_es){
  162. try {
  163. updateDataById(coinOrder);
  164. } catch (Exception e) {
  165. e.printStackTrace();
  166. }
  167. }
  168. }
  169. }
  170. }
  171. }