123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- package com.szwl.service.es;
- import cn.com.crbank.ommo.esclient.EsBaseService;
- import cn.com.crbank.ommo.exception.MyException;
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.baomidou.mybatisplus.core.metadata.IPage;
- import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
- import com.baomidou.mybatisplus.core.toolkit.Wrappers;
- import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
- import com.szwl.model.entity.TCoinOrder;
- import com.szwl.model.query.TCoinOrderParam;
- import com.szwl.model.utils.DateUtils;
- import com.szwl.service.TCoinOrderService;
- import lombok.extern.slf4j.Slf4j;
- //import org.apache.commons.collections4.CollectionUtils;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.indices.GetIndexRequest;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import java.text.ParseException;
- import java.text.SimpleDateFormat;
- import java.util.Calendar;
- import java.util.Date;
- import java.util.List;
- @Slf4j
- @Service
- public class EsTCoinOrderService extends EsBaseService<TCoinOrder, TCoinOrderParam> {
- private static boolean InitEsTableStatus = false; // 是否正常完成 es 初始化
- public static final int MAX_ROW = 10000;
- @Autowired
- TCoinOrderService tCoinOrderService;
- @Override
- public String getTableName() {
- return "es_t_coin_order";
- }
- @Override
- public TCoinOrder getInstanceOfEntity() {
- return new TCoinOrder();
- }
- @Override
- public void setInitTableStatus(boolean flag) {
- InitEsTableStatus = flag;
- }
- @Override
- public boolean getInitTableStatus() {
- return InitEsTableStatus;
- }
- @Override
- public String getEntityPrimaryKey(TCoinOrder tCoinOrder) {
- return String.valueOf(tCoinOrder.getId());
- }
- @Override
- public TCoinOrder setEntityPrimaryKey(TCoinOrder tCoinOrder, String value) {
- return null;
- }
- // @Override
- // public TCoinOrder setEntityPrimaryKey(TCoinOrder tCoinOrder, String value) {
- // return tCoinOrder.setId(Long.parseLong(value));
- // }
- @Override
- public void initTableFun() {
- try{
- String tableName = getTableName();
- GetIndexRequest request = new GetIndexRequest(tableName);
- // DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(this.getTableName());
- // restHighLevelClient.indices().delete(deleteIndexRequest,RequestOptions.DEFAULT);
- boolean isExists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
- if (!isExists) {
- log.info("es 索引 开始创建"+tableName);
- createTable();
- // 初始化旧流水
- int num = 0;
- while (true) {
- int limit = MAX_ROW;
- // int offset= num * MAX_ROW;
- int offset= num;
- // TCoinOrderExample example = new TCoinOrderExample();
- // example.setLimit(limit);
- // example.setOffset(offset);
- // List<TCoinOrder> list = tCoinOrderService.selectByOption(example);
- LambdaQueryWrapper<TCoinOrder> query = Wrappers.lambdaQuery();
- SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- String time = "2022-06-01 00:00:00";
- Date date = ft.parse(time);
- query.gt(TCoinOrder::getCreateDate,date);
- Page<TCoinOrder> page = new Page<>(offset, limit, true);
- IPage<TCoinOrder> iPage = tCoinOrderService.page(page, query);
- List<TCoinOrder> list = iPage.getRecords();
- insertBatch(list);
- num++;
- if(list.size()< MAX_ROW){ // 数据小于 最大值 ,证明后面已无数据,则跳出
- break;
- }
- }
- } else {
- log.info("es 索引 "+tableName+" 已存在不再创建");
- }
- InitEsTableStatus = true;
- }catch (Exception e){
- log.error("ElasticsearchRunner InitEsTCoinOrderThread 发生错误:{}" , e);
- throw new MyException("ElasticsearchRunner InitEsTCoinOrderThread 发生错误:" + e.getMessage());
- }
- }
- /**
- * 根据时间 重新同步es
- * @param
- * @return
- */
- public void updateEsByDate(String startTime, String endTime) {
- try {
- startTime = startTime.replace("/", "-");
- endTime = endTime.replace("/", "-");
- Date start = DateUtils.parseDate(startTime, DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss, new Date());
- Date end = DateUtils.parseDate(endTime, DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss, new Date());
- // 查询es 现有的数据,删除
- TCoinOrderParam param = new TCoinOrderParam();
- param.setCreateDate_start(start);
- param.setCreateDate_end(end);
- List<TCoinOrder> list_es = this.selectEntityByEqualToOption(param);
- for (TCoinOrder entity : list_es) {
- this.deleteTableById(String.valueOf(entity.getId()));
- }
- // 插入 新的
- // List<TCoinOrder> list = tCoinOrderService.selectForEs(param);
- LambdaQueryWrapper<TCoinOrder> query = Wrappers.lambdaQuery();
- query.gt(TCoinOrder::getCreateDate,start);
- query.lt(TCoinOrder::getCreateDate,end);
- List<TCoinOrder> list = tCoinOrderService.list(query);
- if (CollectionUtils.isNotEmpty(list)) {
- insertBatch(list);
- }
- } catch (Exception e) {
- log.error(this.getTableName()+" updateEsByDate 发生错误:{}", e);
- throw new MyException(this.getTableName()+" updateEsByDate 发生错误:" + e.getMessage());
- }
- }
- public void tongbuByHour() throws ParseException {
- String startDate = null;
- String endDate = null;
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- Calendar calendar = Calendar.getInstance();
- calendar.setTime(new Date());
- endDate = sdf.format(calendar.getTime());
- calendar.add(calendar.HOUR_OF_DAY, -1);
- startDate = sdf.format(calendar.getTime());
- Date start = sdf.parse(startDate);
- Date end = sdf.parse(endDate);
- // 查询es 现有的数据
- // TCoinOrderExample example = new TCoinOrderExample();
- // TCoinOrderExample.Criteria criteria = example.createCriteria();
- // criteria.andCreateDateGreaterThanOrEqualTo(start);
- // criteria.andCreateDateLessThanOrEqualTo(end);
- // criteria.andEsEqualTo("1");
- // List<TCoinOrder> list_es = tCoinOrderService.selectByOption(example);
- LambdaQueryWrapper<TCoinOrder> query = Wrappers.lambdaQuery();
- query.gt(TCoinOrder::getCreateDate,start);
- query.lt(TCoinOrder::getCreateDate,end);
- List<TCoinOrder> list_es = tCoinOrderService.list(query);
- if (CollectionUtils.isNotEmpty(list_es)) {
- // insertBatch(list_es);
- if(list_es.size()>0){
- for(TCoinOrder coinOrder : list_es){
- try {
- updateDataById(coinOrder);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
- }
|