EsTOrderService.java 16 KB


  1. package com.szwl.service.es;
  2. import cn.com.crbank.ommo.esclient.EsBaseService;
  3. import cn.com.crbank.ommo.exception.MyException;
  4. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  5. import com.baomidou.mybatisplus.core.metadata.IPage;
  6. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  7. import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
  8. import com.szwl.model.bean.ChartBean;
  9. import com.szwl.model.bean.ChartColumn;
  10. import com.szwl.model.bo.ChartType;
  11. import com.szwl.model.entity.TEquipment;
  12. import com.szwl.model.entity.TOrder;
  13. import com.szwl.model.query.StatisticsParam;
  14. import com.szwl.model.query.TCoinOrderParam;
  15. import com.szwl.model.query.TEquipmentParam;
  16. import com.szwl.model.query.TOrderParam;
  17. import com.szwl.model.utils.DateUtils;
  18. import com.szwl.service.TOrderService;
  19. import lombok.extern.slf4j.Slf4j;
  20. import org.apache.commons.collections4.CollectionUtils;
  21. import org.apache.commons.lang3.StringUtils;
  22. import org.elasticsearch.action.search.SearchRequest;
  23. import org.elasticsearch.action.search.SearchResponse;
  24. import org.elasticsearch.client.RequestOptions;
  25. import org.elasticsearch.client.indices.GetIndexRequest;
  26. import org.elasticsearch.index.query.BoolQueryBuilder;
  27. import org.elasticsearch.search.aggregations.Aggregation;
  28. import org.elasticsearch.search.aggregations.AggregationBuilder;
  29. import org.elasticsearch.search.aggregations.AggregationBuilders;
  30. import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
  31. import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
  32. import org.elasticsearch.search.aggregations.metrics.ParsedSum;
  33. import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
  34. import org.elasticsearch.search.builder.SearchSourceBuilder;
  35. import org.springframework.beans.factory.annotation.Autowired;
  36. import org.springframework.stereotype.Service;
  37. import java.math.BigDecimal;
  38. import java.math.RoundingMode;
  39. import java.text.ParseException;
  40. import java.text.SimpleDateFormat;
  41. import java.util.ArrayList;
  42. import java.util.Calendar;
  43. import java.util.Date;
  44. import java.util.List;
  45. import java.util.stream.Collectors;
  46. @Slf4j
  47. @Service
  48. public class EsTOrderService extends EsBaseService<TOrder, TOrderParam> {
  49. private static boolean InitEsTableStatus = false; // 是否正常完成 es 初始化
  50. public static final int MAX_ROW = 10000;
  51. @Autowired
  52. TOrderService tOrderService;
  53. @Autowired
  54. EsTEquipmentService esTEquipmentService;
  55. @Autowired
  56. EsTCoinOrderService esTCoinOrderService;
  57. @Override
  58. public String getTableName() {
  59. return "es_t_order";
  60. }
  61. // public String getTableName() {
  62. // return "es_t_order";
  63. // }
  64. @Override
  65. public TOrder getInstanceOfEntity() {
  66. return new TOrder();
  67. }
  68. @Override
  69. public void setInitTableStatus(boolean flag) {
  70. InitEsTableStatus = flag;
  71. }
  72. @Override
  73. public boolean getInitTableStatus() {
  74. return InitEsTableStatus;
  75. }
  76. @Override
  77. public void initTableFun() {
  78. try{
  79. String tableName = getTableName();
  80. GetIndexRequest request = new GetIndexRequest(tableName);
  81. // DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(this.getTableName());
  82. // restHighLevelClient.indices().delete(deleteIndexRequest,RequestOptions.DEFAULT);
  83. boolean isExists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
  84. if (!isExists) {
  85. log.info("es 索引 开始创建"+tableName);
  86. createTable();
  87. // 初始化旧流水
  88. int num = 0;
  89. while (true) {
  90. int limit = MAX_ROW;
  91. // int offset= num * MAX_ROW;
  92. int offset= num;
  93. // TOrderExample example = new TOrderExample();
  94. // example.setLimit(limit);
  95. // example.setOffset(offset);
  96. // List<TOrder> list = tOrderService.selectByOption(example);
  97. LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  98. SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  99. String time = "2022-06-01 00:00:00";
  100. Date date = ft.parse(time);
  101. query.gt(TOrder::getCreateDate,date);
  102. query.ne(TOrder::getStatus,"0");
  103. Page<TOrder> page = new Page<>(offset, limit, true);
  104. IPage<TOrder> iPage = tOrderService.page(page, query);
  105. List<TOrder> list = iPage.getRecords();
  106. insertBatch(list);
  107. num++;
  108. if(list.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
  109. break;
  110. }
  111. }
  112. } else {
  113. log.info("es 索引 "+tableName+" 已存在不再创建");
  114. }
  115. InitEsTableStatus = true;
  116. }catch (Exception e){
  117. log.error("ElasticsearchRunner InitEsTOrderThread 发生错误:{}" , e);
  118. throw new MyException("ElasticsearchRunner InitEsTOrderThread 发生错误:" + e.getMessage());
  119. }
  120. }
  121. @Override
  122. public String getEntityPrimaryKey(TOrder tOrder) {
  123. return String.valueOf(tOrder.getId());
  124. }
  125. @Override
  126. public TOrder setEntityPrimaryKey(TOrder tOrder, String value) {
  127. return null;
  128. }
  129. // @Override
  130. // public TOrder setEntityPrimaryKey(TOrder tOrder, String value) {
  131. // return tOrder.setId(Long.parseLong(value));
  132. // }
  133. /**
  134. * 根据时间 重新同步es
  135. * @param
  136. * @return
  137. */
  138. public void updateEsByDate(String startTime, String endTime) {
  139. try {
  140. startTime = startTime.replace("/", "-");
  141. endTime = endTime.replace("/", "-");
  142. Date start = DateUtils.parseDate(startTime, DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss, new Date());
  143. Date end = DateUtils.parseDate(endTime, DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss, new Date());
  144. // 查询es 现有的数据,删除
  145. TOrderParam param = new TOrderParam();
  146. param.setCreateDate_start(start);
  147. param.setCreateDate_end(end);
  148. List<TOrder> list_es = this.selectEntityByEqualToOption(param);
  149. for (TOrder entity : list_es) {
  150. this.deleteTableById(String.valueOf(entity.getId()));
  151. }
  152. // 插入 新的
  153. // List<TOrder> list = tOrderService.selectForEs(param);
  154. LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  155. query.gt(TOrder::getCreateDate,start);
  156. query.lt(TOrder::getCreateDate,end);
  157. List<TOrder> list = tOrderService.list(query);
  158. if (CollectionUtils.isNotEmpty(list)) {
  159. insertBatch(list);
  160. }
  161. } catch (Exception e) {
  162. log.error(this.getTableName()+" updateEsByDate 发生错误:{}", e);
  163. throw new MyException(this.getTableName()+" updateEsByDate 发生错误:" + e.getMessage());
  164. }
  165. }
  166. /**
  167. * es 查询条件
  168. * @param param
  169. * @return
  170. */
  171. public BoolQueryBuilder getStatisticsParam2QueryBuilder(StatisticsParam param){
  172. BoolQueryBuilder boolQueryBuilder;
  173. String startDate = param.getStartDate().replace("/","-");
  174. String endDate = param.getEndDate().replace("/","-");
  175. Date start = DateUtils.parseDate(startDate+" 00:00:00",DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss,new Date());
  176. Date end = DateUtils.parseDate(endDate+" 23:59:59",DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss,new Date());
  177. if(StringUtils.equals("0",param.getIfForeign())){ // 国内用户
  178. TOrderParam tOrderParam = new TOrderParam();
  179. tOrderParam.setStatus(1);
  180. tOrderParam.setCreateDate_start(start);
  181. tOrderParam.setCreateDate_end(end);
  182. if(StringUtils.isNotEmpty(param.getAdminId())){ // 所属商家id
  183. tOrderParam.setAdminId(Long.parseLong(param.getAdminId()));
  184. }
  185. if(StringUtils.isNotEmpty(param.getEquipmentId())){ // 设备id
  186. tOrderParam.setEquipmentId(Long.parseLong(param.getEquipmentId()));
  187. }
  188. if (null != param.getAgencyId()){
  189. tOrderParam.setAgencyId(param.getAgencyId());
  190. tOrderParam.setType_gt(1);
  191. }
  192. if(null != param.getMerchantId()){
  193. tOrderParam.setMerchantId(param.getMerchantId());
  194. tOrderParam.setType_gt(2);
  195. }
  196. boolQueryBuilder = this.getParam2QueryBuilder(tOrderParam);
  197. }else{ // 国外用户
  198. TEquipmentParam tEquipmentParam = new TEquipmentParam();
  199. if(StringUtils.isNotEmpty(param.getEquipmentId())){ // 设备id
  200. tEquipmentParam.setId(Long.parseLong(param.getEquipmentId()));
  201. }
  202. List<TEquipment> equipmentList = esTEquipmentService.selectEntityByEqualToOption(tEquipmentParam);
  203. List<String> clientIdList = equipmentList.stream().map(TEquipment::getClientId).collect(Collectors.toList());
  204. TCoinOrderParam tCoinOrderParam = new TCoinOrderParam();
  205. tCoinOrderParam.setCreateDate_start(start);
  206. tCoinOrderParam.setCreateDate_end(end);
  207. tCoinOrderParam.setClientId_inList(clientIdList);
  208. if(StringUtils.isNotEmpty(param.getAdminId())){ // 所属商家id
  209. tCoinOrderParam.setAdminId(Long.parseLong(param.getAdminId()));
  210. }
  211. boolQueryBuilder = esTCoinOrderService.getParam2QueryBuilder(tCoinOrderParam);
  212. }
  213. return boolQueryBuilder;
  214. }
  215. public ChartColumn getStatistics(StatisticsParam param) {
  216. String msg = "";
  217. String format = "yyyy-MM";
  218. DateHistogramInterval interval = DateHistogramInterval.DAY;
  219. if (ChartType.day.toString().equals(param.getChartType())) {
  220. interval = DateHistogramInterval.HOUR;
  221. format = "HH点";
  222. msg = "日统计";
  223. }
  224. if (ChartType.week.toString().equals(param.getChartType())) {
  225. interval = DateHistogramInterval.DAY;
  226. format = "yyyy-MM-dd";
  227. msg = "周统计";
  228. }
  229. if (ChartType.month.toString().equals(param.getChartType())) {
  230. interval = DateHistogramInterval.DAY;
  231. format = "MM月dd";
  232. msg = "月统计";
  233. }
  234. if (ChartType.year.toString().equals(param.getChartType())) {
  235. interval = DateHistogramInterval.MONTH;
  236. format = "MM月";
  237. msg = "年统计";
  238. }
  239. ChartColumn chartColumn = getStatistics(param,format,interval);
  240. if (ChartType.week.toString().equals(param.getChartType())) {
  241. ArrayList<String> categories = new ArrayList<>();
  242. for (String day :chartColumn.getCategories()) {
  243. int week = DateUtils.getWeek(DateUtils.parseDate(day,DateUtils.PATTERN_yyyy_MM_dd,new Date()));
  244. String finalCategories = "周" + (week==0?"日":week);
  245. categories.add(finalCategories);
  246. }
  247. chartColumn.setCategories(categories);
  248. }
  249. return chartColumn;
  250. }
  251. public ChartColumn getStatistics(StatisticsParam param,String format,DateHistogramInterval interval) {
  252. try {
  253. String aggregationResultName = "aggregationResult";// 仅为名称,用以获取返回结果
  254. String aggSumName = "sales"; // 仅为名称,用以获取返回结果
  255. //Bool查询
  256. BoolQueryBuilder queryBuilder = getStatisticsParam2QueryBuilder(param);
  257. // 时间聚合
  258. AggregationBuilder dateHistogram = AggregationBuilders.dateHistogram(aggregationResultName)
  259. .field("createDate")
  260. .calendarInterval(interval)
  261. .format(format);
  262. // 根据字段price 求和 sum, sales 仅为名称,用以获取返回结果
  263. SumAggregationBuilder sumAggregationBuilder = AggregationBuilders.sum(aggSumName).field("price");
  264. dateHistogram.subAggregation(sumAggregationBuilder);
  265. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
  266. // 指定size为0,不返回文档 因为只需要数量
  267. sourceBuilder.query(queryBuilder).aggregation(dateHistogram).size(0);
  268. SearchRequest searchRequest = new SearchRequest(getTableName());
  269. searchRequest.source(sourceBuilder);
  270. log.debug("sourceBuilder:{}", sourceBuilder);
  271. SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  272. Aggregation agg = searchResponse.getAggregations().get(aggregationResultName);
  273. List<? extends Histogram.Bucket> buckets = ((Histogram) agg).getBuckets();
  274. List<ChartBean> chartBeanList = new ArrayList<>();
  275. for (Histogram.Bucket bucket : buckets) {
  276. ChartBean chartBean = new ChartBean();
  277. chartBean.setCategorie(bucket.getKeyAsString());
  278. chartBean.setSaleNum(Float.valueOf(bucket.getDocCount()));
  279. // chartBean.setSaleNum(Long.valueOf(bucket.getDocCount()).intValue());
  280. ParsedSum sum = bucket.getAggregations().get(aggSumName);
  281. String s = format1((float) sum.getValue());
  282. chartBean.setSalePrice(Float.valueOf(s));
  283. // chartBean.setSalePrice(Double.valueOf(sum.getValue()).intValue());
  284. chartBeanList.add(chartBean);
  285. }
  286. ChartColumn chartColumn = new ChartColumn(chartBeanList);
  287. return chartColumn;
  288. } catch (Exception e) {
  289. log.error(getTableName() + " es根据条件 聚合查询 报错:{}", e);
  290. throw new MyException(getTableName() + " es根据条件 聚合查询 报错:" + e.getMessage());
  291. }
  292. }
  293. public static String format1(Float value){
  294. BigDecimal bd = new BigDecimal(value);//创建一个bd对象,将要转换的值value传入构造函数
  295. bd = bd.setScale(2, RoundingMode.HALF_UP);//调用setScale方法进行数据格式化,保留两位小数,采用四舍五入规则
  296. return bd.toString(); //返回bd对象的值(转化为string形式)
  297. }
  298. public void tongbuByHour() throws ParseException {
  299. String startDate = null;
  300. String endDate = null;
  301. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  302. Calendar calendar = Calendar.getInstance();
  303. calendar.setTime(new Date());
  304. endDate = sdf.format(calendar.getTime());
  305. calendar.add(calendar.HOUR_OF_DAY, -1);
  306. // calendar.add(calendar.DATE, -4);
  307. startDate = sdf.format(calendar.getTime());
  308. Date start = sdf.parse(startDate);
  309. Date end = sdf.parse(endDate);
  310. // 查询es 现有的数据
  311. // TOrderExample example = new TOrderExample();
  312. // TOrderExample.Criteria criteria = example.createCriteria();
  313. // criteria.andCreateDateGreaterThanOrEqualTo(start);
  314. // criteria.andCreateDateLessThanOrEqualTo(end);
  315. // criteria.andEsEqualTo("1");
  316. // List<TOrder> list_es = tOrderService.selectByOption(example);
  317. LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  318. query.gt(TOrder::getCreateDate,start);
  319. query.lt(TOrder::getCreateDate,end);
  320. List<TOrder> list_es = tOrderService.list(query);
  321. if (CollectionUtils.isNotEmpty(list_es)) {
  322. insertBatch(list_es);
  323. if(list_es.size()>0){
  324. for(TOrder order : list_es){
  325. try {
  326. updateDataById(order);
  327. } catch (Exception e) {
  328. e.printStackTrace();
  329. }
  330. }
  331. }
  332. }
  333. }
  334. }