package com.szwl.service.es;

import cn.com.crbank.ommo.esclient.EsBaseService;
import cn.com.crbank.ommo.exception.MyException;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.szwl.model.entity.TCoinOrder;
import com.szwl.model.query.TCoinOrderParam;
import com.szwl.model.utils.DateUtils;
import com.szwl.service.TCoinOrderService;
import lombok.extern.slf4j.Slf4j;
//import org.apache.commons.collections4.CollectionUtils;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.List;

/**
 * Elasticsearch-backed mirror of {@link TCoinOrder} rows stored in the
 * {@code es_t_coin_order} index.
 *
 * <p>Provides index initialisation with a one-off backfill from the database,
 * a re-sync of an arbitrary time window, and a re-sync of the last hour.
 *
 * <p>NOTE(review): the {@code <TCoinOrder>} type argument on the superclass is
 * inferred from the overridden method signatures in this class; confirm it
 * against the declaration of {@code EsBaseService}.
 */
@Slf4j
@Service
public class EsTCoinOrderService extends EsBaseService<TCoinOrder> {

    /** Whether the ES index initialisation completed normally. */
    private static boolean InitEsTableStatus = false;

    /** Page size used when backfilling the index from the database. */
    public static final int MAX_ROW = 10000;

    /** Timestamp pattern used for the backfill cut-off. */
    private static final String DATE_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /** Only orders created strictly after this instant are backfilled. */
    private static final String BACKFILL_START = "2022-06-01 00:00:00";

    @Autowired
    TCoinOrderService tCoinOrderService;

    /** @return the name of the ES index managed by this service */
    @Override
    public String getTableName() {
        return "es_t_coin_order";
    }

    /** @return a new, empty {@link TCoinOrder} */
    @Override
    public TCoinOrder getInstanceOfEntity() {
        return new TCoinOrder();
    }

    /**
     * Records whether index initialisation has completed.
     *
     * @param flag the new initialisation status
     */
    @Override
    public void setInitTableStatus(boolean flag) {
        InitEsTableStatus = flag;
    }

    /** @return the initialisation status last stored for this index */
    @Override
    public boolean getInitTableStatus() {
        return InitEsTableStatus;
    }

    /**
     * @param tCoinOrder the entity whose key is wanted
     * @return the entity id rendered as a string
     */
    @Override
    public String getEntityPrimaryKey(TCoinOrder tCoinOrder) {
        return String.valueOf(tCoinOrder.getId());
    }

    /**
     * Does not apply {@code value} to the entity and always returns {@code null}.
     *
     * <p>NOTE(review): confirm that nothing in the base class dereferences the
     * returned value.
     *
     * @param tCoinOrder the entity (unused)
     * @param value      the primary key value (unused)
     * @return always {@code null}
     */
    @Override
    public TCoinOrder setEntityPrimaryKey(TCoinOrder tCoinOrder, String value) {
        return null;
    }

    /**
     * Creates the ES index if it does not exist yet and backfills it from the
     * database, then marks initialisation as complete.
     *
     * @throws MyException if checking, creating, or backfilling the index fails
     */
    @Override
    public void initTableFun() {
        try {
            String tableName = getTableName();
            GetIndexRequest request = new GetIndexRequest(tableName);
            boolean isExists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
            if (!isExists) {
                log.info("es 索引 开始创建{}", tableName);
                createTable();
                backfillExistingOrders();
            } else {
                log.info("es 索引 {} 已存在不再创建", tableName);
            }
            InitEsTableStatus = true;
        } catch (Exception e) {
            // Pass the message for the placeholder and the exception last so the stack trace is logged.
            log.error("ElasticsearchRunner InitEsTCoinOrderThread 发生错误:{}", e.getMessage(), e);
            throw new MyException("ElasticsearchRunner InitEsTCoinOrderThread 发生错误:" + e.getMessage());
        }
    }

