123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541 |
- package com.szwl.service.es;
- import cn.com.crbank.ommo.esclient.EsBaseService;
- import cn.com.crbank.ommo.exception.MyException;
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.baomidou.mybatisplus.core.metadata.IPage;
- import com.baomidou.mybatisplus.core.toolkit.Wrappers;
- import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
- import com.szwl.model.bean.ChartBean;
- import com.szwl.model.bean.ChartColumn;
- import com.szwl.model.bo.ChartType;
- import com.szwl.model.entity.TEquipment;
- import com.szwl.model.entity.TOrder;
- import com.szwl.model.query.StatisticsParam;
- import com.szwl.model.query.TCoinOrderParam;
- import com.szwl.model.query.TEquipmentParam;
- import com.szwl.model.query.TOrderParam;
- import com.szwl.model.utils.DateUtils;
- import com.szwl.service.TOrderService;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.collections4.CollectionUtils;
- import org.apache.commons.lang3.StringUtils;
- import org.elasticsearch.action.search.SearchRequest;
- import org.elasticsearch.action.search.SearchResponse;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.indices.GetIndexRequest;
- import org.elasticsearch.index.query.BoolQueryBuilder;
- import org.elasticsearch.search.aggregations.Aggregation;
- import org.elasticsearch.search.aggregations.AggregationBuilder;
- import org.elasticsearch.search.aggregations.AggregationBuilders;
- import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
- import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
- import org.elasticsearch.search.aggregations.metrics.ParsedSum;
- import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
- import org.elasticsearch.search.builder.SearchSourceBuilder;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import java.io.IOException;
- import java.lang.reflect.InvocationTargetException;
- import java.math.BigDecimal;
- import java.math.RoundingMode;
- import java.text.ParseException;
- import java.text.SimpleDateFormat;
- import java.util.ArrayList;
- import java.util.Calendar;
- import java.util.Date;
- import java.util.List;
- import java.util.concurrent.ExecutionException;
- import java.util.stream.Collectors;
- @Slf4j
- @Service
- public class EsTOrderService extends EsBaseService<TOrder, TOrderParam> {
- private static boolean InitEsTableStatus = false; // 是否正常完成 es 初始化
- public static final int MAX_ROW = 1000;
- @Autowired
- TOrderService tOrderService;
- @Autowired
- EsTEquipmentService esTEquipmentService;
- @Autowired
- EsTCoinOrderService esTCoinOrderService;
- @Override
- public String getTableName() {
- return "es_t_order";
- }
- // public String getTableName() {
- // return "es_t_order";
- // }
- @Override
- public TOrder getInstanceOfEntity() {
- return new TOrder();
- }
- @Override
- public void setInitTableStatus(boolean flag) {
- InitEsTableStatus = flag;
- }
- @Override
- public boolean getInitTableStatus() {
- return InitEsTableStatus;
- }
- @Override
- public void initTableFun() {
- try{
- String tableName = getTableName();
- GetIndexRequest request = new GetIndexRequest(tableName);
- // DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(this.getTableName());
- // restHighLevelClient.indices().delete(deleteIndexRequest,RequestOptions.DEFAULT);
- boolean isExists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
- if (!isExists) {
- log.info("es 索引 开始创建"+tableName);
- createTable();
- // 初始化旧流水
- int num = 0;
- while (true) {
- int limit = MAX_ROW;
- // int offset= num * MAX_ROW;
- int offset= num;
- // TOrderExample example = new TOrderExample();
- // example.setLimit(limit);
- // example.setOffset(offset);
- // List<TOrder> list = tOrderService.selectByOption(example);
- LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
- SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- String time = "2022-07-01 00:00:00";
- Date date = ft.parse(time);
- query.gt(TOrder::getCreateDate,date);
- Page<TOrder> page = new Page<>(offset, limit, true);
- IPage<TOrder> iPage = tOrderService.page(page, query);
- List<TOrder> list = iPage.getRecords();
- insertBatch(list);
- num++;
- if(list.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
- break;
- }
- }
- } else {
- log.info("es 索引 "+tableName+" 已存在不再创建");
- }
- InitEsTableStatus = true;
- }catch (Exception e){
- log.error("ElasticsearchRunner InitEsTOrderThread 发生错误:{}" , e);
- throw new MyException("ElasticsearchRunner InitEsTOrderThread 发生错误:" + e.getMessage());
- }
- }
- @Override
- public String getEntityPrimaryKey(TOrder tOrder) {
- return String.valueOf(tOrder.getId());
- }
- @Override
- public TOrder setEntityPrimaryKey(TOrder tOrder, String value) {
- return null;
- }
- // @Override
- // public TOrder setEntityPrimaryKey(TOrder tOrder, String value) {
- // return tOrder.setId(Long.parseLong(value));
- // }
- /**
- * 根据时间 重新同步es
- * @param
- * @return
- */
- public void updateEsByDate(String startTime, String endTime) {
- try {
- startTime = startTime.replace("/", "-");
- endTime = endTime.replace("/", "-");
- Date start = DateUtils.parseDate(startTime, DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss, new Date());
- Date end = DateUtils.parseDate(endTime, DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss, new Date());
- // 查询es 现有的数据,删除
- TOrderParam param = new TOrderParam();
- param.setCreateDate_start(start);
- param.setCreateDate_end(end);
- List<TOrder> list_es = this.selectEntityByEqualToOption(param);
- for (TOrder entity : list_es) {
- this.deleteTableById(String.valueOf(entity.getId()));
- }
- // 插入 新的
- // List<TOrder> list = tOrderService.selectForEs(param);
- LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
- query.gt(TOrder::getCreateDate,start);
- query.lt(TOrder::getCreateDate,end);
- List<TOrder> list = tOrderService.list(query);
- if (CollectionUtils.isNotEmpty(list)) {
- insertBatch(list);
- }
- } catch (Exception e) {
- log.error(this.getTableName()+" updateEsByDate 发生错误:{}", e);
- throw new MyException(this.getTableName()+" updateEsByDate 发生错误:" + e.getMessage());
- }
- }
- /**
- * es 查询条件
- * @param param
- * @return
- */
- public BoolQueryBuilder getStatisticsParam2QueryBuilder(StatisticsParam param){
- BoolQueryBuilder boolQueryBuilder;
- String startDate = param.getStartDate().replace("/","-");
- String endDate = param.getEndDate().replace("/","-");
- Date start = DateUtils.parseDate(startDate+" 00:00:00", DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss,new Date());
- Date end = DateUtils.parseDate(endDate+" 23:59:59", DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss,new Date());
- if(StringUtils.equals("0",param.getIfForeign())){ // 国内用户
- TOrderParam tOrderParam = new TOrderParam();
- tOrderParam.setStatus(1);
- tOrderParam.setCreateDate_start(start);
- tOrderParam.setCreateDate_end(end);
- if(StringUtils.isNotEmpty(param.getAdminId())){ // 所属商家id
- tOrderParam.setAdminId(Long.parseLong(param.getAdminId()));
- }
- if(StringUtils.isNotEmpty(param.getEquipmentId())){ // 设备id
- tOrderParam.setEquipmentId(Long.parseLong(param.getEquipmentId()));
- }
- if(param.getEquipmentIds()!=null&¶m.getEquipmentIds().size()>0){ // 设备id
- tOrderParam.setEquipmentId_inList(param.getEquipmentIds());
- }
- if(StringUtils.isNotEmpty(param.getPayType())){ // 支付方式
- tOrderParam.setFrpCode(param.getPayType());
- }
- if (null != param.getAgencyId()){
- tOrderParam.setAgencyId(param.getAgencyId());
- tOrderParam.setType_gt(1);
- }
- if(null != param.getMerchantId()){
- tOrderParam.setMerchantId(param.getMerchantId());
- tOrderParam.setType_gt(2);
- }
- if(param.getAdminIds()!=null&¶m.getAdminIds().size()>0){
- tOrderParam.setAdminId_inList(param.getAdminIds());
- }
- boolQueryBuilder = this.getParam2QueryBuilder(tOrderParam);
- }else{ // 国外用户
- TEquipmentParam tEquipmentParam = new TEquipmentParam();
- if(StringUtils.isNotEmpty(param.getEquipmentId())){ // 设备id
- tEquipmentParam.setId(Long.parseLong(param.getEquipmentId()));
- }
- List<TEquipment> equipmentList = esTEquipmentService.selectEntityByEqualToOption(tEquipmentParam);
- List<String> clientIdList = equipmentList.stream().map(TEquipment::getClientId).collect(Collectors.toList());
- TCoinOrderParam tCoinOrderParam = new TCoinOrderParam();
- tCoinOrderParam.setCreateDate_start(start);
- tCoinOrderParam.setCreateDate_end(end);
- tCoinOrderParam.setClientId_inList(clientIdList);
- if(param.getAdminIds().size()>0){
- tCoinOrderParam.setAdminId_inList(param.getAdminIds());
- }
- if(StringUtils.isNotEmpty(param.getAdminId())){ // 所属商家id
- tCoinOrderParam.setAdminId(Long.parseLong(param.getAdminId()));
- }
- if(param.getClientIds().size()>0){ // 设备
- tCoinOrderParam.setClientId_inList(param.getClientIds());
- }
- boolQueryBuilder = esTCoinOrderService.getParam2QueryBuilder(tCoinOrderParam);
- }
- return boolQueryBuilder;
- }
- public ChartColumn getStatistics(StatisticsParam param) {
- String msg = "";
- String format = "yyyy-MM";
- DateHistogramInterval interval = DateHistogramInterval.DAY;
- if (ChartType.day.toString().equals(param.getChartType())) {
- interval = DateHistogramInterval.HOUR;
- format = "HH点";
- msg = "日统计";
- }
- if (ChartType.week.toString().equals(param.getChartType())) {
- interval = DateHistogramInterval.DAY;
- format = "yyyy-MM-dd";
- msg = "周统计";
- }
- if (ChartType.month.toString().equals(param.getChartType())) {
- interval = DateHistogramInterval.DAY;
- format = "MM月dd";
- msg = "月统计";
- }
- if (ChartType.year.toString().equals(param.getChartType())) {
- interval = DateHistogramInterval.MONTH;
- format = "MM月";
- msg = "年统计";
- }
- ChartColumn chartColumn = getStatistics(param,format,interval);
- if (ChartType.week.toString().equals(param.getChartType())) {
- ArrayList<String> categories = new ArrayList<>();
- for (String day :chartColumn.getCategories()) {
- int week = DateUtils.getWeek(DateUtils.parseDate(day, DateUtils.PATTERN_yyyy_MM_dd,new Date()));
- String finalCategories = "周" + (week==0?"日":week);
- categories.add(finalCategories);
- }
- chartColumn.setCategories(categories);
- }
- return chartColumn;
- }
- public ChartColumn getStatistics(StatisticsParam param, String format, DateHistogramInterval interval) {
- try {
- String aggregationResultName = "aggregationResult";// 仅为名称,用以获取返回结果
- String aggSumName = "sales"; // 仅为名称,用以获取返回结果
- String aggName_productNumber = "productNumber"; // 仅为名称,用以获取返回结果
- //Bool查询
- BoolQueryBuilder queryBuilder = getStatisticsParam2QueryBuilder(param);
- // 时间聚合
- AggregationBuilder dateHistogram = AggregationBuilders.dateHistogram(aggregationResultName)
- .field("createDate")
- .calendarInterval(interval)
- .format(format);
- // 根据字段price 求和 sum, sales 仅为名称,用以获取返回结果
- SumAggregationBuilder sumAggregationBuilder = AggregationBuilders.sum(aggSumName).field("price");
- dateHistogram.subAggregation(sumAggregationBuilder);
- SumAggregationBuilder sumAggregationBuilder2 = AggregationBuilders.sum(aggName_productNumber).field("productNumber");
- dateHistogram.subAggregation(sumAggregationBuilder2);
- SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
- // 指定size为0,不返回文档 因为只需要数量
- sourceBuilder.query(queryBuilder).aggregation(dateHistogram).size(0);
- SearchRequest searchRequest = new SearchRequest(getTableName());
- searchRequest.source(sourceBuilder);
- log.debug("sourceBuilder:{}", sourceBuilder);
- SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
- Aggregation agg = searchResponse.getAggregations().get(aggregationResultName);
- List<? extends Histogram.Bucket> buckets = ((Histogram) agg).getBuckets();
- List<ChartBean> chartBeanList = new ArrayList<>();
- for (Histogram.Bucket bucket : buckets) {
- ChartBean chartBean = new ChartBean();
- chartBean.setCategorie(bucket.getKeyAsString());
- // chartBean.setSaleNum(Float.valueOf(bucket.getDocCount()));
- //统计商品数
- ParsedSum saleNum = bucket.getAggregations().get(aggName_productNumber);
- String saleNumStr = format1((float) saleNum.getValue());
- if(StringUtils.isEmpty(saleNumStr)){
- saleNumStr = "1";
- }
- chartBean.setSaleNum(Float.valueOf(saleNumStr));
- //统计订单数
- chartBean.setOrderNum(Float.valueOf(bucket.getDocCount()));
- // chartBean.setSaleNum(Long.valueOf(bucket.getDocCount()).intValue());
- ParsedSum sum = bucket.getAggregations().get(aggSumName);
- String s = format1((float) sum.getValue());
- chartBean.setSalePrice(Float.valueOf(s));
- // chartBean.setSalePrice(Double.valueOf(sum.getValue()).intValue());
- chartBeanList.add(chartBean);
- }
- ChartColumn chartColumn = new ChartColumn(chartBeanList);
- return chartColumn;
- } catch (Exception e) {
- log.error(getTableName() + " es根据条件 聚合查询 报错:{}", e);
- throw new MyException(getTableName() + " es根据条件 聚合查询 报错:" + e.getMessage());
- }
- }
- public static String format1(Float value){
- BigDecimal bd = new BigDecimal(value);//创建一个bd对象,将要转换的值value传入构造函数
- bd = bd.setScale(2, RoundingMode.HALF_UP);//调用setScale方法进行数据格式化,保留两位小数,采用四舍五入规则
- return bd.toString(); //返回bd对象的值(转化为string形式)
- }
- public void tongbuByHour() throws ParseException {
- String startDate = null;
- String endDate = null;
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- Calendar calendar = Calendar.getInstance();
- calendar.setTime(new Date());
- endDate = sdf.format(calendar.getTime());
- calendar.add(calendar.HOUR_OF_DAY, -1);
- // calendar.add(calendar.DATE, -4);
- startDate = sdf.format(calendar.getTime());
- Date start = sdf.parse(startDate);
- Date end = sdf.parse(endDate);
- // 查询es 现有的数据
- //1,同步已支付的订单
- int num = 0;
- while (true) {
- int limit = MAX_ROW;
- int offset= num * MAX_ROW;
- LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
- query.gt(TOrder::getCreateDate,start);
- query.lt(TOrder::getCreateDate,end);
- query.eq(TOrder::getStatus,1);
- Page<TOrder> page = new Page<>(offset, limit, true);
- IPage<TOrder> iPage = tOrderService.page(page, query);
- List<TOrder> list_es = iPage.getRecords();
- if(list_es.size()>0){
- insertBatch(list_es);
- for(TOrder order:list_es){
- try {
- updateDataById(order);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- num++;
- if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
- break;
- }
- }
- //2,同步已退款的订单
- int num2 = 0;
- while (true) {
- int limit = MAX_ROW;
- int offset= num2 * MAX_ROW;
- LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
- query.gt(TOrder::getCreateDate,start);
- query.lt(TOrder::getCreateDate,end);
- query.eq(TOrder::getStatus,3);
- Page<TOrder> page = new Page<>(offset, limit, true);
- IPage<TOrder> iPage = tOrderService.page(page, query);
- List<TOrder> list_es = iPage.getRecords();
- if(list_es.size()>0){
- insertBatch(list_es);
- for(TOrder order:list_es){
- try {
- updateDataById(order);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- num2++;
- if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
- break;
- }
- }
- // 查询es 现有的数据
- // LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
- // query.gt(TOrder::getCreateDate,start);
- // query.lt(TOrder::getCreateDate,end);
- // List<TOrder> list_es = tOrderService.list(query);
- // if (CollectionUtils.isNotEmpty(list_es)) {
- // insertBatch(list_es);
- // if(list_es.size()>0){
- // for(TOrder order : list_es){
- // try {
- // updateDataById(order);
- // } catch (Exception e) {
- // e.printStackTrace();
- // }
- // }
- // }
- // }
- }
- public void tongbuByDay() throws ParseException {
- String startDate = null;
- String endDate = null;
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- Calendar calendar = Calendar.getInstance();
- calendar.setTime(new Date());
- endDate = sdf.format(calendar.getTime());
- calendar.add(calendar.DATE, -1);
- // calendar.add(calendar.DATE, -4);
- startDate = sdf.format(calendar.getTime());
- Date start = sdf.parse(startDate);
- Date end = sdf.parse(endDate);
- // 查询es 现有的数据
- //1,同步已支付的订单
- int num = 0;
- while (true) {
- int limit = MAX_ROW;
- int offset= num * MAX_ROW;
- LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
- query.gt(TOrder::getCreateDate,start);
- query.lt(TOrder::getCreateDate,end);
- query.eq(TOrder::getStatus,1);
- Page<TOrder> page = new Page<>(offset, limit, true);
- IPage<TOrder> iPage = tOrderService.page(page, query);
- List<TOrder> list_es = iPage.getRecords();
- if(list_es.size()>0){
- insertBatch(list_es);
- for(TOrder order:list_es){
- try {
- updateDataById(order);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- num++;
- if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
- break;
- }
- }
- //2,同步已退款的订单
- int num2 = 0;
- while (true) {
- int limit = MAX_ROW;
- int offset= num2 * MAX_ROW;
- LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
- query.gt(TOrder::getCreateDate,start);
- query.lt(TOrder::getCreateDate,end);
- query.eq(TOrder::getStatus,3);
- Page<TOrder> page = new Page<>(offset, limit, true);
- IPage<TOrder> iPage = tOrderService.page(page, query);
- List<TOrder> list_es = iPage.getRecords();
- if(list_es.size()>0){
- insertBatch(list_es);
- for(TOrder order:list_es){
- try {
- updateDataById(order);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- num2++;
- if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
- break;
- }
- }
- // 查询es 现有的数据
- // LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
- // query.gt(TOrder::getCreateDate,start);
- // query.lt(TOrder::getCreateDate,end);
- // List<TOrder> list_es = tOrderService.list(query);
- // if (CollectionUtils.isNotEmpty(list_es)) {
- // insertBatch(list_es);
- // if(list_es.size()>0){
- // for(TOrder order : list_es){
- // try {
- // updateDataById(order);
- // } catch (Exception e) {
- // e.printStackTrace();
- // }
- // }
- // }
- // }
- }
- }
|