EsTOrderService.java 26 KB


  1. package com.szwl.service.es;
  2. import cn.com.crbank.ommo.esclient.EsBaseService;
  3. import cn.com.crbank.ommo.exception.MyException;
  4. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  5. import com.baomidou.mybatisplus.core.metadata.IPage;
  6. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  7. import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
  8. import com.szwl.model.bean.ChartBean;
  9. import com.szwl.model.bean.ChartColumn;
  10. import com.szwl.model.bo.ChartType;
  11. import com.szwl.model.entity.TEquipment;
  12. import com.szwl.model.entity.TOrder;
  13. import com.szwl.model.query.StatisticsParam;
  14. import com.szwl.model.query.TCoinOrderParam;
  15. import com.szwl.model.query.TEquipmentParam;
  16. import com.szwl.model.query.TOrderParam;
  17. import com.szwl.model.utils.DateUtils;
  18. import com.szwl.service.TOrderService;
  19. import lombok.extern.slf4j.Slf4j;
  20. import org.apache.commons.collections4.CollectionUtils;
  21. import org.apache.commons.lang3.StringUtils;
  22. import org.elasticsearch.action.search.SearchRequest;
  23. import org.elasticsearch.action.search.SearchResponse;
  24. import org.elasticsearch.client.RequestOptions;
  25. import org.elasticsearch.client.indices.GetIndexRequest;
  26. import org.elasticsearch.index.query.BoolQueryBuilder;
  27. import org.elasticsearch.index.query.ExistsQueryBuilder;
  28. import org.elasticsearch.index.query.QueryBuilders;
  29. import org.elasticsearch.search.aggregations.Aggregation;
  30. import org.elasticsearch.search.aggregations.AggregationBuilder;
  31. import org.elasticsearch.search.aggregations.AggregationBuilders;
  32. import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
  33. import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
  34. import org.elasticsearch.search.aggregations.metrics.ParsedSum;
  35. import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
  36. import org.elasticsearch.search.builder.SearchSourceBuilder;
  37. import org.springframework.beans.factory.annotation.Autowired;
  38. import org.springframework.stereotype.Service;
  39. import java.io.IOException;
  40. import java.lang.reflect.InvocationTargetException;
  41. import java.math.BigDecimal;
  42. import java.math.RoundingMode;
  43. import java.text.ParseException;
  44. import java.text.SimpleDateFormat;
  45. import java.util.ArrayList;
  46. import java.util.Calendar;
  47. import java.util.Date;
  48. import java.util.List;
  49. import java.util.concurrent.ExecutionException;
  50. import java.util.stream.Collectors;
  51. @Slf4j
  52. @Service
  53. public class EsTOrderService extends EsBaseService<TOrder, TOrderParam> {
  54. private static boolean InitEsTableStatus = false; // 是否正常完成 es 初始化
  55. public static final int MAX_ROW = 1000;
  56. @Autowired
  57. TOrderService tOrderService;
  58. @Autowired
  59. EsTEquipmentService esTEquipmentService;
  60. @Autowired
  61. EsTCoinOrderService esTCoinOrderService;
  62. @Override
  63. public String getTableName() {
  64. return "es_t_order";
  65. }
  66. // public String getTableName() {
  67. // return "es_t_order";
  68. // }
  69. @Override
  70. public TOrder getInstanceOfEntity() {
  71. return new TOrder();
  72. }
  73. @Override
  74. public void setInitTableStatus(boolean flag) {
  75. InitEsTableStatus = flag;
  76. }
  77. @Override
  78. public boolean getInitTableStatus() {
  79. return InitEsTableStatus;
  80. }
  81. @Override
  82. public void initTableFun() {
  83. try{
  84. String tableName = getTableName();
  85. GetIndexRequest request = new GetIndexRequest(tableName);
  86. // DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(this.getTableName());
  87. // restHighLevelClient.indices().delete(deleteIndexRequest,RequestOptions.DEFAULT);
  88. boolean isExists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
  89. if (!isExists) {
  90. log.info("es 索引 开始创建"+tableName);
  91. createTable();
  92. // 初始化旧流水
  93. int num = 0;
  94. while (true) {
  95. int limit = MAX_ROW;
  96. // int offset= num * MAX_ROW;
  97. int offset= num;
  98. // TOrderExample example = new TOrderExample();
  99. // example.setLimit(limit);
  100. // example.setOffset(offset);
  101. // List<TOrder> list = tOrderService.selectByOption(example);
  102. LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  103. SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  104. String time = "2022-07-01 00:00:00";
  105. Date date = ft.parse(time);
  106. query.gt(TOrder::getCreateDate,date);
  107. Page<TOrder> page = new Page<>(offset, limit, true);
  108. IPage<TOrder> iPage = tOrderService.page(page, query);
  109. List<TOrder> list = iPage.getRecords();
  110. insertBatch(list);
  111. num++;
  112. if(list.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
  113. break;
  114. }
  115. }
  116. } else {
  117. log.info("es 索引 "+tableName+" 已存在不再创建");
  118. }
  119. InitEsTableStatus = true;
  120. }catch (Exception e){
  121. log.error("ElasticsearchRunner InitEsTOrderThread 发生错误:{}" , e);
  122. throw new MyException("ElasticsearchRunner InitEsTOrderThread 发生错误:" + e.getMessage());
  123. }
  124. }
  125. @Override
  126. public String getEntityPrimaryKey(TOrder tOrder) {
  127. return String.valueOf(tOrder.getId());
  128. }
  129. @Override
  130. public TOrder setEntityPrimaryKey(TOrder tOrder, String value) {
  131. return null;
  132. }
  133. /**
  134. * 根据时间 重新同步es
  135. * @param
  136. * @return
  137. */
  138. public void updateEsByDate(String startTime, String endTime) {
  139. try {
  140. startTime = startTime.replace("/", "-");
  141. endTime = endTime.replace("/", "-");
  142. Date start = DateUtils.parseDate(startTime, DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss, new Date());
  143. Date end = DateUtils.parseDate(endTime, DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss, new Date());
  144. // 查询es 现有的数据,删除
  145. TOrderParam param = new TOrderParam();
  146. param.setCreateDate_start(start);
  147. param.setCreateDate_end(end);
  148. List<TOrder> list_es = this.selectEntityByEqualToOption(param);
  149. for (TOrder entity : list_es) {
  150. this.deleteTableById(String.valueOf(entity.getId()));
  151. }
  152. // 插入 新的
  153. // List<TOrder> list = tOrderService.selectForEs(param);
  154. LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  155. query.gt(TOrder::getCreateDate,start);
  156. query.lt(TOrder::getCreateDate,end);
  157. List<TOrder> list = tOrderService.list(query);
  158. if (CollectionUtils.isNotEmpty(list)) {
  159. insertBatch(list);
  160. }
  161. } catch (Exception e) {
  162. log.error(this.getTableName()+" updateEsByDate 发生错误:{}", e);
  163. throw new MyException(this.getTableName()+" updateEsByDate 发生错误:" + e.getMessage());
  164. }
  165. }
  166. /**
  167. * es 查询条件
  168. * @param param
  169. * @return
  170. */
  171. public BoolQueryBuilder getStatisticsParam2QueryBuilder(StatisticsParam param){
  172. BoolQueryBuilder boolQueryBuilder;
  173. String startDate = param.getStartDate().replace("/","-");
  174. String endDate = param.getEndDate().replace("/","-");
  175. Date start = DateUtils.parseDate(startDate+" 00:00:00", DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss,new Date());
  176. Date end = DateUtils.parseDate(endDate+" 23:59:59", DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss,new Date());
  177. if(StringUtils.equals("0",param.getIfForeign())){ // 国内用户
  178. TOrderParam tOrderParam = new TOrderParam();
  179. tOrderParam.setStatus(1);
  180. tOrderParam.setPayDate_start(start);
  181. tOrderParam.setPayDate_end(end);
  182. if(StringUtils.isNotEmpty(param.getAdminId())){ // 所属商家id
  183. tOrderParam.setAdminId(Long.parseLong(param.getAdminId()));
  184. }
  185. if(StringUtils.isNotEmpty(param.getEquipmentId())){ // 设备id
  186. tOrderParam.setEquipmentId(Long.parseLong(param.getEquipmentId()));
  187. }
  188. if(param.getEquipmentIds()!=null&&param.getEquipmentIds().size()>0){ // 设备id
  189. tOrderParam.setEquipmentId_inList(param.getEquipmentIds());
  190. }
  191. if(StringUtils.isNotEmpty(param.getPayType())){ // 支付方式
  192. tOrderParam.setFrpCode(param.getPayType());
  193. }
  194. if (null != param.getAgencyId()){
  195. tOrderParam.setAgencyId(param.getAgencyId());
  196. tOrderParam.setType_gt(1);
  197. }
  198. if(null != param.getMerchantId()){
  199. tOrderParam.setMerchantId(param.getMerchantId());
  200. tOrderParam.setType_gt(2);
  201. }
  202. if(param.getAdminIds()!=null&&param.getAdminIds().size()>0){
  203. tOrderParam.setAdminId_inList(param.getAdminIds());
  204. }
  205. boolQueryBuilder = this.getParam2QueryBuilder(tOrderParam);
  206. }else{ // 国外用户
  207. TEquipmentParam tEquipmentParam = new TEquipmentParam();
  208. TCoinOrderParam tCoinOrderParam = new TCoinOrderParam();
  209. tCoinOrderParam.setPayDate_start(start);
  210. tCoinOrderParam.setPayDate_end(end);
  211. // 设备id
  212. if(StringUtils.isNotEmpty(param.getClientId())){
  213. // tEquipmentParam.setId(Long.parseLong(param.getEquipmentId()));
  214. // List<TEquipment> equipmentList = esTEquipmentService.selectEntityByEqualToOption(tEquipmentParam);
  215. // List<String> clientIdList = equipmentList.stream().map(TEquipment::getClientId).collect(Collectors.toList());
  216. // tCoinOrderParam.setClientId_inList(clientIdList);
  217. tCoinOrderParam.setClientId(param.getClientId());
  218. }
  219. // 支付方式
  220. if (StringUtils.isNotEmpty(param.getPayType())) {
  221. tCoinOrderParam.setPayType(Integer.valueOf(param.getPayType()));
  222. }
  223. if(param.getAdminIds() != null && param.getAdminIds().size()>0){
  224. tCoinOrderParam.setAdminId_inList(param.getAdminIds());
  225. }
  226. // 所属商家id
  227. if(StringUtils.isNotEmpty(param.getAdminId())){
  228. tCoinOrderParam.setAdminId(Long.parseLong(param.getAdminId()));
  229. }
  230. // 设备
  231. if(param.getClientIds() != null && param.getClientIds().size()>0){
  232. tCoinOrderParam.setClientId_inList(param.getClientIds());
  233. }
  234. boolQueryBuilder = esTCoinOrderService.getParam2QueryBuilder(tCoinOrderParam);
  235. }
  236. return boolQueryBuilder;
  237. }
  238. public ChartColumn getStatistics(StatisticsParam param) {
  239. String msg = "";
  240. String format = "yyyy-MM";
  241. DateHistogramInterval interval = DateHistogramInterval.DAY;
  242. if (ChartType.day.toString().equals(param.getChartType())) {
  243. interval = DateHistogramInterval.HOUR;
  244. format = "HH:00";
  245. msg = "日统计";
  246. }
  247. if (ChartType.week.toString().equals(param.getChartType())) {
  248. interval = DateHistogramInterval.DAY;
  249. format = "yyyy-MM-dd";
  250. msg = "周统计";
  251. }
  252. if (ChartType.month.toString().equals(param.getChartType())) {
  253. interval = DateHistogramInterval.DAY;
  254. format = "MM-dd";
  255. msg = "月统计";
  256. }
  257. if (ChartType.year.toString().equals(param.getChartType())) {
  258. interval = DateHistogramInterval.MONTH;
  259. format = "MM月";
  260. msg = "年统计";
  261. }
  262. ChartColumn chartColumn = getStatistics(param,format,interval);
  263. if (ChartType.week.toString().equals(param.getChartType())) {
  264. ArrayList<String> categories = new ArrayList<>();
  265. for (String day :chartColumn.getCategories()) {
  266. int week = DateUtils.getWeek(DateUtils.parseDate(day, DateUtils.PATTERN_yyyy_MM_dd,new Date()));
  267. String finalCategories = "周" + (week==0?"日":week);
  268. categories.add(finalCategories);
  269. }
  270. chartColumn.setCategories(categories);
  271. }
  272. return chartColumn;
  273. }
  274. public ChartColumn getStatistics(StatisticsParam param, String format, DateHistogramInterval interval) {
  275. try {
  276. // 仅为名称,用以获取返回结果
  277. String aggregationResultName = "aggregationResult";
  278. // 仅为名称,用以获取返回结果
  279. String aggSumName = "sales";
  280. // 仅为名称,用以获取返回结果
  281. String aggName_productNumber = "productNumber";
  282. // 仅为名称,用以获取返回结果
  283. String aggName_refundQuantity = "refundQuantity";
  284. // 仅为名称,用以获取返回结果
  285. String aggName_refundAmount = "refundAmount";
  286. //Bool查询
  287. BoolQueryBuilder queryBuilder = getStatisticsParam2QueryBuilder(param);
  288. // 公司平台
  289. String companyType = param.getCompanyType();
  290. if(StringUtils.isNotEmpty(companyType)) {
  291. if(companyType.equals("0")) {
  292. queryBuilder.must(
  293. QueryBuilders.boolQuery().should(
  294. QueryBuilders.termQuery("companyType", "0")
  295. ).should(
  296. QueryBuilders.boolQuery().mustNot(
  297. QueryBuilders.existsQuery("companyType")
  298. )
  299. )
  300. );
  301. } else {
  302. queryBuilder.must(QueryBuilders.termQuery("companyType","1"));
  303. }
  304. }
  305. // 设备类型
  306. String machineType = param.getMachineType();
  307. if(StringUtils.isNotEmpty(machineType)) {
  308. if(machineType.equals("0")) {
  309. queryBuilder.must(
  310. QueryBuilders.boolQuery().should(
  311. QueryBuilders.termQuery("machineType", "0")
  312. ).should(
  313. QueryBuilders.boolQuery().mustNot(
  314. QueryBuilders.existsQuery("machineType")
  315. )
  316. )
  317. );
  318. } else {
  319. queryBuilder.must(QueryBuilders.termQuery("machineType","1"));
  320. }
  321. }
  322. // 时间聚合
  323. AggregationBuilder dateHistogram = AggregationBuilders.dateHistogram(aggregationResultName)
  324. .field("payDate")
  325. .calendarInterval(interval)
  326. .format(format);
  327. // 根据字段price 求和 sum, sales 仅为名称,用以获取返回结果
  328. // 订单金额
  329. SumAggregationBuilder sumAggregationBuilder = AggregationBuilders.sum(aggSumName).field("price");
  330. dateHistogram.subAggregation(sumAggregationBuilder);
  331. // 订单数量
  332. SumAggregationBuilder sumAggregationBuilder2 = AggregationBuilders.sum(aggName_productNumber).field("productNumber");
  333. dateHistogram.subAggregation(sumAggregationBuilder2);
  334. // 退款数量
  335. SumAggregationBuilder sumAggregationBuilder3 = AggregationBuilders.sum(aggName_refundQuantity).field("refundQuantity");
  336. dateHistogram.subAggregation(sumAggregationBuilder3);
  337. // 退款金额
  338. SumAggregationBuilder sumAggregationBuilder4 = AggregationBuilders.sum(aggName_refundAmount).field("refundAmount");
  339. dateHistogram.subAggregation(sumAggregationBuilder4);
  340. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
  341. // 指定size为0,不返回文档 因为只需要数量
  342. sourceBuilder.query(queryBuilder).aggregation(dateHistogram).size(0);
  343. String tableName = getTableName();
  344. if(StringUtils.isNotEmpty(param.getIfForeign()) && param.getIfForeign().equals("1")) {
  345. tableName = esTCoinOrderService.getTableName();
  346. }
  347. SearchRequest searchRequest = new SearchRequest(tableName);
  348. searchRequest.source(sourceBuilder);
  349. log.debug("sourceBuilder:{}", sourceBuilder);
  350. SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  351. Aggregation agg = searchResponse.getAggregations().get(aggregationResultName);
  352. List<? extends Histogram.Bucket> buckets = ((Histogram) agg).getBuckets();
  353. List<ChartBean> chartBeanList = new ArrayList<>();
  354. for (Histogram.Bucket bucket : buckets) {
  355. ChartBean chartBean = new ChartBean();
  356. chartBean.setCategorie(bucket.getKeyAsString());
  357. // chartBean.setSaleNum(Float.valueOf(bucket.getDocCount()));
  358. //统计商品数
  359. ParsedSum saleNum1 = bucket.getAggregations().get(aggName_productNumber);
  360. ParsedSum saleNum2 = bucket.getAggregations().get(aggName_refundQuantity);
  361. String saleNumStr1 = format1((float) saleNum1.getValue());
  362. String saleNumStr2 = format1((float) saleNum2.getValue());
  363. if(StringUtils.isEmpty(saleNumStr1)){
  364. saleNumStr1 = "1";
  365. }
  366. chartBean.setSaleNum(Float.valueOf(saleNumStr1) - Float.valueOf(saleNumStr2));
  367. //统计订单数
  368. chartBean.setOrderNum(Float.valueOf(bucket.getDocCount()));
  369. // chartBean.setSaleNum(Long.valueOf(bucket.getDocCount()).intValue());
  370. ParsedSum sum = bucket.getAggregations().get(aggSumName);
  371. ParsedSum refundSum = bucket.getAggregations().get(aggName_refundAmount);
  372. String s1 = format1((float) sum.getValue());
  373. String s2 = format1((float) refundSum.getValue());
  374. chartBean.setSalePrice(Float.valueOf(s1) - Float.valueOf(s2));
  375. // chartBean.setSalePrice(Double.valueOf(sum.getValue()).intValue());
  376. chartBeanList.add(chartBean);
  377. }
  378. ChartColumn chartColumn = new ChartColumn(chartBeanList);
  379. return chartColumn;
  380. } catch (Exception e) {
  381. log.error(getTableName() + " es根据条件 聚合查询 报错:{}", e);
  382. throw new MyException(getTableName() + " es根据条件 聚合查询 报错:" + e.getMessage());
  383. }
  384. }
  385. public static String format1(Float value){
  386. BigDecimal bd = new BigDecimal(value);//创建一个bd对象,将要转换的值value传入构造函数
  387. bd = bd.setScale(2, RoundingMode.HALF_UP);//调用setScale方法进行数据格式化,保留两位小数,采用四舍五入规则
  388. return bd.toString(); //返回bd对象的值(转化为string形式)
  389. }
  390. public void tongbuByHour() throws ParseException {
  391. String startDate = null;
  392. String endDate = null;
  393. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  394. Calendar calendar = Calendar.getInstance();
  395. calendar.setTime(new Date());
  396. endDate = sdf.format(calendar.getTime());
  397. calendar.add(Calendar.HOUR_OF_DAY, -1);
  398. // calendar.add(calendar.DATE, -4);
  399. startDate = sdf.format(calendar.getTime());
  400. Date start = sdf.parse(startDate);
  401. Date end = sdf.parse(endDate);
  402. // 查询es 现有的数据
  403. //1,同步已支付的订单
  404. int num = 0;
  405. while (true) {
  406. int limit = MAX_ROW;
  407. int offset= num * MAX_ROW;
  408. LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  409. query.gt(TOrder::getCreateDate,start);
  410. query.lt(TOrder::getCreateDate,end);
  411. query.eq(TOrder::getStatus,1);
  412. Page<TOrder> page = new Page<>(offset, limit, true);
  413. IPage<TOrder> iPage = tOrderService.page(page, query);
  414. List<TOrder> list_es = iPage.getRecords();
  415. if(list_es.size()>0){
  416. insertBatch(list_es);
  417. for(TOrder order:list_es){
  418. try {
  419. updateDataById(order);
  420. } catch (Exception e) {
  421. e.printStackTrace();
  422. }
  423. }
  424. }
  425. num++;
  426. if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
  427. break;
  428. }
  429. }
  430. //2,同步已退款的订单
  431. int num2 = 0;
  432. while (true) {
  433. int limit = MAX_ROW;
  434. int offset= num2 * MAX_ROW;
  435. LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  436. query.gt(TOrder::getCreateDate,start);
  437. query.lt(TOrder::getCreateDate,end);
  438. query.eq(TOrder::getStatus,3);
  439. Page<TOrder> page = new Page<>(offset, limit, true);
  440. IPage<TOrder> iPage = tOrderService.page(page, query);
  441. List<TOrder> list_es = iPage.getRecords();
  442. if(list_es.size()>0){
  443. insertBatch(list_es);
  444. for(TOrder order:list_es){
  445. try {
  446. updateDataById(order);
  447. } catch (Exception e) {
  448. e.printStackTrace();
  449. }
  450. }
  451. }
  452. num2++;
  453. if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
  454. break;
  455. }
  456. }
  457. // 查询es 现有的数据
  458. // LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  459. // query.gt(TOrder::getCreateDate,start);
  460. // query.lt(TOrder::getCreateDate,end);
  461. // List<TOrder> list_es = tOrderService.list(query);
  462. // if (CollectionUtils.isNotEmpty(list_es)) {
  463. // insertBatch(list_es);
  464. // if(list_es.size()>0){
  465. // for(TOrder order : list_es){
  466. // try {
  467. // updateDataById(order);
  468. // } catch (Exception e) {
  469. // e.printStackTrace();
  470. // }
  471. // }
  472. // }
  473. // }
  474. }
  475. public void tongbuByDay() throws ParseException {
  476. String startDate = null;
  477. String endDate = null;
  478. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  479. Calendar calendar = Calendar.getInstance();
  480. calendar.setTime(new Date());
  481. endDate = sdf.format(calendar.getTime());
  482. calendar.add(calendar.DATE, -1);
  483. // calendar.add(calendar.DATE, -4);
  484. startDate = sdf.format(calendar.getTime());
  485. Date start = sdf.parse(startDate);
  486. Date end = sdf.parse(endDate);
  487. // 查询es 现有的数据
  488. //1,同步已支付的订单
  489. int num = 0;
  490. while (true) {
  491. int limit = MAX_ROW;
  492. int offset= num * MAX_ROW;
  493. LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  494. query.gt(TOrder::getCreateDate,start);
  495. query.lt(TOrder::getCreateDate,end);
  496. query.eq(TOrder::getStatus,1);
  497. Page<TOrder> page = new Page<>(offset, limit, true);
  498. IPage<TOrder> iPage = tOrderService.page(page, query);
  499. List<TOrder> list_es = iPage.getRecords();
  500. if(list_es.size()>0){
  501. insertBatch(list_es);
  502. for(TOrder order:list_es){
  503. try {
  504. updateDataById(order);
  505. } catch (Exception e) {
  506. e.printStackTrace();
  507. }
  508. }
  509. }
  510. num++;
  511. if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
  512. break;
  513. }
  514. }
  515. //2,同步已退款的订单
  516. int num2 = 0;
  517. while (true) {
  518. int limit = MAX_ROW;
  519. int offset= num2 * MAX_ROW;
  520. LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  521. query.gt(TOrder::getCreateDate,start);
  522. query.lt(TOrder::getCreateDate,end);
  523. query.eq(TOrder::getStatus,3);
  524. Page<TOrder> page = new Page<>(offset, limit, true);
  525. IPage<TOrder> iPage = tOrderService.page(page, query);
  526. List<TOrder> list_es = iPage.getRecords();
  527. if(list_es.size()>0){
  528. insertBatch(list_es);
  529. for(TOrder order:list_es){
  530. try {
  531. updateDataById(order);
  532. } catch (Exception e) {
  533. e.printStackTrace();
  534. }
  535. }
  536. }
  537. num2++;
  538. if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
  539. break;
  540. }
  541. }
  542. // 查询es 现有的数据
  543. // LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
  544. // query.gt(TOrder::getCreateDate,start);
  545. // query.lt(TOrder::getCreateDate,end);
  546. // List<TOrder> list_es = tOrderService.list(query);
  547. // if (CollectionUtils.isNotEmpty(list_es)) {
  548. // insertBatch(list_es);
  549. // if(list_es.size()>0){
  550. // for(TOrder order : list_es){
  551. // try {
  552. // updateDataById(order);
  553. // } catch (Exception e) {
  554. // e.printStackTrace();
  555. // }
  556. // }
  557. // }
  558. // }
  559. }
  560. }