瀏覽代碼

数据同步优化

litianbiao 2 年之前
父節點
當前提交
3c254ec8ac

+ 165 - 0
src/main/java/com/szwl/controller/EsController.java

@@ -0,0 +1,165 @@
+package com.szwl.controller;
+
+import cn.com.crbank.ommo.bean.ResultMessage;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.szwl.model.entity.TCoinOrder;
+import com.szwl.model.entity.TOrder;
+import com.szwl.model.query.TCoinOrderParam;
+import com.szwl.model.query.TOrderParam;
+import com.szwl.service.TCoinOrderService;
+import com.szwl.service.TOrderService;
+import com.szwl.service.es.EsTCoinOrderService;
+import com.szwl.service.es.EsTEquipmentService;
+import com.szwl.service.es.EsTOrderService;
+import io.swagger.annotations.ApiOperation;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+
+@Slf4j
+@RestController
+@RequestMapping("EsApi")
+public class EsController {
+    @Autowired
+    EsTOrderService esTOrderService;
+    @Autowired
+    EsTEquipmentService esTEquipmentService;
+    @Autowired
+    EsTCoinOrderService esTCoinOrderService;
+    @Autowired
+    TCoinOrderService tCoinOrderService;
+    @Autowired
+    TOrderService orderService;
+    @GetMapping("/saveEsTCoinOrder")
+    @ApiOperation("同步CoinOrder表数据")
+    public ResponseEntity<?> saveEsTCoinOrder(TCoinOrderParam param) {
+        try{
+            // 初始化旧流水
+            int num = 0;
+            while (true) {
+                int limit =1000;
+                int offset= num * 1000;
+                LambdaQueryWrapper<TCoinOrder> query = Wrappers.lambdaQuery();
+                query.gt(TCoinOrder::getCreateDate,param.getCreateDate_start());
+                query.lt(TCoinOrder::getCreateDate,param.getCreateDate_end());
+//            List<TCoinOrder> list_es = tCoinOrderService.list(query);
+                Page<TCoinOrder> page = new Page<>(offset, limit, true);
+                IPage<TCoinOrder> iPage = tCoinOrderService.page(page, query);
+                List<TCoinOrder> list_es = iPage.getRecords();
+                if(list_es.size()>0){
+                    esTCoinOrderService.insertBatch(list_es);
+                    for(TCoinOrder order:list_es){
+                        try {
+                            esTCoinOrderService.updateDataById(order);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }
+                num++;
+                if(list_es.size()< 1000){ // 数据小于 最大值 ,证明后面已无数据,则跳出
+                    break;
+                }
+            }
+//            TCoinOrderExample example = new TCoinOrderExample();
+//            TCoinOrderExample.Criteria criteria = example.createCriteria();
+//            criteria.andModifyDateGreaterThanOrEqualTo(param.getCreateDate_start());
+//            criteria.andModifyDateLessThanOrEqualTo(param.getCreateDate_end());
+//            List<TCoinOrder> list = coinOrderService.selectByOption(example);
+//            if(list.size()>0){
+//                for(TCoinOrder coinOrder : list){
+//                    esTCoinOrderService.updateDataById(coinOrder);
+//                }
+//            }
+        }catch (Exception e){
+            log.error("ElasticsearchRunner InitEsTOrderThread 发生错误:{}" , e);
+            throw new cn.com.crbank.ommo.exception.MyException("ElasticsearchRunner InitEsTOrderThread 发生错误:" + e.getMessage());
+        }
+        return ResponseEntity
+                .status(HttpStatus.OK)
+                .body(new ResultMessage()
+                        .setCode(true)
+                        .setMessage("同步es设备数据成功"));
+
+    }
+
+    @GetMapping("/saveEsTOrder")
+    @ApiOperation("同步Order表数据")
+    public ResponseEntity<?> saveEsTOrder(TOrderParam param) {
+        try{
+            // 初始化旧流水
+            // 查询es 现有的数据
+            //1,同步已支付的订单
+            int num = 0;
+            while (true) {
+                int limit = 1000;
+                int offset= num * 1000;
+                LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
+                query.gt(TOrder::getCreateDate,param.getCreateDate_start());
+                query.lt(TOrder::getCreateDate,param.getCreateDate_end());
+                query.eq(TOrder::getStatus,1);
+                Page<TOrder> page = new Page<>(offset, limit, true);
+                IPage<TOrder> iPage = orderService.page(page, query);
+                List<TOrder> list_es = iPage.getRecords();
+                if(list_es.size()>0){
+                    esTOrderService.insertBatch(list_es);
+                    for(TOrder order:list_es){
+                        try {
+                            esTOrderService.updateDataById(order);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }
+                num++;
+                if(list_es.size()< 1000){ // 数据小于 最大值 ,证明后面已无数据,则跳出
+                    break;
+                }
+            }
+            //2,同步已退款的订单
+            int num2 = 0;
+            while (true) {
+                int limit = 1000;
+                int offset= num2 * 1000;
+                LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
+                query.gt(TOrder::getCreateDate,param.getCreateDate_start());
+                query.lt(TOrder::getCreateDate,param.getCreateDate_end());
+                query.eq(TOrder::getStatus,3);
+                Page<TOrder> page = new Page<>(offset, limit, true);
+                IPage<TOrder> iPage = orderService.page(page, query);
+                List<TOrder> list_es = iPage.getRecords();
+                if(list_es.size()>0){
+                    esTOrderService.insertBatch(list_es);
+                    for(TOrder order:list_es){
+                        try {
+                            esTOrderService.updateDataById(order);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }
+                num2++;
+                if(list_es.size()< 1000){ // 数据小于 最大值 ,证明后面已无数据,则跳出
+                    break;
+                }
+            }
+        }catch (Exception e){
+            log.error("ElasticsearchRunner InitEsTOrderThread 发生错误:{}" , e);
+            throw new cn.com.crbank.ommo.exception.MyException("ElasticsearchRunner InitEsTOrderThread 发生错误:" + e.getMessage());
+        }
+        return ResponseEntity
+                .status(HttpStatus.OK)
+                .body(new ResultMessage()
+                        .setCode(true)
+                        .setMessage("同步es设备数据成功"));
+    }
+}

+ 4 - 2
src/main/java/com/szwl/controller/ScheduledService.java

@@ -70,9 +70,11 @@ public class ScheduledService {
 
     }
 
-//    在每小时的29分执行一次 es同步数据
-    @Scheduled(cron = "0 29 * * * ?")
+//    在每小时的26分执行一次 es同步数据
+    @Scheduled(cron = "0  * * * ?")
     public void tongbuEs() throws ParseException {
+//        esTOrderService.tongbuByHour();
+//        esTCoinOrderService.tongbuByHour();
         int i = 0;
         try {
             String hostAddress = InetAddress.getLocalHost().getHostAddress();

+ 36 - 16
src/main/java/com/szwl/service/es/EsTCoinOrderService.java

@@ -28,7 +28,7 @@ import java.util.List;
 @Service
 public class EsTCoinOrderService extends EsBaseService<TCoinOrder, TCoinOrderParam> {
     private static boolean InitEsTableStatus = false; // 是否正常完成 es 初始化
-    public static final int MAX_ROW = 10000;
+    public static final int MAX_ROW = 1000;
     @Autowired
     TCoinOrderService tCoinOrderService;
 
@@ -164,28 +164,48 @@ public class EsTCoinOrderService extends EsBaseService<TCoinOrder, TCoinOrderPar
         Date start = sdf.parse(startDate);
         Date end = sdf.parse(endDate);
         // 查询es 现有的数据
-//        TCoinOrderExample example = new TCoinOrderExample();
-//        TCoinOrderExample.Criteria criteria = example.createCriteria();
-//        criteria.andCreateDateGreaterThanOrEqualTo(start);
-//        criteria.andCreateDateLessThanOrEqualTo(end);
-//        criteria.andEsEqualTo("1");
-//        List<TCoinOrder> list_es = tCoinOrderService.selectByOption(example);
-        LambdaQueryWrapper<TCoinOrder> query = Wrappers.lambdaQuery();
-        query.gt(TCoinOrder::getCreateDate,start);
-        query.lt(TCoinOrder::getCreateDate,end);
-        List<TCoinOrder> list_es = tCoinOrderService.list(query);
-        if (CollectionUtils.isNotEmpty(list_es)) {
-//            insertBatch(list_es);
+        int num = 0;
+        while (true) {
+            int limit = MAX_ROW;
+            int offset= num * MAX_ROW;
+            LambdaQueryWrapper<TCoinOrder> query = Wrappers.lambdaQuery();
+            query.gt(TCoinOrder::getCreateDate,start);
+            query.lt(TCoinOrder::getCreateDate,end);
+//            List<TCoinOrder> list_es = tCoinOrderService.list(query);
+            Page<TCoinOrder> page = new Page<>(offset, limit, true);
+            IPage<TCoinOrder> iPage = tCoinOrderService.page(page, query);
+            List<TCoinOrder> list_es = iPage.getRecords();
             if(list_es.size()>0){
-                for(TCoinOrder coinOrder : list_es){
+                insertBatch(list_es);
+                for(TCoinOrder order:list_es){
                     try {
-                        updateDataById(coinOrder);
+                        updateDataById(order);
                     } catch (Exception e) {
                         e.printStackTrace();
                     }
                 }
             }
-
+            num++;
+            if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
+                break;
+            }
         }
+//        LambdaQueryWrapper<TCoinOrder> query = Wrappers.lambdaQuery();
+//        query.gt(TCoinOrder::getCreateDate,start);
+//        query.lt(TCoinOrder::getCreateDate,end);
+//        List<TCoinOrder> list_es = tCoinOrderService.list(query);
+//        if (CollectionUtils.isNotEmpty(list_es)) {
+////            insertBatch(list_es);
+//            if(list_es.size()>0){
+//                for(TCoinOrder coinOrder : list_es){
+//                    try {
+//                        updateDataById(coinOrder);
+//                    } catch (Exception e) {
+//                        e.printStackTrace();
+//                    }
+//                }
+//            }
+//
+//        }
     }
 }

+ 66 - 14
src/main/java/com/szwl/service/es/EsTOrderService.java

@@ -36,6 +36,8 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.math.BigDecimal;
 import java.math.RoundingMode;
 import java.text.ParseException;
@@ -44,13 +46,14 @@ import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 @Slf4j
 @Service
 public class EsTOrderService extends EsBaseService<TOrder, TOrderParam> {
     private static boolean InitEsTableStatus = false; // 是否正常完成 es 初始化
-    public static final int MAX_ROW = 10000;
+    public static final int MAX_ROW = 1000;
     @Autowired
     TOrderService tOrderService;
     @Autowired
@@ -351,20 +354,48 @@ public class EsTOrderService extends EsBaseService<TOrder, TOrderParam> {
         Date start = sdf.parse(startDate);
         Date end = sdf.parse(endDate);
         // 查询es 现有的数据
-//        TOrderExample example = new TOrderExample();
-//        TOrderExample.Criteria criteria = example.createCriteria();
-//        criteria.andCreateDateGreaterThanOrEqualTo(start);
-//        criteria.andCreateDateLessThanOrEqualTo(end);
-//        criteria.andEsEqualTo("1");
-//        List<TOrder> list_es = tOrderService.selectByOption(example);
-        LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
-        query.gt(TOrder::getCreateDate,start);
-        query.lt(TOrder::getCreateDate,end);
-        List<TOrder> list_es = tOrderService.list(query);
-        if (CollectionUtils.isNotEmpty(list_es)) {
-            insertBatch(list_es);
+        //1,同步已支付的订单
+        int num = 0;
+        while (true) {
+            int limit = MAX_ROW;
+            int offset= num * MAX_ROW;
+            LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
+            query.gt(TOrder::getCreateDate,start);
+            query.lt(TOrder::getCreateDate,end);
+            query.eq(TOrder::getStatus,1);
+            Page<TOrder> page = new Page<>(offset, limit, true);
+            IPage<TOrder> iPage = tOrderService.page(page, query);
+            List<TOrder> list_es = iPage.getRecords();
+            if(list_es.size()>0){
+                insertBatch(list_es);
+                for(TOrder order:list_es){
+                    try {
+                        updateDataById(order);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+            num++;
+            if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
+                break;
+            }
+        }
+        //2,同步已退款的订单
+        int num2 = 0;
+        while (true) {
+            int limit = MAX_ROW;
+            int offset= num2 * MAX_ROW;
+            LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
+            query.gt(TOrder::getCreateDate,start);
+            query.lt(TOrder::getCreateDate,end);
+            query.eq(TOrder::getStatus,3);
+            Page<TOrder> page = new Page<>(offset, limit, true);
+            IPage<TOrder> iPage = tOrderService.page(page, query);
+            List<TOrder> list_es = iPage.getRecords();
             if(list_es.size()>0){
-                for(TOrder order : list_es){
+                insertBatch(list_es);
+                for(TOrder order:list_es){
                     try {
                         updateDataById(order);
                     } catch (Exception e) {
@@ -372,6 +403,27 @@ public class EsTOrderService extends EsBaseService<TOrder, TOrderParam> {
                     }
                 }
             }
+            num2++;
+            if(list_es.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
+                break;
+            }
         }
+        // 查询es 现有的数据
+//        LambdaQueryWrapper<TOrder> query = Wrappers.lambdaQuery();
+//        query.gt(TOrder::getCreateDate,start);
+//        query.lt(TOrder::getCreateDate,end);
+//        List<TOrder> list_es = tOrderService.list(query);
+//        if (CollectionUtils.isNotEmpty(list_es)) {
+//            insertBatch(list_es);
+//            if(list_es.size()>0){
+//                for(TOrder order : list_es){
+//                    try {
+//                        updateDataById(order);
+//                    } catch (Exception e) {
+//                        e.printStackTrace();
+//                    }
+//                }
+//            }
+//        }
     }
 }