    /**
     * Copies every order created after {@link #BACKFILL_START} from the database
     * into the index, {@link #MAX_ROW} rows at a time.
     */
    private void backfillExistingOrders() throws Exception {
        Date since = new SimpleDateFormat(DATE_TIME_PATTERN).parse(BACKFILL_START);

        LambdaQueryWrapper<TCoinOrder> query = Wrappers.lambdaQuery();
        query.gt(TCoinOrder::getCreateDate, since);
        // A fixed order keeps consecutive pages from overlapping or skipping rows.
        query.orderByAsc(TCoinOrder::getId);

        // MyBatis-Plus page numbers are 1-based; starting at 0 would read the first page twice.
        long current = 1;
        while (true) {
            // The total row count is never read, so the COUNT query is skipped.
            Page<TCoinOrder> page = new Page<>(current, MAX_ROW, false);
            IPage<TCoinOrder> result = tCoinOrderService.page(page, query);
            List<TCoinOrder> records = result.getRecords();
            if (CollectionUtils.isNotEmpty(records)) {
                insertBatch(records);
            }
            // A short (or empty) page means there is nothing left to read.
            if (records == null || records.size() < MAX_ROW) {
                break;
            }
            current++;
        }
    }

    /**
     * Re-synchronises the index for the given creation-time window: the ES
     * documents found for the window are deleted and the matching database rows
     * are inserted again.
     *
     * @param startTime window start; {@code /} separators are accepted and
     *                  rewritten to {@code -} before parsing
     * @param endTime   window end, same format as {@code startTime}
     * @throws MyException if any step of the re-sync fails
     */
    public void updateEsByDate(String startTime, String endTime) {
        try {
            startTime = startTime.replace("/", "-");
            endTime = endTime.replace("/", "-");
            Date start = DateUtils.parseDate(startTime, DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss, new Date());
            Date end = DateUtils.parseDate(endTime, DateUtils.PATTERN_yyyy_MM_dd_HH_mm_ss, new Date());

            // Load the replacement rows first so that a database failure
            // does not leave the window already deleted from the index.
            LambdaQueryWrapper<TCoinOrder> query = Wrappers.lambdaQuery();
            query.gt(TCoinOrder::getCreateDate, start);
            query.lt(TCoinOrder::getCreateDate, end);
            List<TCoinOrder> list = tCoinOrderService.list(query);

            // Find the documents currently in ES for the window and delete them.
            TCoinOrderParam param = new TCoinOrderParam();
            param.setCreateDate_start(start);
            param.setCreateDate_end(end);
            List<TCoinOrder> list_es = this.selectEntityByEqualToOption(param);
            if (list_es != null) {
                for (TCoinOrder entity : list_es) {
                    this.deleteTableById(String.valueOf(entity.getId()));
                }
            }

            // Insert the fresh copies.
            if (CollectionUtils.isNotEmpty(list)) {
                insertBatch(list);
            }
        } catch (Exception e) {
            log.error("{} updateEsByDate 发生错误:{}", this.getTableName(), e.getMessage(), e);
            throw new MyException(this.getTableName() + " updateEsByDate 发生错误:" + e.getMessage());
        }
    }

    /**
     * Pushes every order created during the last hour to the index, one document
     * at a time. A failure on one order is logged and does not stop the rest.
     *
     * <p>NOTE(review): both window bounds are exclusive; confirm that rows
     * stamped exactly on a boundary second cannot be missed between runs.
     *
     * @throws ParseException never thrown by the current implementation; kept in
     *                        the signature so existing callers still compile
     */
    public void tongbuByHour() throws ParseException {
        Calendar calendar = Calendar.getInstance();
        // Whole-second precision, matching the yyyy-MM-dd HH:mm:ss granularity used elsewhere.
        calendar.set(Calendar.MILLISECOND, 0);
        Date end = calendar.getTime();
        calendar.add(Calendar.HOUR_OF_DAY, -1);
        Date start = calendar.getTime();

        LambdaQueryWrapper<TCoinOrder> query = Wrappers.lambdaQuery();
        query.gt(TCoinOrder::getCreateDate, start);
        query.lt(TCoinOrder::getCreateDate, end);
        List<TCoinOrder> list_es = tCoinOrderService.list(query);
        if (CollectionUtils.isEmpty(list_es)) {
            return;
        }

        for (TCoinOrder coinOrder : list_es) {
            try {
                updateDataById(coinOrder);
            } catch (Exception e) {
                // Best effort: keep going with the remaining orders.
                log.error("{} tongbuByHour failed to update id={}", getTableName(), coinOrder.getId(), e);
            }
        }
    }
}