EsTOrderService.java 22 KB


  1. package com.szwl.service.es;
  2. import cn.com.crbank.ommo.esclient.EsBaseService;
  3. import cn.com.crbank.ommo.exception.MyException;
  4. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  5. import com.baomidou.mybatisplus.core.metadata.IPage;
  6. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  7. import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
  8. import com.szwl.model.bean.ChartBean;
  9. import com.szwl.model.bean.ChartColumn;
  10. import com.szwl.model.bo.ChartType;
  11. import com.szwl.model.entity.TEquipment;
  12. import com.szwl.model.entity.TOrder;
  13. import com.szwl.model.query.StatisticsParam;
  14. import com.szwl.model.query.TCoinOrderParam;
  15. import com.szwl.model.query.TEquipmentParam;
  16. import com.szwl.model.query.TOrderParam;
  17. import com.szwl.model.utils.DateUtils;
  18. import com.szwl.service.TOrderService;
  19. import lombok.extern.slf4j.Slf4j;
  20. import org.apache.commons.collections4.CollectionUtils;
  21. import org.apache.commons.lang3.StringUtils;
  22. import org.elasticsearch.action.search.SearchRequest;
  23. import org.elasticsearch.action.search.SearchResponse;
  24. import org.elasticsearch.client.RequestOptions;
  25. import org.elasticsearch.client.indices.GetIndexRequest;
  26. import org.elasticsearch.index.query.BoolQueryBuilder;
  27. import org.elasticsearch.search.aggregations.Aggregation;
  28. import org.elasticsearch.search.aggregations.AggregationBuilder;
  29. import org.elasticsearch.search.aggregations.AggregationBuilders;
  30. import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
  31. import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
  32. import org.elasticsearch.search.aggregations.metrics.ParsedSum;
  33. import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
  34. import org.elasticsearch.search.builder.SearchSourceBuilder;
  35. import org.springframework.beans.factory.annotation.Autowired;
  36. import org.springframework.stereotype.Service;
  37. import java.io.IOException;
  38. import java.lang.reflect.InvocationTargetException;
  39. import java.math.BigDecimal;
  40. import java.math.RoundingMode;
  41. import java.text.ParseException;
  42. import java.text.SimpleDateFormat;
  43. import java.util.ArrayList;
  44. import java.util.Calendar;
  45. import java.util.Date;
  46. import java.util.List;
  47. import java.util.concurrent.ExecutionException;
  48. import java.util.stream.Collectors;
  49. @Slf4j
  50. @Service
  51. public class EsTOrderService extends EsBaseService<TOrder, TOrderParam> {
  52. private static boolean InitEsTableStatus = false; // 是否正常完成 es 初始化
  53. public static final int MAX_ROW = 1000;
  54. @Autowired
  55. TOrderService tOrderService;
  56. @Autowired
  57. EsTEquipmentService esTEquipmentService;
  58. @Autowired
  59. EsTCoinOrderService esTCoinOrderService;
  60. @Override
  61. public String getTableName() {
  62. return "es_t_order";
  63. }
  64. // public String getTableName() {
  65. // return "es_t_order";
  66. // }
  67. @Override
  68. public TOrder getInstanceOfEntity() {
  69. return new TOrder();
  70. }
  71. @Override
  72. public void setInitTableStatus(boolean flag) {
  73. InitEsTableStatus = flag;
  74. }
  75. @Override
  76. public boolean getInitTableStatus() {
  77. return InitEsTableStatus;
  78. }
  79. @Override
  80. public void initTableFun() {
  81. try{
  82. String tableName = getTableName();
  83. GetIndexRequest request = new GetIndexRequest(tableName);
  84. // DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(this.getTableName());
  85. // restHighLevelClient.indices().delete(deleteIndexRequest,RequestOptions.DEFAULT);
  86. boolean isExists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
  87. if (!isExists) {
  88. log.info("es 索引 开始创建"+tableName);
  89. createTable();
  90. // 初始化旧流水
  91. int num = 0;
  92. while (true) {
  93. int limit = MAX_ROW;
  94. // int offset= num * MAX_ROW;
  95. int offset= num;
  96. // TOrderExample example = new TOrderExample();
  97. // example.setLimit(limit);
  98. // example.setOffset(offset);
  99. // List<TOrder> list = tOrderService.selectByOption(example);
  100. LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  101. SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  102. String time = "2022-07-01 00:00:00";
  103. Date date = ft.parse(time);
  104. query.gt(TOrder::getCreateDate,date);
  105. Page<TOrder> page = new Page<>(offset, limit, true);
  106. IPage<TOrder> iPage = tOrderService.page(page, query);
  107. List<TOrder> list = iPage.getRecords();
  108. insertBatch(list);
  109. num++;
  110. if(list.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
  111. break;
  112. }
  113. }
  114. } else {
  115. log.info("es 索引 "+tableName+" 已存在不再创建");
  116. }
  117. InitEsTableStatus = true;
  118. }catch (Exception e){
  119. log.error("ElasticsearchRunner InitEsTOrderThread 发生错误:{}" , e);
  120. throw new MyException("ElasticsearchRunner InitEsTOrderThread 发生错误:" + e.getMessage());
  121. }
  122. }
  123. @Override
  124. public String getEntityPrimaryKey(TOrder tOrder) {
  125. return String.valueOf(tOrder.getId());
  126. }
  127. @Override
  128. public TOrder setEntityPrimaryKey(TOrder tOrder, String value) {
  129. return null;
  130. }
  131. // @Override
  132. // public TOrder setEntityPrimaryKey(TOrder tOrder, String value) {
  133. // return tOrder.setId(Long.parseLong(value));
  134. // }
  135. /**
  136. * 根据时间 重新同步es
  137. * @param
  138. * @return
  139. */
  140. public void updateEsByDate(String startTime, String endTime) {
  141. try {
  142. startTime = startTime.replace("/", "-");
  143. endTime = endTime.replace("/", "-");
  144. Date start = DateUtils.parseDate(startTime, DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss, new Date());
  145. Date end = DateUtils.parseDate(endTime, DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss, new Date());
  146. // 查询es 现有的数据,删除
  147. TOrderParam param = new TOrderParam();
  148. param.setCreateDate_start(start);
  149. param.setCreateDate_end(end);
  150. List<TOrder> list_es = this.selectEntityByEqualToOption(param);
  151. for (TOrder entity : list_es) {
  152. this.deleteTableById(String.valueOf(entity.getId()));
  153. }
  154. // 插入 新的
  155. // List<TOrder> list = tOrderService.selectForEs(param);
  156. LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  157. query.gt(TOrder::getCreateDate,start);
  158. query.lt(TOrder::getCreateDate,end);
  159. List<TOrder> list = tOrderService.list(query);
  160. if (CollectionUtils.isNotEmpty(list)) {
  161. insertBatch(list);
  162. }
  163. } catch (Exception e) {
  164. log.error(this.getTableName()+" updateEsByDate 发生错误:{}", e);
  165. throw new MyException(this.getTableName()+" updateEsByDate 发生错误:" + e.getMessage());
  166. }
  167. }
  168. /**
  169. * es 查询条件
  170. * @param param
  171. * @return
  172. */
  173. public BoolQueryBuilder getStatisticsParam2QueryBuilder(StatisticsParam param){
  174. BoolQueryBuilder boolQueryBuilder;
  175. String startDate = param.getStartDate().replace("/","-");
  176. String endDate = param.getEndDate().replace("/","-");
  177. Date start = DateUtils.parseDate(startDate+" 00:00:00", DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss,new Date());
  178. Date end = DateUtils.parseDate(endDate+" 23:59:59", DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss,new Date());
  179. if(StringUtils.equals("0",param.getIfForeign())){ // 国内用户
  180. TOrderParam tOrderParam = new TOrderParam();
  181. tOrderParam.setStatus(1);
  182. tOrderParam.setCreateDate_start(start);
  183. tOrderParam.setCreateDate_end(end);
  184. if(StringUtils.isNotEmpty(param.getAdminId())){ // 所属商家id
  185. tOrderParam.setAdminId(Long.parseLong(param.getAdminId()));
  186. }
  187. if(StringUtils.isNotEmpty(param.getEquipmentId())){ // 设备id
  188. tOrderParam.setEquipmentId(Long.parseLong(param.getEquipmentId()));
  189. }
  190. if(param.getEquipmentIds()!=null&&param.getEquipmentIds().size()>0){ // 设备id
  191. tOrderParam.setEquipmentId_inList(param.getEquipmentIds());
  192. }
  193. if(StringUtils.isNotEmpty(param.getPayType())){ // 支付方式
  194. tOrderParam.setFrpCode(param.getPayType());
  195. }
  196. if (null != param.getAgencyId()){
  197. tOrderParam.setAgencyId(param.getAgencyId());
  198. tOrderParam.setType_gt(1);
  199. }
  200. if(null != param.getMerchantId()){
  201. tOrderParam.setMerchantId(param.getMerchantId());
  202. tOrderParam.setType_gt(2);
  203. }
  204. if(param.getAdminIds()!=null&&param.getAdminIds().size()>0){
  205. tOrderParam.setAdminId_inList(param.getAdminIds());
  206. }
  207. boolQueryBuilder = this.getParam2QueryBuilder(tOrderParam);
  208. }else{ // 国外用户
  209. TEquipmentParam tEquipmentParam = new TEquipmentParam();
  210. if(StringUtils.isNotEmpty(param.getEquipmentId())){ // 设备id
  211. tEquipmentParam.setId(Long.parseLong(param.getEquipmentId()));
  212. }
  213. List<TEquipment> equipmentList = esTEquipmentService.selectEntityByEqualToOption(tEquipmentParam);
  214. List<String> clientIdList = equipmentList.stream().map(TEquipment::getClientId).collect(Collectors.toList());
  215. TCoinOrderParam tCoinOrderParam = new TCoinOrderParam();
  216. tCoinOrderParam.setCreateDate_start(start);
  217. tCoinOrderParam.setCreateDate_end(end);
  218. tCoinOrderParam.setClientId_inList(clientIdList);
  219. if(param.getAdminIds().size()>0){
  220. tCoinOrderParam.setAdminId_inList(param.getAdminIds());
  221. }
  222. if(StringUtils.isNotEmpty(param.getAdminId())){ // 所属商家id
  223. tCoinOrderParam.setAdminId(Long.parseLong(param.getAdminId()));
  224. }
  225. if(param.getClientIds().size()>0){ // 设备
  226. tCoinOrderParam.setClientId_inList(param.getClientIds());
  227. }
  228. boolQueryBuilder = esTCoinOrderService.getParam2QueryBuilder(tCoinOrderParam);
  229. }
  230. return boolQueryBuilder;
  231. }
  232. public ChartColumn getStatistics(StatisticsParam param) {
  233. String msg = "";
  234. String format = "yyyy-MM";
  235. DateHistogramInterval interval = DateHistogramInterval.DAY;
  236. if (ChartType.day.toString().equals(param.getChartType())) {
  237. interval = DateHistogramInterval.HOUR;
  238. format = "HH点";
  239. msg = "日统计";
  240. }
  241. if (ChartType.week.toString().equals(param.getChartType())) {
  242. interval = DateHistogramInterval.DAY;
  243. format = "yyyy-MM-dd";
  244. msg = "周统计";
  245. }
  246. if (ChartType.month.toString().equals(param.getChartType())) {
  247. interval = DateHistogramInterval.DAY;
  248. format = "MM月dd";
  249. msg = "月统计";
  250. }
  251. if (ChartType.year.toString().equals(param.getChartType())) {
  252. interval = DateHistogramInterval.MONTH;
  253. format = "MM月";
  254. msg = "年统计";
  255. }
  256. ChartColumn chartColumn = getStatistics(param,format,interval);
  257. if (ChartType.week.toString().equals(param.getChartType())) {
  258. ArrayList<String> categories = new ArrayList<>();
  259. for (String day :chartColumn.getCategories()) {
  260. int week = DateUtils.getWeek(DateUtils.parseDate(day, DateUtils.PATTERN_yyyy_MM_dd,new Date()));
  261. String finalCategories = "周" + (week==0?"日":week);
  262. categories.add(finalCategories);
  263. }
  264. chartColumn.setCategories(categories);
  265. }
  266. return chartColumn;
  267. }
  268. public ChartColumn getStatistics(StatisticsParam param, String format, DateHistogramInterval interval) {
  269. try {
  270. String aggregationResultName = "aggregationResult";// 仅为名称,用以获取返回结果
  271. String aggSumName = "sales"; // 仅为名称,用以获取返回结果
  272. String aggName_productNumber = "productNumber"; // 仅为名称,用以获取返回结果
  273. //Bool查询
  274. BoolQueryBuilder queryBuilder = getStatisticsParam2QueryBuilder(param);
  275. // 时间聚合
  276. AggregationBuilder dateHistogram = AggregationBuilders.dateHistogram(aggregationResultName)
  277. .field("createDate")
  278. .calendarInterval(interval)
  279. .format(format);
  280. // 根据字段price 求和 sum, sales 仅为名称,用以获取返回结果
  281. SumAggregationBuilder sumAggregationBuilder = AggregationBuilders.sum(aggSumName).field("price");
  282. dateHistogram.subAggregation(sumAggregationBuilder);
  283. SumAggregationBuilder sumAggregationBuilder2 = AggregationBuilders.sum(aggName_productNumber).field("productNumber");
  284. dateHistogram.subAggregation(sumAggregationBuilder2);
  285. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
  286. // 指定size为0,不返回文档 因为只需要数量
  287. sourceBuilder.query(queryBuilder).aggregation(dateHistogram).size(0);
  288. SearchRequest searchRequest = new SearchRequest(getTableName());
  289. searchRequest.source(sourceBuilder);
  290. log.debug("sourceBuilder:{}", sourceBuilder);
  291. SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  292. Aggregation agg = searchResponse.getAggregations().get(aggregationResultName);
  293. List<? extends Histogram.Bucket> buckets = ((Histogram) agg).getBuckets();
  294. List<ChartBean> chartBeanList = new ArrayList<>();
  295. for (Histogram.Bucket bucket : buckets) {
  296. ChartBean chartBean = new ChartBean();
  297. chartBean.setCategorie(bucket.getKeyAsString());
  298. // chartBean.setSaleNum(Float.valueOf(bucket.getDocCount()));
  299. //统计商品数
  300. ParsedSum saleNum = bucket.getAggregations().get(aggName_productNumber);
  301. String saleNumStr = format1((float) saleNum.getValue());
  302. if(StringUtils.isEmpty(saleNumStr)){
  303. saleNumStr = "1";
  304. }
  305. chartBean.setSaleNum(Float.valueOf(saleNumStr));
  306. //统计订单数
  307. chartBean.setOrderNum(Float.valueOf(bucket.getDocCount()));
  308. // chartBean.setSaleNum(Long.valueOf(bucket.getDocCount()).intValue());
  309. ParsedSum sum = bucket.getAggregations().get(aggSumName);
  310. String s = format1((float) sum.getValue());
  311. chartBean.setSalePrice(Float.valueOf(s));
  312. // chartBean.setSalePrice(Double.valueOf(sum.getValue()).intValue());
  313. chartBeanList.add(chartBean);
  314. }
  315. ChartColumn chartColumn = new ChartColumn(chartBeanList);
  316. return chartColumn;
  317. } catch (Exception e) {
  318. log.error(getTableName() + " es根据条件 聚合查询 报错:{}", e);
  319. throw new MyException(getTableName() + " es根据条件 聚合查询 报错:" + e.getMessage());
  320. }
  321. }
  322. public static String format1(Float value){
  323. BigDecimal bd = new BigDecimal(value);//创建一个bd对象,将要转换的值value传入构造函数
  324. bd = bd.setScale(2, RoundingMode.HALF_UP);//调用setScale方法进行数据格式化,保留两位小数,采用四舍五入规则
  325. return bd.toString(); //返回bd对象的值(转化为string形式)
  326. }
  327. public void tongbuByHour() throws ParseException {
  328. String startDate = null;
  329. String endDate = null;
  330. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  331. Calendar calendar = Calendar.getInstance();
  332. calendar.setTime(new Date());
  333. endDate = sdf.format(calendar.getTime());
  334. calendar.add(calendar.HOUR_OF_DAY, -1);
  335. // calendar.add(calendar.DATE, -4);
  336. startDate = sdf.format(calendar.getTime());
  337. Date start = sdf.parse(startDate);
  338. Date end = sdf.parse(endDate);
  339. // 查询es 现有的数据
  340. //1,同步已支付的订单
  341. int num = 0;
  342. while (true) {
  343. int limit = MAX_ROW;
  344. int offset= num * MAX_ROW;
  345. LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  346. query.gt(TOrder::getCreateDate,start);
  347. query.lt(TOrder::getCreateDate,end);
  348. query.eq(TOrder::getStatus,1);
  349. Page<TOrder> page = new Page<>(offset, limit, true);
  350. IPage<TOrder> iPage = tOrderService.page(page, query);
  351. List<TOrder> list_es = iPage.getRecords();
  352. if(list_es.size()>0){
  353. insertBatch(list_es);
  354. for(TOrder order:list_es){
  355. try {
  356. updateDataById(order);
  357. } catch (Exception e) {
  358. e.printStackTrace();
  359. }
  360. }
  361. }
  362. num++;
  363. if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
  364. break;
  365. }
  366. }
  367. //2,同步已退款的订单
  368. int num2 = 0;
  369. while (true) {
  370. int limit = MAX_ROW;
  371. int offset= num2 * MAX_ROW;
  372. LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  373. query.gt(TOrder::getCreateDate,start);
  374. query.lt(TOrder::getCreateDate,end);
  375. query.eq(TOrder::getStatus,3);
  376. Page<TOrder> page = new Page<>(offset, limit, true);
  377. IPage<TOrder> iPage = tOrderService.page(page, query);
  378. List<TOrder> list_es = iPage.getRecords();
  379. if(list_es.size()>0){
  380. insertBatch(list_es);
  381. for(TOrder order:list_es){
  382. try {
  383. updateDataById(order);
  384. } catch (Exception e) {
  385. e.printStackTrace();
  386. }
  387. }
  388. }
  389. num2++;
  390. if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
  391. break;
  392. }
  393. }
  394. // 查询es 现有的数据
  395. // LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  396. // query.gt(TOrder::getCreateDate,start);
  397. // query.lt(TOrder::getCreateDate,end);
  398. // List<TOrder> list_es = tOrderService.list(query);
  399. // if (CollectionUtils.isNotEmpty(list_es)) {
  400. // insertBatch(list_es);
  401. // if(list_es.size()>0){
  402. // for(TOrder order : list_es){
  403. // try {
  404. // updateDataById(order);
  405. // } catch (Exception e) {
  406. // e.printStackTrace();
  407. // }
  408. // }
  409. // }
  410. // }
  411. }
  412. public void tongbuByDay() throws ParseException {
  413. String startDate = null;
  414. String endDate = null;
  415. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  416. Calendar calendar = Calendar.getInstance();
  417. calendar.setTime(new Date());
  418. endDate = sdf.format(calendar.getTime());
  419. calendar.add(calendar.DATE, -1);
  420. // calendar.add(calendar.DATE, -4);
  421. startDate = sdf.format(calendar.getTime());
  422. Date start = sdf.parse(startDate);
  423. Date end = sdf.parse(endDate);
  424. // 查询es 现有的数据
  425. //1,同步已支付的订单
  426. int num = 0;
  427. while (true) {
  428. int limit = MAX_ROW;
  429. int offset= num * MAX_ROW;
  430. LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  431. query.gt(TOrder::getCreateDate,start);
  432. query.lt(TOrder::getCreateDate,end);
  433. query.eq(TOrder::getStatus,1);
  434. Page<TOrder> page = new Page<>(offset, limit, true);
  435. IPage<TOrder> iPage = tOrderService.page(page, query);
  436. List<TOrder> list_es = iPage.getRecords();
  437. if(list_es.size()>0){
  438. insertBatch(list_es);
  439. for(TOrder order:list_es){
  440. try {
  441. updateDataById(order);
  442. } catch (Exception e) {
  443. e.printStackTrace();
  444. }
  445. }
  446. }
  447. num++;
  448. if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
  449. break;
  450. }
  451. }
  452. //2,同步已退款的订单
  453. int num2 = 0;
  454. while (true) {
  455. int limit = MAX_ROW;
  456. int offset= num2 * MAX_ROW;
  457. LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  458. query.gt(TOrder::getCreateDate,start);
  459. query.lt(TOrder::getCreateDate,end);
  460. query.eq(TOrder::getStatus,3);
  461. Page<TOrder> page = new Page<>(offset, limit, true);
  462. IPage<TOrder> iPage = tOrderService.page(page, query);
  463. List<TOrder> list_es = iPage.getRecords();
  464. if(list_es.size()>0){
  465. insertBatch(list_es);
  466. for(TOrder order:list_es){
  467. try {
  468. updateDataById(order);
  469. } catch (Exception e) {
  470. e.printStackTrace();
  471. }
  472. }
  473. }
  474. num2++;
  475. if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
  476. break;
  477. }
  478. }
  479. // 查询es 现有的数据
  480. // LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  481. // query.gt(TOrder::getCreateDate,start);
  482. // query.lt(TOrder::getCreateDate,end);
  483. // List<TOrder> list_es = tOrderService.list(query);
  484. // if (CollectionUtils.isNotEmpty(list_es)) {
  485. // insertBatch(list_es);
  486. // if(list_es.size()>0){
  487. // for(TOrder order : list_es){
  488. // try {
  489. // updateDataById(order);
  490. // } catch (Exception e) {
  491. // e.printStackTrace();
  492. // }
  493. // }
  494. // }
  495. // }
  496. }
  497. }