|
- package com.szwl.service.es;
- import cn.com.crbank.ommo.esclient.EsBaseService;
- import cn.com.crbank.ommo.exception.MyException;
- import cn.hutool.core.util.RandomUtil;
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.baomidou.mybatisplus.core.metadata.IPage;
- import com.baomidou.mybatisplus.core.toolkit.Wrappers;
- import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
- import com.szwl.feign.SzwlFeign;
- import com.szwl.model.bean.ChartBean;
- import com.szwl.model.bean.ChartColumn;
- import com.szwl.model.bean.OrderDaoChuDTO;
- import com.szwl.model.bo.ChartType;
- import com.szwl.model.bo.R;
- import com.szwl.model.entity.*;
- import com.szwl.model.query.StatisticsParam;
- import com.szwl.model.query.TCoinOrderParam;
- import com.szwl.model.query.TEquipmentParam;
- import com.szwl.model.query.TOrderParam;
- import com.szwl.model.utils.DateUtils;
- import com.szwl.service.OrderStatisticsMonthService;
- import com.szwl.service.OrderStatisticsYearService;
- import com.szwl.service.TOrderService;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.collections4.CollectionUtils;
- import org.apache.commons.lang3.StringUtils;
- import org.elasticsearch.action.search.SearchRequest;
- import org.elasticsearch.action.search.SearchResponse;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.indices.GetIndexRequest;
- import org.elasticsearch.index.query.BoolQueryBuilder;
- import org.elasticsearch.index.query.QueryBuilders;
- import org.elasticsearch.search.aggregations.*;
- import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
- import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
- import org.elasticsearch.search.aggregations.bucket.terms.Terms;
- import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
- import org.elasticsearch.search.aggregations.metrics.ParsedSum;
- import org.elasticsearch.search.aggregations.metrics.Sum;
- import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
- import org.elasticsearch.search.builder.SearchSourceBuilder;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import java.math.BigDecimal;
- import java.math.RoundingMode;
- import java.text.ParseException;
- import java.text.SimpleDateFormat;
- import java.util.ArrayList;
- import java.util.Calendar;
- import java.util.Date;
- import java.util.List;
- @Slf4j
- @Service
- public class EsTOrderService extends EsBaseService<TOrder, TOrderParam> {
- private static boolean InitEsTableStatus = false; // 是否正常完成 es 初始化
- public static final int MAX_ROW = 1000;
- @Autowired
- TOrderService tOrderService;
- @Autowired
- EsTEquipmentService esTEquipmentService;
- @Autowired
- EsTCoinOrderService esTCoinOrderService;
- @Autowired
- SzwlFeign szwlFeign;
- @Autowired
- OrderStatisticsMonthService orderStatisticsMonthService;
- @Autowired
- OrderStatisticsYearService orderStatisticsYearService;
- @Override
- public String getTableName() {
- return "es_t_order";
- }
- // public String getTableName() {
- // return "es_t_order";
- // }
- @Override
- public TOrder getInstanceOfEntity() {
- return new TOrder();
- }
- @Override
- public void setInitTableStatus(boolean flag) {
- InitEsTableStatus = flag;
- }
- @Override
- public boolean getInitTableStatus() {
- return InitEsTableStatus;
- }
- @Override
- public void initTableFun() {
- try{
- String tableName = getTableName();
- GetIndexRequest request = new GetIndexRequest(tableName);
- // DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(this.getTableName());
- // restHighLevelClient.indices().delete(deleteIndexRequest,RequestOptions.DEFAULT);
- boolean isExists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
- if (!isExists) {
- log.info("es 索引 开始创建"+tableName);
- createTable();
- // 初始化旧流水
- int num = 0;
- while (true) {
- int limit = MAX_ROW;
- // int offset= num * MAX_ROW;
- int offset= num;
- // TOrderExample example = new TOrderExample();
- // example.setLimit(limit);
- // example.setOffset(offset);
- // List<TOrder> list = tOrderService.selectByOption(example);
- LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
- SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- String time = "2022-07-01 00:00:00";
- Date date = ft.parse(time);
- query.gt(TOrder::getCreateDate,date);
- Page<TOrder> page = new Page<>(offset, limit, true);
- IPage<TOrder> iPage = tOrderService.page(page, query);
- List<TOrder> list = iPage.getRecords();
- insertBatch(list);
- num++;
- if(list.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
- break;
- }
- }
- } else {
- log.info("es 索引 "+tableName+" 已存在不再创建");
- }
- InitEsTableStatus = true;
- }catch (Exception e){
- log.error("ElasticsearchRunner InitEsTOrderThread 发生错误:{}" , e);
- throw new MyException("ElasticsearchRunner InitEsTOrderThread 发生错误:" + e.getMessage());
- }
- }
- @Override
- public String getEntityPrimaryKey(TOrder tOrder) {
- return String.valueOf(tOrder.getId());
- }
- @Override
- public TOrder setEntityPrimaryKey(TOrder tOrder, String value) {
- return null;
- }
- /**
- * 根据时间 重新同步es
- * @param
- * @return
- */
- public void updateEsByDate(String startTime, String endTime) {
- try {
- startTime = startTime.replace("/", "-");
- endTime = endTime.replace("/", "-");
- Date start = DateUtils.parseDate(startTime, DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss, new Date());
- Date end = DateUtils.parseDate(endTime, DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss, new Date());
- // 查询es 现有的数据,删除
- TOrderParam param = new TOrderParam();
- param.setCreateDate_start(start);
- param.setCreateDate_end(end);
- List<TOrder> list_es = this.selectEntityByEqualToOption(param);
- for (TOrder entity : list_es) {
- this.deleteTableById(String.valueOf(entity.getId()));
- }
- // 插入 新的
- // List<TOrder> list = tOrderService.selectForEs(param);
- LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
- query.gt(TOrder::getCreateDate,start);
- query.lt(TOrder::getCreateDate,end);
- List<TOrder> list = tOrderService.list(query);
- if (CollectionUtils.isNotEmpty(list)) {
- insertBatch(list);
- }
- } catch (Exception e) {
- log.error(this.getTableName()+" updateEsByDate 发生错误:{}", e);
- throw new MyException(this.getTableName()+" updateEsByDate 发生错误:" + e.getMessage());
- }
- }
- /**
- * es 查询条件
- * @param param
- * @return
- */
- public BoolQueryBuilder getStatisticsParam2QueryBuilder(StatisticsParam param){
- BoolQueryBuilder boolQueryBuilder;
- String startDate = param.getStartDate().replace("/","-");
- String endDate = param.getEndDate().replace("/","-");
- Date start = DateUtils.parseDate(startDate+" 00:00:00", DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss,new Date());
- Date end = DateUtils.parseDate(endDate+" 23:59:59", DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss,new Date());
- if(StringUtils.equals("0",param.getIfForeign())){ // 国内用户
- TOrderParam tOrderParam = new TOrderParam();
- tOrderParam.setStatus(1);
- tOrderParam.setPayDate_start(start);
- tOrderParam.setPayDate_end(end);
- if(StringUtils.isNotEmpty(param.getAdminId())){ // 所属商家id
- tOrderParam.setAdminId(Long.parseLong(param.getAdminId()));
- }
- if(StringUtils.isNotEmpty(param.getEquipmentId())){ // 设备id
- tOrderParam.setEquipmentId(Long.parseLong(param.getEquipmentId()));
- }
- if(param.getEquipmentIds()!=null&¶m.getEquipmentIds().size()>0){ // 设备id
- tOrderParam.setEquipmentId_inList(param.getEquipmentIds());
- }
- if(StringUtils.isNotEmpty(param.getPayType())){ // 支付方式
- tOrderParam.setFrpCode(param.getPayType());
- }
- if (null != param.getAgencyId()){
- tOrderParam.setAgencyId(param.getAgencyId());
- tOrderParam.setType_gt(1);
- }
- if(null != param.getMerchantId()){
- tOrderParam.setMerchantId(param.getMerchantId());
- tOrderParam.setType_gt(2);
- }
- if(param.getAdminIds()!=null&¶m.getAdminIds().size()>0){
- tOrderParam.setAdminId_inList(param.getAdminIds());
- }
- boolQueryBuilder = this.getParam2QueryBuilder(tOrderParam);
- }else{ // 国外用户
- TEquipmentParam tEquipmentParam = new TEquipmentParam();
- TCoinOrderParam tCoinOrderParam = new TCoinOrderParam();
- tCoinOrderParam.setPayDate_start(start);
- tCoinOrderParam.setPayDate_end(end);
- // 设备id
- if(StringUtils.isNotEmpty(param.getClientId())){
- // tEquipmentParam.setId(Long.parseLong(param.getEquipmentId()));
- // List<TEquipment> equipmentList = esTEquipmentService.selectEntityByEqualToOption(tEquipmentParam);
- // List<String> clientIdList = equipmentList.stream().map(TEquipment::getClientId).collect(Collectors.toList());
- // tCoinOrderParam.setClientId_inList(clientIdList);
- tCoinOrderParam.setClientId(param.getClientId());
- }
- // 支付方式
- if (StringUtils.isNotEmpty(param.getPayType())) {
- tCoinOrderParam.setPayType(Integer.valueOf(param.getPayType()));
- }
- if(param.getAdminIds() != null && param.getAdminIds().size()>0){
- tCoinOrderParam.setAdminId_inList(param.getAdminIds());
- }
- // 所属商家id
- if(StringUtils.isNotEmpty(param.getAdminId())){
- tCoinOrderParam.setAdminId(Long.parseLong(param.getAdminId()));
- }
- // 设备
- if(param.getClientIds() != null && param.getClientIds().size()>0){
- tCoinOrderParam.setClientId_inList(param.getClientIds());
- }
- boolQueryBuilder = esTCoinOrderService.getParam2QueryBuilder(tCoinOrderParam);
- }
- return boolQueryBuilder;
- }
- public ChartColumn getStatistics(StatisticsParam param) {
- String msg = "";
- String format = "yyyy-MM";
- DateHistogramInterval interval = DateHistogramInterval.DAY;
- if (ChartType.day.toString().equals(param.getChartType())) {
- interval = DateHistogramInterval.HOUR;
- format = "HH:00";
- msg = "日统计";
- }
- if (ChartType.week.toString().equals(param.getChartType())) {
- interval = DateHistogramInterval.DAY;
- format = "yyyy-MM-dd";
- msg = "周统计";
- }
- if (ChartType.month.toString().equals(param.getChartType())) {
- interval = DateHistogramInterval.DAY;
- format = "MM-dd";
- msg = "月统计";
- }
- if (ChartType.year.toString().equals(param.getChartType())) {
- interval = DateHistogramInterval.MONTH;
- format = "MM月";
- msg = "年统计";
- }
- ChartColumn chartColumn = getStatistics(param,format,interval);
- if (ChartType.week.toString().equals(param.getChartType())) {
- ArrayList<String> categories = new ArrayList<>();
- for (String day :chartColumn.getCategories()) {
- int week = DateUtils.getWeek(DateUtils.parseDate(day, DateUtils.PATTERN_yyyy_MM_dd,new Date()));
- String finalCategories = "周" + (week==0?"日":week);
- categories.add(finalCategories);
- }
- chartColumn.setCategories(categories);
- }
- return chartColumn;
- }
- public ChartColumn getStatistics(StatisticsParam param, String format, DateHistogramInterval interval) {
- try {
- // 仅为名称,用以获取返回结果
- String aggregationResultName = "aggregationResult";
- // 仅为名称,用以获取返回结果
- String aggSumName = "sales";
- // 仅为名称,用以获取返回结果
- String aggName_productNumber = "productNumber";
- // 仅为名称,用以获取返回结果
- String aggName_refundQuantity = "refundQuantity";
- // 仅为名称,用以获取返回结果
- String aggName_refundAmount = "refundAmount";
- //Bool查询
- BoolQueryBuilder queryBuilder = getStatisticsParam2QueryBuilder(param);
- // 公司平台
- String companyType = param.getCompanyType();
- if(StringUtils.isNotEmpty(companyType)) {
- if(companyType.equals("0")) {
- queryBuilder.must(
- QueryBuilders.boolQuery().should(
- QueryBuilders.termQuery("companyType", "0")
- ).should(
- QueryBuilders.boolQuery().mustNot(
- QueryBuilders.existsQuery("companyType")
- )
- )
- );
- } else {
- queryBuilder.must(QueryBuilders.termQuery("companyType","1"));
- }
- }
- // 设备类型
- String machineType = param.getMachineType();
- if(StringUtils.isNotEmpty(machineType)) {
- if(machineType.equals("0")) {
- queryBuilder.must(
- QueryBuilders.boolQuery().should(
- QueryBuilders.termQuery("machineType", "0")
- ).should(
- QueryBuilders.boolQuery().mustNot(
- QueryBuilders.existsQuery("machineType")
- )
- )
- );
- } else {
- queryBuilder.must(QueryBuilders.termQuery("machineType",machineType));
- }
- }
- // 时间聚合
- AggregationBuilder dateHistogram = AggregationBuilders.dateHistogram(aggregationResultName)
- .field("payDate")
- .calendarInterval(interval)
- .format(format);
- // 根据字段price 求和 sum, sales 仅为名称,用以获取返回结果
- // 订单金额
- SumAggregationBuilder sumAggregationBuilder = AggregationBuilders.sum(aggSumName).field("price");
- dateHistogram.subAggregation(sumAggregationBuilder);
- // 订单数量
- SumAggregationBuilder sumAggregationBuilder2 = AggregationBuilders.sum(aggName_productNumber).field("productNumber");
- dateHistogram.subAggregation(sumAggregationBuilder2);
- // 退款数量
- SumAggregationBuilder sumAggregationBuilder3 = AggregationBuilders.sum(aggName_refundQuantity).field("refundQuantity");
- dateHistogram.subAggregation(sumAggregationBuilder3);
- // 退款金额
- SumAggregationBuilder sumAggregationBuilder4 = AggregationBuilders.sum(aggName_refundAmount).field("refundAmount");
- dateHistogram.subAggregation(sumAggregationBuilder4);
- SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
- // 指定size为0,不返回文档 因为只需要数量
- sourceBuilder.query(queryBuilder).aggregation(dateHistogram).size(0);
- String tableName = getTableName();
- if(StringUtils.isNotEmpty(param.getIfForeign()) && param.getIfForeign().equals("1")) {
- tableName = esTCoinOrderService.getTableName();
- }
- SearchRequest searchRequest = new SearchRequest(tableName);
- searchRequest.source(sourceBuilder);
- log.debug("sourceBuilder:{}", sourceBuilder);
- SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
- Aggregation agg = searchResponse.getAggregations().get(aggregationResultName);
- List<? extends Histogram.Bucket> buckets = ((Histogram) agg).getBuckets();
- List<ChartBean> chartBeanList = new ArrayList<>();
- for (Histogram.Bucket bucket : buckets) {
- ChartBean chartBean = new ChartBean();
- chartBean.setCategorie(bucket.getKeyAsString());
- //统计商品数
- ParsedSum saleNum1 = bucket.getAggregations().get(aggName_productNumber);
- ParsedSum saleNum2 = bucket.getAggregations().get(aggName_refundQuantity);
- String saleNumStr1 = format1((float) saleNum1.getValue());
- String saleNumStr2 = format1((float) saleNum2.getValue());
- if(StringUtils.isEmpty(saleNumStr1)){
- saleNumStr1 = "1";
- }
- chartBean.setSaleNum(Float.valueOf(saleNumStr1) - Float.valueOf(saleNumStr2));
- //统计订单数
- chartBean.setOrderNum(Float.valueOf(bucket.getDocCount()));
- ParsedSum sum = bucket.getAggregations().get(aggSumName);
- ParsedSum refundSum = bucket.getAggregations().get(aggName_refundAmount);
- String s1 = format1((float) sum.getValue());
- String s2 = format1((float) refundSum.getValue());
- chartBean.setSalePrice(Float.valueOf(s1) - Float.valueOf(s2));
- chartBeanList.add(chartBean);
- }
- ChartColumn chartColumn = new ChartColumn(chartBeanList);
- return chartColumn;
- } catch (Exception e) {
- log.error(getTableName() + " es根据条件 聚合查询 报错:{}", e);
- throw new MyException(getTableName() + " es根据条件 聚合查询 报错:" + e.getMessage());
- }
- }
- public static String format1(Float value){
- BigDecimal bd = new BigDecimal(value);//创建一个bd对象,将要转换的值value传入构造函数
- bd = bd.setScale(2, RoundingMode.HALF_UP);//调用setScale方法进行数据格式化,保留两位小数,采用四舍五入规则
- return bd.toString(); //返回bd对象的值(转化为string形式)
- }
- public void tongbuByHour() throws ParseException {
- String startDate = null;
- String endDate = null;
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- Calendar calendar = Calendar.getInstance();
- calendar.setTime(new Date());
- endDate = sdf.format(calendar.getTime());
- calendar.add(Calendar.HOUR_OF_DAY, -1);
- // calendar.add(calendar.DATE, -4);
- startDate = sdf.format(calendar.getTime());
- Date start = sdf.parse(startDate);
- Date end = sdf.parse(endDate);
- // 查询es 现有的数据
- //1,同步已支付的订单
- int num = 0;
- while (true) {
- int limit = MAX_ROW;
- int offset= num * MAX_ROW;
- LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
- query.gt(TOrder::getCreateDate,start);
- query.lt(TOrder::getCreateDate,end);
- query.eq(TOrder::getStatus,1);
- Page<TOrder> page = new Page<>(offset, limit, true);
- IPage<TOrder> iPage = tOrderService.page(page, query);
- List<TOrder> list_es = iPage.getRecords();
- if(list_es.size()>0){
- insertBatch(list_es);
- for(TOrder order:list_es){
- try {
- updateDataById(order);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- num++;
- if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
- break;
- }
- }
- //2,同步已退款的订单
- int num2 = 0;
- while (true) {
- int limit = MAX_ROW;
- int offset= num2 * MAX_ROW;
- LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
- query.gt(TOrder::getCreateDate,start);
- query.lt(TOrder::getCreateDate,end);
- query.eq(TOrder::getStatus,3);
- Page<TOrder> page = new Page<>(offset, limit, true);
- IPage<TOrder> iPage = tOrderService.page(page, query);
- List<TOrder> list_es = iPage.getRecords();
- if(list_es.size()>0){
- insertBatch(list_es);
- for(TOrder order:list_es){
- try {
- updateDataById(order);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- num2++;
- if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
- break;
- }
- }
- // 查询es 现有的数据
- // LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
- // query.gt(TOrder::getCreateDate,start);
- // query.lt(TOrder::getCreateDate,end);
- // List<TOrder> list_es = tOrderService.list(query);
- // if (CollectionUtils.isNotEmpty(list_es)) {
- // insertBatch(list_es);
- // if(list_es.size()>0){
- // for(TOrder order : list_es){
- // try {
- // updateDataById(order);
- // } catch (Exception e) {
- // e.printStackTrace();
- // }
- // }
- // }
- // }
- }
- public void tongbuByDay() throws ParseException {
- String startDate = null;
- String endDate = null;
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- Calendar calendar = Calendar.getInstance();
- calendar.setTime(new Date());
- endDate = sdf.format(calendar.getTime());
- calendar.add(calendar.DATE, -1);
- // calendar.add(calendar.DATE, -4);
- startDate = sdf.format(calendar.getTime());
- Date start = sdf.parse(startDate);
- Date end = sdf.parse(endDate);
- // 查询es 现有的数据
- //1,同步已支付的订单
- int num = 0;
- while (true) {
- int limit = MAX_ROW;
- int offset= num * MAX_ROW;
- LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
- query.gt(TOrder::getCreateDate,start);
- query.lt(TOrder::getCreateDate,end);
- query.eq(TOrder::getStatus,1);
- Page<TOrder> page = new Page<>(offset, limit, true);
- IPage<TOrder> iPage = tOrderService.page(page, query);
- List<TOrder> list_es = iPage.getRecords();
- if(list_es.size()>0){
- insertBatch(list_es);
- for(TOrder order:list_es){
- try {
- updateDataById(order);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- num++;
- if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
- break;
- }
- }
- //2,同步已退款的订单
- int num2 = 0;
- while (true) {
- int limit = MAX_ROW;
- int offset= num2 * MAX_ROW;
- LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
- query.gt(TOrder::getCreateDate,start);
- query.lt(TOrder::getCreateDate,end);
- query.eq(TOrder::getStatus,3);
- Page<TOrder> page = new Page<>(offset, limit, true);
- IPage<TOrder> iPage = tOrderService.page(page, query);
- List<TOrder> list_es = iPage.getRecords();
- if(list_es.size()>0){
- insertBatch(list_es);
- for(TOrder order:list_es){
- try {
- updateDataById(order);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- num2++;
- if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
- break;
- }
- }
- // 查询es 现有的数据
- // LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
- // query.gt(TOrder::getCreateDate,start);
- // query.lt(TOrder::getCreateDate,end);
- // List<TOrder> list_es = tOrderService.list(query);
- // if (CollectionUtils.isNotEmpty(list_es)) {
- // insertBatch(list_es);
- // if(list_es.size()>0){
- // for(TOrder order : list_es){
- // try {
- // updateDataById(order);
- // } catch (Exception e) {
- // e.printStackTrace();
- // }
- // }
- // }
- // }
- }
- /**
- * 同步汇总统计到数据库
- * @param param 参数
- * @return
- */
- public Page<OrderDaoChuDTO> getSummaryStatistics(StatisticsParam param) {
- Page<OrderDaoChuDTO> page = new Page<>(param.getCurrent(), param.getSize(), true);
- String statisticsDate;
- if (StringUtils.isNotEmpty(param.getDateType()) && param.getDateType().equals("1")) {
- // 按年统计
- statisticsDate = param.getStartDate().substring(0, 4);
- } else {
- statisticsDate = param.getStartDate().substring(0, 7);
- }
- try {
- // 仅为名称,用以获取返回结果
- String aggregationResultName = "aggregationResult";
- // 按商户或者按设备分组查询
- String field = "adminId";
- // 仅为名称,用以获取返回结果
- String aggNameSalePrice = "salePrice";
- // 仅为名称,用以获取返回结果
- String aggNameTotal = "total";
- // 仅为名称,用以获取返回结果
- String aggNameSaleNum = "saleNum";
- // 创建布尔查询
- BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
- .must(QueryBuilders.rangeQuery("payDate")
- .from(param.getStartDate())
- .to(param.getEndDate())
- .format("yyyy-MM-dd HH:mm:ss"))
- .must(QueryBuilders.termQuery("status", 1));
- if(StringUtils.isNotEmpty(param.getPacketType()) && param.getPacketType().equals("1")) {
- // 按设备分组
- field = "clientId.keyword";
- }
- int size = param.getCurrent() * param.getSize();
- // 创建聚合:按 clientId 进行分组
- TermsAggregationBuilder aggregation = AggregationBuilders
- .terms(aggregationResultName)
- .field(field)
- .size(size)
- .order(BucketOrder.aggregation(aggNameSalePrice,false))
- .subAggregation(AggregationBuilders.sum(aggNameSalePrice).field("price"))
- .subAggregation(AggregationBuilders.sum(aggNameSaleNum).field("productNumber"));
- // 创建总数聚合
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
- .query(boolQuery)
- .aggregation(aggregation)
- .size(0);
- // cardinality 聚合
- searchSourceBuilder.aggregation(AggregationBuilders.cardinality(aggNameTotal).field(field));
- // 获取表名
- String tableName = getTableName();
- if(StringUtils.isNotEmpty(param.getIfForeign()) && param.getIfForeign().equals("1")) {
- tableName = esTCoinOrderService.getTableName();
- }
- // 创建请求
- SearchRequest searchRequest = new SearchRequest(tableName).source(searchSourceBuilder);
- log.debug("sourceBuilder:{}", searchSourceBuilder);
- // 执行查询
- SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
- // 解析结果
- Aggregations aggregations = searchResponse.getAggregations();
- Terms aggregationResult = aggregations.get(aggregationResultName);
- // 修改集合
- List<OrderStatisticsYear> updateYearList = new ArrayList<>();
- List<OrderStatisticsMonth> updateMonthList = new ArrayList<>();
- // 插入集合
- List<OrderStatisticsYear> insertYearList = new ArrayList<>();
- List<OrderStatisticsMonth> insertMonthList = new ArrayList<>();
- for (Terms.Bucket bucket : aggregationResult.getBuckets()) {
- String key = bucket.getKeyAsString();
- Sum salesSum = bucket.getAggregations().get(aggNameSalePrice);
- double priceTotal = salesSum.getValue();
- long docCount = bucket.getDocCount();
- Sum saleNumSum = bucket.getAggregations().get(aggNameSaleNum);
- int saleNum = (int)saleNumSum.getValue();
- if (priceTotal == 0) {
- continue;
- }
- if (StringUtils.isNotEmpty(param.getDateType()) && param.getDateType().equals("1")) {
- // 按年统计
- OrderStatisticsYear orderStatistics = new OrderStatisticsYear();
- orderStatistics.setId(RandomUtil.randomNumbers(32));
- orderStatistics.setIfForeign(param.getIfForeign());
- orderStatistics.setType(param.getPacketType());
- if (StringUtils.isNotEmpty(param.getPacketType()) && param.getPacketType().equals("1")) {
- TEquipment equipment = R.getDataIfSuccess(szwlFeign.findEquipmentByClientId(key));
- if (equipment == null) {
- continue;
- }
- // TAdmin admin = R.getDataIfSuccess(szwlFeign.getAdmin(String.valueOf(equipment.getAdminId())));
- // if (admin == null) {
- // continue;
- // }
- Long adminId = equipment.getAdminId();
- String companyType = R.getDataIfSuccess(szwlFeign.getCompanyTypeById(String.valueOf(equipment.getAdminId())));
- if (StringUtils.isEmpty(companyType)) {
- continue;
- }
- // 先查询是否已有数据
- LambdaQueryWrapper<OrderStatisticsYear> query = Wrappers.lambdaQuery();
- query.eq(OrderStatisticsYear::getAdminId, equipment.getAdminId());
- query.eq(OrderStatisticsYear::getEquipmentId, equipment.getId());
- query.eq(OrderStatisticsYear::getStatisticsDate, statisticsDate);
- query.eq(OrderStatisticsYear::getType, param.getPacketType());
- query.eq(OrderStatisticsYear::getIfForeign, param.getIfForeign());
- OrderStatisticsYear statisticsYear = orderStatisticsYearService.getOne(query);
- if(statisticsYear != null){
- // 金额是否有变化,有就更新,没有就不更新
- BigDecimal newSalePrice = new BigDecimal(priceTotal).setScale(2, RoundingMode.HALF_UP);
- if (statisticsYear.getSalePrice().equals(newSalePrice)) {
- continue;
- }
- // 更新数据
- statisticsYear.setSaleNum(saleNum);
- statisticsYear.setSalePrice(newSalePrice);
- statisticsYear.setOrderNum(Long.valueOf(docCount).intValue());
- updateYearList.add(statisticsYear);
- continue;
- }
- // 没有的话就插入数据
- orderStatistics.setSalePrice(new BigDecimal(priceTotal).setScale(2, RoundingMode.HALF_UP));
- orderStatistics.setAdminId(adminId);
- orderStatistics.setEquipmentId(equipment.getId());
- orderStatistics.setClientId(key);
- orderStatistics.setSaleNum(saleNum);
- orderStatistics.setOrderNum(Long.valueOf(docCount).intValue());
- orderStatistics.setCompanyType(companyType);
- orderStatistics.setStatisticsDate(statisticsDate);
- insertYearList.add(orderStatistics);
- } else {
- // 按商户分组
- // TAdmin admin = R.getDataIfSuccess(szwlFeign.getAdmin(key));
- // if (admin == null) {
- // continue;
- // }
- String companyType = R.getDataIfSuccess(szwlFeign.getCompanyTypeById(key));
- if (StringUtils.isEmpty(companyType)) {
- continue;
- }
- // 先查询是否已有数据
- LambdaQueryWrapper<OrderStatisticsYear> query = Wrappers.lambdaQuery();
- query.eq(OrderStatisticsYear::getAdminId, key);
- query.eq(OrderStatisticsYear::getStatisticsDate, statisticsDate);
- query.eq(OrderStatisticsYear::getType, param.getPacketType());
- query.eq(OrderStatisticsYear::getIfForeign, param.getIfForeign());
- OrderStatisticsYear statisticsYear = orderStatisticsYearService.getOne(query);
- if(statisticsYear != null){
- // 金额是否有变化,有就更新,没有就不更新
- BigDecimal newSalePrice = new BigDecimal(priceTotal).setScale(2, RoundingMode.HALF_UP);
- if (statisticsYear.getSalePrice().equals(newSalePrice)) {
- continue;
- }
- // 更新数据
- statisticsYear.setSaleNum(saleNum);
- statisticsYear.setSalePrice(newSalePrice);
- statisticsYear.setOrderNum(Long.valueOf(docCount).intValue());
- updateYearList.add(statisticsYear);
- continue;
- }
- orderStatistics.setSalePrice(new BigDecimal(priceTotal).setScale(2, RoundingMode.HALF_UP));
- orderStatistics.setAdminId(Long.valueOf(key));
- orderStatistics.setSaleNum(saleNum);
- orderStatistics.setOrderNum(Long.valueOf(docCount).intValue());
- orderStatistics.setCompanyType(companyType);
- orderStatistics.setStatisticsDate(statisticsDate);
- insertYearList.add(orderStatistics);
- }
- } else {
- OrderStatisticsMonth orderStatistics = new OrderStatisticsMonth();
- orderStatistics.setId(RandomUtil.randomNumbers(32));
- orderStatistics.setIfForeign(param.getIfForeign());
- orderStatistics.setType(param.getPacketType());
- if (StringUtils.isNotEmpty(param.getPacketType()) && param.getPacketType().equals("1")) {
- TEquipment equipment = R.getDataIfSuccess(szwlFeign.findEquipmentByClientId(key));
- if (equipment == null) {
- continue;
- }
- // TAdmin admin = R.getDataIfSuccess(szwlFeign.getAdmin(String.valueOf(equipment.getAdminId())));
- // if (admin == null) {
- // continue;
- // }
- Long adminId = equipment.getAdminId();
- String companyType = R.getDataIfSuccess(szwlFeign.getCompanyTypeById(String.valueOf(adminId)));
- if (StringUtils.isEmpty(companyType)) {
- continue;
- }
- // 先查询是否已有数据
- LambdaQueryWrapper<OrderStatisticsMonth> query = Wrappers.lambdaQuery();
- query.eq(OrderStatisticsMonth::getAdminId, equipment.getAdminId());
- query.eq(OrderStatisticsMonth::getEquipmentId, equipment.getId());
- query.eq(OrderStatisticsMonth::getStatisticsDate, statisticsDate);
- query.eq(OrderStatisticsMonth::getType, param.getPacketType());
- query.eq(OrderStatisticsMonth::getIfForeign, param.getIfForeign());
- OrderStatisticsMonth statisticsMonth = orderStatisticsMonthService.getOne(query);
- if(statisticsMonth != null){
- // 金额是否有变化,有就更新,没有就不更新
- BigDecimal newSalePrice = new BigDecimal(priceTotal).setScale(2, RoundingMode.HALF_UP);
- if (statisticsMonth.getSalePrice().equals(newSalePrice)) {
- continue;
- }
- // 更新数据
- statisticsMonth.setSaleNum(saleNum);
- statisticsMonth.setSalePrice(newSalePrice);
- statisticsMonth.setOrderNum(Long.valueOf(docCount).intValue());
- updateMonthList.add(statisticsMonth);
- continue;
- }
- orderStatistics.setSalePrice(new BigDecimal(priceTotal).setScale(2, RoundingMode.HALF_UP));
- orderStatistics.setAdminId(adminId);
- orderStatistics.setEquipmentId(equipment.getId());
- orderStatistics.setClientId(key);
- orderStatistics.setSaleNum(saleNum);
- orderStatistics.setOrderNum(Long.valueOf(docCount).intValue());
- orderStatistics.setCompanyType(companyType);
- orderStatistics.setStatisticsDate(statisticsDate);
- insertMonthList.add(orderStatistics);
- } else {
- // 按商户分组
- // TAdmin admin = R.getDataIfSuccess(szwlFeign.getAdmin(key));
- // if (admin == null) {
- // continue;
- // }
- String companyType = R.getDataIfSuccess(szwlFeign.getCompanyTypeById(key));
- if (StringUtils.isEmpty(companyType)) {
- continue;
- }
- // 先查询是否已有数据
- LambdaQueryWrapper<OrderStatisticsMonth> query = Wrappers.lambdaQuery();
- query.eq(OrderStatisticsMonth::getAdminId, key);
- query.eq(OrderStatisticsMonth::getStatisticsDate, statisticsDate);
- query.eq(OrderStatisticsMonth::getType, param.getPacketType());
- query.eq(OrderStatisticsMonth::getIfForeign, param.getIfForeign());
- OrderStatisticsMonth statisticsMonth = orderStatisticsMonthService.getOne(query);
- if(statisticsMonth != null){
- // 金额是否有变化,有就更新,没有就不更新
- BigDecimal newSalePrice = new BigDecimal(priceTotal).setScale(2, RoundingMode.HALF_UP);
- if (statisticsMonth.getSalePrice().equals(newSalePrice)) {
- continue;
- }
- // 更新数据
- statisticsMonth.setSaleNum(saleNum);
- statisticsMonth.setSalePrice(newSalePrice);
- statisticsMonth.setOrderNum(Long.valueOf(docCount).intValue());
- updateMonthList.add(statisticsMonth);
- continue;
- }
- orderStatistics.setSalePrice(new BigDecimal(priceTotal).setScale(2, RoundingMode.HALF_UP));
- orderStatistics.setAdminId(Long.valueOf(key));
- orderStatistics.setSaleNum(saleNum);
- orderStatistics.setOrderNum(Long.valueOf(docCount).intValue());
- orderStatistics.setCompanyType(companyType);
- orderStatistics.setStatisticsDate(statisticsDate);
- insertMonthList.add(orderStatistics);
- }
- }
- }
- // 如果修改集合不为空,就批量修改
- if (!updateYearList.isEmpty()) {
- orderStatisticsYearService.updateBatchById(updateYearList);
- }
- if (!updateMonthList.isEmpty()) {
- orderStatisticsMonthService.updateBatchById(updateMonthList);
- }
- // 如果插入集合不为空,就批量插入
- if (!insertYearList.isEmpty()) {
- orderStatisticsYearService.saveBatch(insertYearList);
- }
- // 如果插入集合不为空,就批量插入
- if (!insertMonthList.isEmpty()) {
- orderStatisticsMonthService.saveBatch(insertMonthList);
- }
- return page;
- } catch (Exception e) {
- log.error(getTableName() + " es根据条件 聚合查询 报错:{}", e);
- throw new MyException(getTableName() + " es根据条件 聚合查询 报错:" + e.getMessage());
- }
- }
- }
|