EsTOrderService.java 42 KB


  1. package com.szwl.service.es;
  2. import cn.com.crbank.ommo.esclient.EsBaseService;
  3. import cn.com.crbank.ommo.exception.MyException;
  4. import cn.hutool.core.util.RandomUtil;
  5. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  6. import com.baomidou.mybatisplus.core.metadata.IPage;
  7. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  8. import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
  9. import com.szwl.feign.SzwlFeign;
  10. import com.szwl.model.bean.ChartBean;
  11. import com.szwl.model.bean.ChartColumn;
  12. import com.szwl.model.bean.OrderDaoChuDTO;
  13. import com.szwl.model.bo.ChartType;
  14. import com.szwl.model.bo.R;
  15. import com.szwl.model.entity.*;
  16. import com.szwl.model.query.StatisticsParam;
  17. import com.szwl.model.query.TCoinOrderParam;
  18. import com.szwl.model.query.TEquipmentParam;
  19. import com.szwl.model.query.TOrderParam;
  20. import com.szwl.model.utils.DateUtils;
  21. import com.szwl.service.OrderStatisticsMonthService;
  22. import com.szwl.service.OrderStatisticsYearService;
  23. import com.szwl.service.TOrderService;
  24. import lombok.extern.slf4j.Slf4j;
  25. import org.apache.commons.collections4.CollectionUtils;
  26. import org.apache.commons.lang3.StringUtils;
  27. import org.elasticsearch.action.search.SearchRequest;
  28. import org.elasticsearch.action.search.SearchResponse;
  29. import org.elasticsearch.client.RequestOptions;
  30. import org.elasticsearch.client.indices.GetIndexRequest;
  31. import org.elasticsearch.index.query.BoolQueryBuilder;
  32. import org.elasticsearch.index.query.QueryBuilders;
  33. import org.elasticsearch.search.aggregations.*;
  34. import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
  35. import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
  36. import org.elasticsearch.search.aggregations.bucket.terms.Terms;
  37. import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
  38. import org.elasticsearch.search.aggregations.metrics.ParsedSum;
  39. import org.elasticsearch.search.aggregations.metrics.Sum;
  40. import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
  41. import org.elasticsearch.search.builder.SearchSourceBuilder;
  42. import org.springframework.beans.factory.annotation.Autowired;
  43. import org.springframework.stereotype.Service;
  44. import java.math.BigDecimal;
  45. import java.math.RoundingMode;
  46. import java.text.ParseException;
  47. import java.text.SimpleDateFormat;
  48. import java.util.ArrayList;
  49. import java.util.Calendar;
  50. import java.util.Date;
  51. import java.util.List;
  52. @Slf4j
  53. @Service
  54. public class EsTOrderService extends EsBaseService<TOrder, TOrderParam> {
  55. private static boolean InitEsTableStatus = false; // 是否正常完成 es 初始化
  56. public static final int MAX_ROW = 1000;
  57. @Autowired
  58. TOrderService tOrderService;
  59. @Autowired
  60. EsTEquipmentService esTEquipmentService;
  61. @Autowired
  62. EsTCoinOrderService esTCoinOrderService;
  63. @Autowired
  64. SzwlFeign szwlFeign;
  65. @Autowired
  66. OrderStatisticsMonthService orderStatisticsMonthService;
  67. @Autowired
  68. OrderStatisticsYearService orderStatisticsYearService;
  69. @Override
  70. public String getTableName() {
  71. return "es_t_order";
  72. }
  73. // public String getTableName() {
  74. // return "es_t_order";
  75. // }
  76. @Override
  77. public TOrder getInstanceOfEntity() {
  78. return new TOrder();
  79. }
  80. @Override
  81. public void setInitTableStatus(boolean flag) {
  82. InitEsTableStatus = flag;
  83. }
  84. @Override
  85. public boolean getInitTableStatus() {
  86. return InitEsTableStatus;
  87. }
  88. @Override
  89. public void initTableFun() {
  90. try{
  91. String tableName = getTableName();
  92. GetIndexRequest request = new GetIndexRequest(tableName);
  93. // DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(this.getTableName());
  94. // restHighLevelClient.indices().delete(deleteIndexRequest,RequestOptions.DEFAULT);
  95. boolean isExists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
  96. if (!isExists) {
  97. log.info("es 索引 开始创建"+tableName);
  98. createTable();
  99. // 初始化旧流水
  100. int num = 0;
  101. while (true) {
  102. int limit = MAX_ROW;
  103. // int offset= num * MAX_ROW;
  104. int offset= num;
  105. // TOrderExample example = new TOrderExample();
  106. // example.setLimit(limit);
  107. // example.setOffset(offset);
  108. // List<TOrder> list = tOrderService.selectByOption(example);
  109. LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  110. SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  111. String time = "2022-07-01 00:00:00";
  112. Date date = ft.parse(time);
  113. query.gt(TOrder::getCreateDate,date);
  114. Page<TOrder> page = new Page<>(offset, limit, true);
  115. IPage<TOrder> iPage = tOrderService.page(page, query);
  116. List<TOrder> list = iPage.getRecords();
  117. insertBatch(list);
  118. num++;
  119. if(list.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
  120. break;
  121. }
  122. }
  123. } else {
  124. log.info("es 索引 "+tableName+" 已存在不再创建");
  125. }
  126. InitEsTableStatus = true;
  127. }catch (Exception e){
  128. log.error("ElasticsearchRunner InitEsTOrderThread 发生错误:{}" , e);
  129. throw new MyException("ElasticsearchRunner InitEsTOrderThread 发生错误:" + e.getMessage());
  130. }
  131. }
  132. @Override
  133. public String getEntityPrimaryKey(TOrder tOrder) {
  134. return String.valueOf(tOrder.getId());
  135. }
  136. @Override
  137. public TOrder setEntityPrimaryKey(TOrder tOrder, String value) {
  138. return null;
  139. }
  140. /**
  141. * 根据时间 重新同步es
  142. * @param
  143. * @return
  144. */
  145. public void updateEsByDate(String startTime, String endTime) {
  146. try {
  147. startTime = startTime.replace("/", "-");
  148. endTime = endTime.replace("/", "-");
  149. Date start = DateUtils.parseDate(startTime, DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss, new Date());
  150. Date end = DateUtils.parseDate(endTime, DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss, new Date());
  151. // 查询es 现有的数据,删除
  152. TOrderParam param = new TOrderParam();
  153. param.setCreateDate_start(start);
  154. param.setCreateDate_end(end);
  155. List<TOrder> list_es = this.selectEntityByEqualToOption(param);
  156. for (TOrder entity : list_es) {
  157. this.deleteTableById(String.valueOf(entity.getId()));
  158. }
  159. // 插入 新的
  160. // List<TOrder> list = tOrderService.selectForEs(param);
  161. LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  162. query.gt(TOrder::getCreateDate,start);
  163. query.lt(TOrder::getCreateDate,end);
  164. List<TOrder> list = tOrderService.list(query);
  165. if (CollectionUtils.isNotEmpty(list)) {
  166. insertBatch(list);
  167. }
  168. } catch (Exception e) {
  169. log.error(this.getTableName()+" updateEsByDate 发生错误:{}", e);
  170. throw new MyException(this.getTableName()+" updateEsByDate 发生错误:" + e.getMessage());
  171. }
  172. }
  173. /**
  174. * es 查询条件
  175. * @param param
  176. * @return
  177. */
  178. public BoolQueryBuilder getStatisticsParam2QueryBuilder(StatisticsParam param){
  179. BoolQueryBuilder boolQueryBuilder;
  180. String startDate = param.getStartDate().replace("/","-");
  181. String endDate = param.getEndDate().replace("/","-");
  182. Date start = DateUtils.parseDate(startDate+" 00:00:00", DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss,new Date());
  183. Date end = DateUtils.parseDate(endDate+" 23:59:59", DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss,new Date());
  184. if(StringUtils.equals("0",param.getIfForeign())){ // 国内用户
  185. TOrderParam tOrderParam = new TOrderParam();
  186. tOrderParam.setStatus(1);
  187. tOrderParam.setPayDate_start(start);
  188. tOrderParam.setPayDate_end(end);
  189. if(StringUtils.isNotEmpty(param.getAdminId())){ // 所属商家id
  190. tOrderParam.setAdminId(Long.parseLong(param.getAdminId()));
  191. }
  192. if(StringUtils.isNotEmpty(param.getEquipmentId())){ // 设备id
  193. tOrderParam.setEquipmentId(Long.parseLong(param.getEquipmentId()));
  194. }
  195. if(param.getEquipmentIds()!=null&&param.getEquipmentIds().size()>0){ // 设备id
  196. tOrderParam.setEquipmentId_inList(param.getEquipmentIds());
  197. }
  198. if(StringUtils.isNotEmpty(param.getPayType())){ // 支付方式
  199. tOrderParam.setFrpCode(param.getPayType());
  200. }
  201. if (null != param.getAgencyId()){
  202. tOrderParam.setAgencyId(param.getAgencyId());
  203. tOrderParam.setType_gt(1);
  204. }
  205. if(null != param.getMerchantId()){
  206. tOrderParam.setMerchantId(param.getMerchantId());
  207. tOrderParam.setType_gt(2);
  208. }
  209. if(param.getAdminIds()!=null&&param.getAdminIds().size()>0){
  210. tOrderParam.setAdminId_inList(param.getAdminIds());
  211. }
  212. boolQueryBuilder = this.getParam2QueryBuilder(tOrderParam);
  213. }else{ // 国外用户
  214. TEquipmentParam tEquipmentParam = new TEquipmentParam();
  215. TCoinOrderParam tCoinOrderParam = new TCoinOrderParam();
  216. tCoinOrderParam.setPayDate_start(start);
  217. tCoinOrderParam.setPayDate_end(end);
  218. // 设备id
  219. if(StringUtils.isNotEmpty(param.getClientId())){
  220. // tEquipmentParam.setId(Long.parseLong(param.getEquipmentId()));
  221. // List<TEquipment> equipmentList = esTEquipmentService.selectEntityByEqualToOption(tEquipmentParam);
  222. // List<String> clientIdList = equipmentList.stream().map(TEquipment::getClientId).collect(Collectors.toList());
  223. // tCoinOrderParam.setClientId_inList(clientIdList);
  224. tCoinOrderParam.setClientId(param.getClientId());
  225. }
  226. // 支付方式
  227. if (StringUtils.isNotEmpty(param.getPayType())) {
  228. tCoinOrderParam.setPayType(Integer.valueOf(param.getPayType()));
  229. }
  230. if(param.getAdminIds() != null && param.getAdminIds().size()>0){
  231. tCoinOrderParam.setAdminId_inList(param.getAdminIds());
  232. }
  233. // 所属商家id
  234. if(StringUtils.isNotEmpty(param.getAdminId())){
  235. tCoinOrderParam.setAdminId(Long.parseLong(param.getAdminId()));
  236. }
  237. // 设备
  238. if(param.getClientIds() != null && param.getClientIds().size()>0){
  239. tCoinOrderParam.setClientId_inList(param.getClientIds());
  240. }
  241. boolQueryBuilder = esTCoinOrderService.getParam2QueryBuilder(tCoinOrderParam);
  242. }
  243. return boolQueryBuilder;
  244. }
  245. public ChartColumn getStatistics(StatisticsParam param) {
  246. String msg = "";
  247. String format = "yyyy-MM";
  248. DateHistogramInterval interval = DateHistogramInterval.DAY;
  249. if (ChartType.day.toString().equals(param.getChartType())) {
  250. interval = DateHistogramInterval.HOUR;
  251. format = "HH:00";
  252. msg = "日统计";
  253. }
  254. if (ChartType.week.toString().equals(param.getChartType())) {
  255. interval = DateHistogramInterval.DAY;
  256. format = "yyyy-MM-dd";
  257. msg = "周统计";
  258. }
  259. if (ChartType.month.toString().equals(param.getChartType())) {
  260. interval = DateHistogramInterval.DAY;
  261. format = "MM-dd";
  262. msg = "月统计";
  263. }
  264. if (ChartType.year.toString().equals(param.getChartType())) {
  265. interval = DateHistogramInterval.MONTH;
  266. format = "MM月";
  267. msg = "年统计";
  268. }
  269. ChartColumn chartColumn = getStatistics(param,format,interval);
  270. if (ChartType.week.toString().equals(param.getChartType())) {
  271. ArrayList<String> categories = new ArrayList<>();
  272. for (String day :chartColumn.getCategories()) {
  273. int week = DateUtils.getWeek(DateUtils.parseDate(day, DateUtils.PATTERN_yyyy_MM_dd,new Date()));
  274. String finalCategories = "周" + (week==0?"日":week);
  275. categories.add(finalCategories);
  276. }
  277. chartColumn.setCategories(categories);
  278. }
  279. return chartColumn;
  280. }
  281. public ChartColumn getStatistics(StatisticsParam param, String format, DateHistogramInterval interval) {
  282. try {
  283. // 仅为名称,用以获取返回结果
  284. String aggregationResultName = "aggregationResult";
  285. // 仅为名称,用以获取返回结果
  286. String aggSumName = "sales";
  287. // 仅为名称,用以获取返回结果
  288. String aggName_productNumber = "productNumber";
  289. // 仅为名称,用以获取返回结果
  290. String aggName_refundQuantity = "refundQuantity";
  291. // 仅为名称,用以获取返回结果
  292. String aggName_refundAmount = "refundAmount";
  293. //Bool查询
  294. BoolQueryBuilder queryBuilder = getStatisticsParam2QueryBuilder(param);
  295. // 公司平台
  296. String companyType = param.getCompanyType();
  297. if(StringUtils.isNotEmpty(companyType)) {
  298. if(companyType.equals("0")) {
  299. queryBuilder.must(
  300. QueryBuilders.boolQuery().should(
  301. QueryBuilders.termQuery("companyType", "0")
  302. ).should(
  303. QueryBuilders.boolQuery().mustNot(
  304. QueryBuilders.existsQuery("companyType")
  305. )
  306. )
  307. );
  308. } else {
  309. queryBuilder.must(QueryBuilders.termQuery("companyType","1"));
  310. }
  311. }
  312. // 设备类型
  313. String machineType = param.getMachineType();
  314. if(StringUtils.isNotEmpty(machineType)) {
  315. if(machineType.equals("0")) {
  316. queryBuilder.must(
  317. QueryBuilders.boolQuery().should(
  318. QueryBuilders.termQuery("machineType", "0")
  319. ).should(
  320. QueryBuilders.boolQuery().mustNot(
  321. QueryBuilders.existsQuery("machineType")
  322. )
  323. )
  324. );
  325. } else {
  326. queryBuilder.must(QueryBuilders.termQuery("machineType",machineType));
  327. }
  328. }
  329. // 时间聚合
  330. AggregationBuilder dateHistogram = AggregationBuilders.dateHistogram(aggregationResultName)
  331. .field("payDate")
  332. .calendarInterval(interval)
  333. .format(format);
  334. // 根据字段price 求和 sum, sales 仅为名称,用以获取返回结果
  335. // 订单金额
  336. SumAggregationBuilder sumAggregationBuilder = AggregationBuilders.sum(aggSumName).field("price");
  337. dateHistogram.subAggregation(sumAggregationBuilder);
  338. // 订单数量
  339. SumAggregationBuilder sumAggregationBuilder2 = AggregationBuilders.sum(aggName_productNumber).field("productNumber");
  340. dateHistogram.subAggregation(sumAggregationBuilder2);
  341. // 退款数量
  342. SumAggregationBuilder sumAggregationBuilder3 = AggregationBuilders.sum(aggName_refundQuantity).field("refundQuantity");
  343. dateHistogram.subAggregation(sumAggregationBuilder3);
  344. // 退款金额
  345. SumAggregationBuilder sumAggregationBuilder4 = AggregationBuilders.sum(aggName_refundAmount).field("refundAmount");
  346. dateHistogram.subAggregation(sumAggregationBuilder4);
  347. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
  348. // 指定size为0,不返回文档 因为只需要数量
  349. sourceBuilder.query(queryBuilder).aggregation(dateHistogram).size(0);
  350. String tableName = getTableName();
  351. if(StringUtils.isNotEmpty(param.getIfForeign()) && param.getIfForeign().equals("1")) {
  352. tableName = esTCoinOrderService.getTableName();
  353. }
  354. SearchRequest searchRequest = new SearchRequest(tableName);
  355. searchRequest.source(sourceBuilder);
  356. log.debug("sourceBuilder:{}", sourceBuilder);
  357. SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  358. Aggregation agg = searchResponse.getAggregations().get(aggregationResultName);
  359. List<? extends Histogram.Bucket> buckets = ((Histogram) agg).getBuckets();
  360. List<ChartBean> chartBeanList = new ArrayList<>();
  361. for (Histogram.Bucket bucket : buckets) {
  362. ChartBean chartBean = new ChartBean();
  363. chartBean.setCategorie(bucket.getKeyAsString());
  364. //统计商品数
  365. ParsedSum saleNum1 = bucket.getAggregations().get(aggName_productNumber);
  366. ParsedSum saleNum2 = bucket.getAggregations().get(aggName_refundQuantity);
  367. String saleNumStr1 = format1((float) saleNum1.getValue());
  368. String saleNumStr2 = format1((float) saleNum2.getValue());
  369. if(StringUtils.isEmpty(saleNumStr1)){
  370. saleNumStr1 = "1";
  371. }
  372. chartBean.setSaleNum(Float.valueOf(saleNumStr1) - Float.valueOf(saleNumStr2));
  373. //统计订单数
  374. chartBean.setOrderNum(Float.valueOf(bucket.getDocCount()));
  375. ParsedSum sum = bucket.getAggregations().get(aggSumName);
  376. ParsedSum refundSum = bucket.getAggregations().get(aggName_refundAmount);
  377. String s1 = format1((float) sum.getValue());
  378. String s2 = format1((float) refundSum.getValue());
  379. chartBean.setSalePrice(Float.valueOf(s1) - Float.valueOf(s2));
  380. chartBeanList.add(chartBean);
  381. }
  382. ChartColumn chartColumn = new ChartColumn(chartBeanList);
  383. return chartColumn;
  384. } catch (Exception e) {
  385. log.error(getTableName() + " es根据条件 聚合查询 报错:{}", e);
  386. throw new MyException(getTableName() + " es根据条件 聚合查询 报错:" + e.getMessage());
  387. }
  388. }
  389. public static String format1(Float value){
  390. BigDecimal bd = new BigDecimal(value);//创建一个bd对象,将要转换的值value传入构造函数
  391. bd = bd.setScale(2, RoundingMode.HALF_UP);//调用setScale方法进行数据格式化,保留两位小数,采用四舍五入规则
  392. return bd.toString(); //返回bd对象的值(转化为string形式)
  393. }
  394. public void tongbuByHour() throws ParseException {
  395. String startDate = null;
  396. String endDate = null;
  397. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  398. Calendar calendar = Calendar.getInstance();
  399. calendar.setTime(new Date());
  400. endDate = sdf.format(calendar.getTime());
  401. calendar.add(Calendar.HOUR_OF_DAY, -1);
  402. // calendar.add(calendar.DATE, -4);
  403. startDate = sdf.format(calendar.getTime());
  404. Date start = sdf.parse(startDate);
  405. Date end = sdf.parse(endDate);
  406. // 查询es 现有的数据
  407. //1,同步已支付的订单
  408. int num = 0;
  409. while (true) {
  410. int limit = MAX_ROW;
  411. int offset= num * MAX_ROW;
  412. LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  413. query.gt(TOrder::getCreateDate,start);
  414. query.lt(TOrder::getCreateDate,end);
  415. query.eq(TOrder::getStatus,1);
  416. Page<TOrder> page = new Page<>(offset, limit, true);
  417. IPage<TOrder> iPage = tOrderService.page(page, query);
  418. List<TOrder> list_es = iPage.getRecords();
  419. if(list_es.size()>0){
  420. insertBatch(list_es);
  421. for(TOrder order:list_es){
  422. try {
  423. updateDataById(order);
  424. } catch (Exception e) {
  425. e.printStackTrace();
  426. }
  427. }
  428. }
  429. num++;
  430. if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
  431. break;
  432. }
  433. }
  434. //2,同步已退款的订单
  435. int num2 = 0;
  436. while (true) {
  437. int limit = MAX_ROW;
  438. int offset= num2 * MAX_ROW;
  439. LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  440. query.gt(TOrder::getCreateDate,start);
  441. query.lt(TOrder::getCreateDate,end);
  442. query.eq(TOrder::getStatus,3);
  443. Page<TOrder> page = new Page<>(offset, limit, true);
  444. IPage<TOrder> iPage = tOrderService.page(page, query);
  445. List<TOrder> list_es = iPage.getRecords();
  446. if(list_es.size()>0){
  447. insertBatch(list_es);
  448. for(TOrder order:list_es){
  449. try {
  450. updateDataById(order);
  451. } catch (Exception e) {
  452. e.printStackTrace();
  453. }
  454. }
  455. }
  456. num2++;
  457. if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
  458. break;
  459. }
  460. }
  461. // 查询es 现有的数据
  462. // LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  463. // query.gt(TOrder::getCreateDate,start);
  464. // query.lt(TOrder::getCreateDate,end);
  465. // List<TOrder> list_es = tOrderService.list(query);
  466. // if (CollectionUtils.isNotEmpty(list_es)) {
  467. // insertBatch(list_es);
  468. // if(list_es.size()>0){
  469. // for(TOrder order : list_es){
  470. // try {
  471. // updateDataById(order);
  472. // } catch (Exception e) {
  473. // e.printStackTrace();
  474. // }
  475. // }
  476. // }
  477. // }
  478. }
  479. public void tongbuByDay() throws ParseException {
  480. String startDate = null;
  481. String endDate = null;
  482. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  483. Calendar calendar = Calendar.getInstance();
  484. calendar.setTime(new Date());
  485. endDate = sdf.format(calendar.getTime());
  486. calendar.add(calendar.DATE, -1);
  487. // calendar.add(calendar.DATE, -4);
  488. startDate = sdf.format(calendar.getTime());
  489. Date start = sdf.parse(startDate);
  490. Date end = sdf.parse(endDate);
  491. // 查询es 现有的数据
  492. //1,同步已支付的订单
  493. int num = 0;
  494. while (true) {
  495. int limit = MAX_ROW;
  496. int offset= num * MAX_ROW;
  497. LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  498. query.gt(TOrder::getCreateDate,start);
  499. query.lt(TOrder::getCreateDate,end);
  500. query.eq(TOrder::getStatus,1);
  501. Page<TOrder> page = new Page<>(offset, limit, true);
  502. IPage<TOrder> iPage = tOrderService.page(page, query);
  503. List<TOrder> list_es = iPage.getRecords();
  504. if(list_es.size()>0){
  505. insertBatch(list_es);
  506. for(TOrder order:list_es){
  507. try {
  508. updateDataById(order);
  509. } catch (Exception e) {
  510. e.printStackTrace();
  511. }
  512. }
  513. }
  514. num++;
  515. if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
  516. break;
  517. }
  518. }
  519. //2,同步已退款的订单
  520. int num2 = 0;
  521. while (true) {
  522. int limit = MAX_ROW;
  523. int offset= num2 * MAX_ROW;
  524. LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  525. query.gt(TOrder::getCreateDate,start);
  526. query.lt(TOrder::getCreateDate,end);
  527. query.eq(TOrder::getStatus,3);
  528. Page<TOrder> page = new Page<>(offset, limit, true);
  529. IPage<TOrder> iPage = tOrderService.page(page, query);
  530. List<TOrder> list_es = iPage.getRecords();
  531. if(list_es.size()>0){
  532. insertBatch(list_es);
  533. for(TOrder order:list_es){
  534. try {
  535. updateDataById(order);
  536. } catch (Exception e) {
  537. e.printStackTrace();
  538. }
  539. }
  540. }
  541. num2++;
  542. if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
  543. break;
  544. }
  545. }
  546. // 查询es 现有的数据
  547. // LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  548. // query.gt(TOrder::getCreateDate,start);
  549. // query.lt(TOrder::getCreateDate,end);
  550. // List<TOrder> list_es = tOrderService.list(query);
  551. // if (CollectionUtils.isNotEmpty(list_es)) {
  552. // insertBatch(list_es);
  553. // if(list_es.size()>0){
  554. // for(TOrder order : list_es){
  555. // try {
  556. // updateDataById(order);
  557. // } catch (Exception e) {
  558. // e.printStackTrace();
  559. // }
  560. // }
  561. // }
  562. // }
  563. }
  564. /**
  565. * 同步汇总统计到数据库
  566. * @param param 参数
  567. * @return
  568. */
  569. public Page<OrderDaoChuDTO> getSummaryStatistics(StatisticsParam param) {
  570. Page<OrderDaoChuDTO> page = new Page<>(param.getCurrent(), param.getSize(), true);
  571. String statisticsDate;
  572. if (StringUtils.isNotEmpty(param.getDateType()) && param.getDateType().equals("1")) {
  573. // 按年统计
  574. statisticsDate = param.getStartDate().substring(0, 4);
  575. } else {
  576. statisticsDate = param.getStartDate().substring(0, 7);
  577. }
  578. try {
  579. // 仅为名称,用以获取返回结果
  580. String aggregationResultName = "aggregationResult";
  581. // 按商户或者按设备分组查询
  582. String field = "adminId";
  583. // 仅为名称,用以获取返回结果
  584. String aggNameSalePrice = "salePrice";
  585. // 仅为名称,用以获取返回结果
  586. String aggNameTotal = "total";
  587. // 仅为名称,用以获取返回结果
  588. String aggNameSaleNum = "saleNum";
  589. // 创建布尔查询
  590. BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
  591. .must(QueryBuilders.rangeQuery("payDate")
  592. .from(param.getStartDate())
  593. .to(param.getEndDate())
  594. .format("yyyy-MM-dd HH:mm:ss"))
  595. .must(QueryBuilders.termQuery("status", 1));
  596. if(StringUtils.isNotEmpty(param.getPacketType()) && param.getPacketType().equals("1")) {
  597. // 按设备分组
  598. field = "clientId.keyword";
  599. }
  600. int size = param.getCurrent() * param.getSize();
  601. // 创建聚合:按 clientId 进行分组
  602. TermsAggregationBuilder aggregation = AggregationBuilders
  603. .terms(aggregationResultName)
  604. .field(field)
  605. .size(size)
  606. .order(BucketOrder.aggregation(aggNameSalePrice,false))
  607. .subAggregation(AggregationBuilders.sum(aggNameSalePrice).field("price"))
  608. .subAggregation(AggregationBuilders.sum(aggNameSaleNum).field("productNumber"));
  609. // 创建总数聚合
  610. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
  611. .query(boolQuery)
  612. .aggregation(aggregation)
  613. .size(0);
  614. // cardinality 聚合
  615. searchSourceBuilder.aggregation(AggregationBuilders.cardinality(aggNameTotal).field(field));
  616. // 获取表名
  617. String tableName = getTableName();
  618. if(StringUtils.isNotEmpty(param.getIfForeign()) && param.getIfForeign().equals("1")) {
  619. tableName = esTCoinOrderService.getTableName();
  620. }
  621. // 创建请求
  622. SearchRequest searchRequest = new SearchRequest(tableName).source(searchSourceBuilder);
  623. log.debug("sourceBuilder:{}", searchSourceBuilder);
  624. // 执行查询
  625. SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  626. // 解析结果
  627. Aggregations aggregations = searchResponse.getAggregations();
  628. Terms aggregationResult = aggregations.get(aggregationResultName);
  629. // 修改集合
  630. List<OrderStatisticsYear> updateYearList = new ArrayList<>();
  631. List<OrderStatisticsMonth> updateMonthList = new ArrayList<>();
  632. // 插入集合
  633. List<OrderStatisticsYear> insertYearList = new ArrayList<>();
  634. List<OrderStatisticsMonth> insertMonthList = new ArrayList<>();
  635. for (Terms.Bucket bucket : aggregationResult.getBuckets()) {
  636. String key = bucket.getKeyAsString();
  637. Sum salesSum = bucket.getAggregations().get(aggNameSalePrice);
  638. double priceTotal = salesSum.getValue();
  639. long docCount = bucket.getDocCount();
  640. Sum saleNumSum = bucket.getAggregations().get(aggNameSaleNum);
  641. int saleNum = (int)saleNumSum.getValue();
  642. if (priceTotal == 0) {
  643. continue;
  644. }
  645. if (StringUtils.isNotEmpty(param.getDateType()) && param.getDateType().equals("1")) {
  646. // 按年统计
  647. OrderStatisticsYear orderStatistics = new OrderStatisticsYear();
  648. orderStatistics.setId(RandomUtil.randomNumbers(32));
  649. orderStatistics.setIfForeign(param.getIfForeign());
  650. orderStatistics.setType(param.getPacketType());
  651. if (StringUtils.isNotEmpty(param.getPacketType()) && param.getPacketType().equals("1")) {
  652. TEquipment equipment = R.getDataIfSuccess(szwlFeign.findEquipmentByClientId(key));
  653. if (equipment == null) {
  654. continue;
  655. }
  656. // TAdmin admin = R.getDataIfSuccess(szwlFeign.getAdmin(String.valueOf(equipment.getAdminId())));
  657. // if (admin == null) {
  658. // continue;
  659. // }
  660. Long adminId = equipment.getAdminId();
  661. String companyType = R.getDataIfSuccess(szwlFeign.getCompanyTypeById(String.valueOf(equipment.getAdminId())));
  662. if (StringUtils.isEmpty(companyType)) {
  663. continue;
  664. }
  665. // 先查询是否已有数据
  666. LambdaQueryWrapper<OrderStatisticsYear> query = Wrappers.lambdaQuery();
  667. query.eq(OrderStatisticsYear::getAdminId, equipment.getAdminId());
  668. query.eq(OrderStatisticsYear::getEquipmentId, equipment.getId());
  669. query.eq(OrderStatisticsYear::getStatisticsDate, statisticsDate);
  670. query.eq(OrderStatisticsYear::getType, param.getPacketType());
  671. query.eq(OrderStatisticsYear::getIfForeign, param.getIfForeign());
  672. OrderStatisticsYear statisticsYear = orderStatisticsYearService.getOne(query);
  673. if(statisticsYear != null){
  674. // 金额是否有变化,有就更新,没有就不更新
  675. BigDecimal newSalePrice = new BigDecimal(priceTotal).setScale(2, RoundingMode.HALF_UP);
  676. if (statisticsYear.getSalePrice().equals(newSalePrice)) {
  677. continue;
  678. }
  679. // 更新数据
  680. statisticsYear.setSaleNum(saleNum);
  681. statisticsYear.setSalePrice(newSalePrice);
  682. statisticsYear.setOrderNum(Long.valueOf(docCount).intValue());
  683. updateYearList.add(statisticsYear);
  684. continue;
  685. }
  686. // 没有的话就插入数据
  687. orderStatistics.setSalePrice(new BigDecimal(priceTotal).setScale(2, RoundingMode.HALF_UP));
  688. orderStatistics.setAdminId(adminId);
  689. orderStatistics.setEquipmentId(equipment.getId());
  690. orderStatistics.setClientId(key);
  691. orderStatistics.setSaleNum(saleNum);
  692. orderStatistics.setOrderNum(Long.valueOf(docCount).intValue());
  693. orderStatistics.setCompanyType(companyType);
  694. orderStatistics.setStatisticsDate(statisticsDate);
  695. insertYearList.add(orderStatistics);
  696. } else {
  697. // 按商户分组
  698. // TAdmin admin = R.getDataIfSuccess(szwlFeign.getAdmin(key));
  699. // if (admin == null) {
  700. // continue;
  701. // }
  702. String companyType = R.getDataIfSuccess(szwlFeign.getCompanyTypeById(key));
  703. if (StringUtils.isEmpty(companyType)) {
  704. continue;
  705. }
  706. // 先查询是否已有数据
  707. LambdaQueryWrapper<OrderStatisticsYear> query = Wrappers.lambdaQuery();
  708. query.eq(OrderStatisticsYear::getAdminId, key);
  709. query.eq(OrderStatisticsYear::getStatisticsDate, statisticsDate);
  710. query.eq(OrderStatisticsYear::getType, param.getPacketType());
  711. query.eq(OrderStatisticsYear::getIfForeign, param.getIfForeign());
  712. OrderStatisticsYear statisticsYear = orderStatisticsYearService.getOne(query);
  713. if(statisticsYear != null){
  714. // 金额是否有变化,有就更新,没有就不更新
  715. BigDecimal newSalePrice = new BigDecimal(priceTotal).setScale(2, RoundingMode.HALF_UP);
  716. if (statisticsYear.getSalePrice().equals(newSalePrice)) {
  717. continue;
  718. }
  719. // 更新数据
  720. statisticsYear.setSaleNum(saleNum);
  721. statisticsYear.setSalePrice(newSalePrice);
  722. statisticsYear.setOrderNum(Long.valueOf(docCount).intValue());
  723. updateYearList.add(statisticsYear);
  724. continue;
  725. }
  726. orderStatistics.setSalePrice(new BigDecimal(priceTotal).setScale(2, RoundingMode.HALF_UP));
  727. orderStatistics.setAdminId(Long.valueOf(key));
  728. orderStatistics.setSaleNum(saleNum);
  729. orderStatistics.setOrderNum(Long.valueOf(docCount).intValue());
  730. orderStatistics.setCompanyType(companyType);
  731. orderStatistics.setStatisticsDate(statisticsDate);
  732. insertYearList.add(orderStatistics);
  733. }
  734. } else {
  735. OrderStatisticsMonth orderStatistics = new OrderStatisticsMonth();
  736. orderStatistics.setId(RandomUtil.randomNumbers(32));
  737. orderStatistics.setIfForeign(param.getIfForeign());
  738. orderStatistics.setType(param.getPacketType());
  739. if (StringUtils.isNotEmpty(param.getPacketType()) && param.getPacketType().equals("1")) {
  740. TEquipment equipment = R.getDataIfSuccess(szwlFeign.findEquipmentByClientId(key));
  741. if (equipment == null) {
  742. continue;
  743. }
  744. // TAdmin admin = R.getDataIfSuccess(szwlFeign.getAdmin(String.valueOf(equipment.getAdminId())));
  745. // if (admin == null) {
  746. // continue;
  747. // }
  748. Long adminId = equipment.getAdminId();
  749. String companyType = R.getDataIfSuccess(szwlFeign.getCompanyTypeById(String.valueOf(adminId)));
  750. if (StringUtils.isEmpty(companyType)) {
  751. continue;
  752. }
  753. // 先查询是否已有数据
  754. LambdaQueryWrapper<OrderStatisticsMonth> query = Wrappers.lambdaQuery();
  755. query.eq(OrderStatisticsMonth::getAdminId, equipment.getAdminId());
  756. query.eq(OrderStatisticsMonth::getEquipmentId, equipment.getId());
  757. query.eq(OrderStatisticsMonth::getStatisticsDate, statisticsDate);
  758. query.eq(OrderStatisticsMonth::getType, param.getPacketType());
  759. query.eq(OrderStatisticsMonth::getIfForeign, param.getIfForeign());
  760. OrderStatisticsMonth statisticsMonth = orderStatisticsMonthService.getOne(query);
  761. if(statisticsMonth != null){
  762. // 金额是否有变化,有就更新,没有就不更新
  763. BigDecimal newSalePrice = new BigDecimal(priceTotal).setScale(2, RoundingMode.HALF_UP);
  764. if (statisticsMonth.getSalePrice().equals(newSalePrice)) {
  765. continue;
  766. }
  767. // 更新数据
  768. statisticsMonth.setSaleNum(saleNum);
  769. statisticsMonth.setSalePrice(newSalePrice);
  770. statisticsMonth.setOrderNum(Long.valueOf(docCount).intValue());
  771. updateMonthList.add(statisticsMonth);
  772. continue;
  773. }
  774. orderStatistics.setSalePrice(new BigDecimal(priceTotal).setScale(2, RoundingMode.HALF_UP));
  775. orderStatistics.setAdminId(adminId);
  776. orderStatistics.setEquipmentId(equipment.getId());
  777. orderStatistics.setClientId(key);
  778. orderStatistics.setSaleNum(saleNum);
  779. orderStatistics.setOrderNum(Long.valueOf(docCount).intValue());
  780. orderStatistics.setCompanyType(companyType);
  781. orderStatistics.setStatisticsDate(statisticsDate);
  782. insertMonthList.add(orderStatistics);
  783. } else {
  784. // 按商户分组
  785. // TAdmin admin = R.getDataIfSuccess(szwlFeign.getAdmin(key));
  786. // if (admin == null) {
  787. // continue;
  788. // }
  789. String companyType = R.getDataIfSuccess(szwlFeign.getCompanyTypeById(key));
  790. if (StringUtils.isEmpty(companyType)) {
  791. continue;
  792. }
  793. // 先查询是否已有数据
  794. LambdaQueryWrapper<OrderStatisticsMonth> query = Wrappers.lambdaQuery();
  795. query.eq(OrderStatisticsMonth::getAdminId, key);
  796. query.eq(OrderStatisticsMonth::getStatisticsDate, statisticsDate);
  797. query.eq(OrderStatisticsMonth::getType, param.getPacketType());
  798. query.eq(OrderStatisticsMonth::getIfForeign, param.getIfForeign());
  799. OrderStatisticsMonth statisticsMonth = orderStatisticsMonthService.getOne(query);
  800. if(statisticsMonth != null){
  801. // 金额是否有变化,有就更新,没有就不更新
  802. BigDecimal newSalePrice = new BigDecimal(priceTotal).setScale(2, RoundingMode.HALF_UP);
  803. if (statisticsMonth.getSalePrice().equals(newSalePrice)) {
  804. continue;
  805. }
  806. // 更新数据
  807. statisticsMonth.setSaleNum(saleNum);
  808. statisticsMonth.setSalePrice(newSalePrice);
  809. statisticsMonth.setOrderNum(Long.valueOf(docCount).intValue());
  810. updateMonthList.add(statisticsMonth);
  811. continue;
  812. }
  813. orderStatistics.setSalePrice(new BigDecimal(priceTotal).setScale(2, RoundingMode.HALF_UP));
  814. orderStatistics.setAdminId(Long.valueOf(key));
  815. orderStatistics.setSaleNum(saleNum);
  816. orderStatistics.setOrderNum(Long.valueOf(docCount).intValue());
  817. orderStatistics.setCompanyType(companyType);
  818. orderStatistics.setStatisticsDate(statisticsDate);
  819. insertMonthList.add(orderStatistics);
  820. }
  821. }
  822. }
  823. // 如果修改集合不为空,就批量修改
  824. if (!updateYearList.isEmpty()) {
  825. orderStatisticsYearService.updateBatchById(updateYearList);
  826. }
  827. if (!updateMonthList.isEmpty()) {
  828. orderStatisticsMonthService.updateBatchById(updateMonthList);
  829. }
  830. // 如果插入集合不为空,就批量插入
  831. if (!insertYearList.isEmpty()) {
  832. orderStatisticsYearService.saveBatch(insertYearList);
  833. }
  834. // 如果插入集合不为空,就批量插入
  835. if (!insertMonthList.isEmpty()) {
  836. orderStatisticsMonthService.saveBatch(insertMonthList);
  837. }
  838. return page;
  839. } catch (Exception e) {
  840. log.error(getTableName() + " es根据条件 聚合查询 报错:{}", e);
  841. throw new MyException(getTableName() + " es根据条件 聚合查询 报错:" + e.getMessage());
  842. }
  843. }
  844. }