提交 ebbca44a 作者: ZhangJingKun

Merge remote-tracking branch 'origin/master'

...@@ -94,6 +94,20 @@ ...@@ -94,6 +94,20 @@
<artifactId>fastjson</artifactId> <artifactId>fastjson</artifactId>
<version>1.2.83</version> <version>1.2.83</version>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.3.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.0</version>
</dependency>
</dependencies> </dependencies>
......
package com.zzsn.leaderbase; package com.zzsn.leaderbase;
import com.zzsn.leaderbase.config.TaskExecutor;
import org.mybatis.spring.annotation.MapperScan; import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
@SpringBootApplication @SpringBootApplication
@MapperScan(value={"com.zzsn.leaderbase.mapper*"}) @EnableAsync
@MapperScan(value = {"com.zzsn.leaderbase.mapper*"})
public class LeaderBaseApplication { public class LeaderBaseApplication {
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(LeaderBaseApplication.class, args); SpringApplication.run(LeaderBaseApplication.class, args);
} }
@Bean
public TaskExecutor schedulerRunner() {
return new TaskExecutor();
}
} }
package com.zzsn.leaderbase.config;
import lombok.Data;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@ConfigurationProperties(prefix = "es1")
@Configuration
@Data
public class Elasticsearch1Config {
private String endpoint1;
private Integer endpoint1port;
private String endpoint2;
private Integer endpoint2port;
private String endpoint3;
private Integer endpoint3port;
private String username;
private String password;
@Bean
@Primary
public RestHighLevelClient elasticsearch1Client() {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
RestClientBuilder builder = RestClient.builder(
new HttpHost(endpoint1, endpoint1port, "http"),
new HttpHost(endpoint2, endpoint2port, "http"),
new HttpHost(endpoint3, endpoint3port, "http"))
.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
// 设置连接超时时间和套接字超时时间
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
requestConfigBuilder.setConnectTimeout(300000); // 连接超时时间为300秒
requestConfigBuilder.setSocketTimeout(300000); // 套接字超时时间为300秒
httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
return httpClientBuilder;
});
return new RestHighLevelClient(builder);
}
// @Bean
// public RestHighLevelClient elasticsearch1Client() {
// final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
//
// credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
//
// RestClientBuilder builder = RestClient.builder(HttpHost.create(endpoints))
// .setHttpClientConfigCallback(httpClientBuilder -> {
// httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
// return httpClientBuilder;
// });
//
// return new RestHighLevelClient(builder);
// }
// @Bean
// public RestHighLevelClient elasticsearch1Client() {
// RestHighLevelClient client = new RestHighLevelClient(
// RestClient.builder(
// new HttpHost("",9200,"http"),
// new HttpHost("",9200,"http")
// )
// );
// return client;
//
// }
}
package com.zzsn.leaderbase.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* 开启缓存支持
* @author zyf
* @Return:
*/
@Slf4j
@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
/**
* RedisTemplate配置
* @param lettuceConnectionFactory
* @return
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
log.info(" --- redis config init --- ");
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =jacksonSerializer();
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
RedisSerializer<?> stringSerializer = new StringRedisSerializer();
// key序列化
redisTemplate.setKeySerializer(stringSerializer);
// value序列化
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
// Hash key序列化
redisTemplate.setHashKeySerializer(stringSerializer);
// Hash value序列化
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
private Jackson2JsonRedisSerializer jacksonSerializer() {
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
return jackson2JsonRedisSerializer;
}
}
package com.zzsn.leaderbase.config;
import com.zzsn.leaderbase.service.DealLeaderDataService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class TaskExecutor implements CommandLineRunner {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
private final Integer PERIOD=100;
@Autowired
private DealLeaderDataService dealLeaderDataService;
@Override
public void run(String... args)throws Exception {
scheduledExecutorService.scheduleAtFixedRate(()->{
try {
dealLeaderDataService.getData();
} catch (IOException e) {
e.printStackTrace();
}
},5,PERIOD, TimeUnit.SECONDS);
log.info("简易版定时任务启动成功!{}{}执行一次",PERIOD,TimeUnit.SECONDS);
}
}
...@@ -49,4 +49,5 @@ public interface CommonConstant { ...@@ -49,4 +49,5 @@ public interface CommonConstant {
//处理后的专题资讯信息存储索引。 //处理后的专题资讯信息存储索引。
public final static String ES_DATA_FOR_SUBJECT = "subjectdatabase"; public final static String ES_DATA_FOR_SUBJECT = "subjectdatabase";
String LAST_TIME ="lastTime" ;
} }
package com.zzsn.leaderbase.service;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.zzsn.leaderbase.constant.CommonConstant;
import com.zzsn.leaderbase.util.EsDateUtil;
import com.zzsn.leaderbase.util.EsUtil;
import com.zzsn.leaderbase.util.HttpUtil;
import com.zzsn.leaderbase.vo.InfoExtractionParam;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
@Slf4j
public class DealLeaderDataService {
@Autowired
private EsUtil esUtil;
@Value("${python.getInfoUrl}")
private String relationEntityUrl;
@Autowired
RedisTemplate redisTemplate;
public void getData() throws IOException {
log.info("获取数据");
Object object = redisTemplate.opsForValue().get(CommonConstant.LAST_TIME);
String startTime = null;
if (object != null) {
startTime = object.toString();
} else {
startTime = "2023-01-01 12:12:12";
}
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder1 = QueryBuilders.boolQuery()
.must(QueryBuilders.termsQuery("subjectId", Arrays.asList("1750716233967157250", "1750716067187048450")));
boolQueryBuilder1.filter(QueryBuilders.rangeQuery("createDate").gte(EsDateUtil.esFieldDateFormat(startTime)));
searchSourceBuilder.query(boolQueryBuilder1);
searchSourceBuilder.sort("createDate", SortOrder.DESC);
Integer pagesize = 1;
for (int i = 1; true; i++) {
Page<InfoExtractionParam> subjectdatabase = esUtil.queryPage("subjectdatabase", searchSourceBuilder, InfoExtractionParam.class, i, pagesize);
List<InfoExtractionParam> records = subjectdatabase.getRecords();
log.info("页码:{},总页:{},总数量{}", i, subjectdatabase.getPages(), subjectdatabase.getTotal());
if (CollectionUtil.isNotEmpty(records)) {
JSONObject jsonObjectParam = new JSONObject();
Map<String, List<InfoExtractionParam>> map = new HashMap<>();
map.put("data_list", records);
jsonObjectParam.put("data", map);
String result = HttpUtil.doPost(relationEntityUrl, jsonObjectParam, 10000);
Object objectResult = parseResult(result);
JSONObject dataResult = null;
if (null != objectResult) {
dataResult = JSONObject.parseObject(objectResult.toString());
}
if (dataResult != null) {
log.info(String.valueOf(dataResult));
}
} else {
log.info("此轮数据处理完毕============================================");
break;
}
}
}
private Object parseResult(String result) {
JSONObject jsonObject = JSONObject.parseObject(result);
if (null != jsonObject && "200".equals(jsonObject.getString("code"))) {
return jsonObject.get("result");
}
log.error("python 服务结果异常,响应信息{}", result);
return null;
}
}
package com.zzsn.leaderbase.util;
/**
* Es对日期类型处理
* @author kongliufeng
* @create 2020-08-07 14:05
*/
public class EsDateUtil {
/**
* yyyy-MM-dd HH:mm:ss ->yyyy-MM-ddTHH:mm:ss
* @param data
* @return
*/
public static String esFieldDateFormat(String data) {
if (data == null)
return data;
if (data.length() == 19) {//标准yyyy-MM-dd HH:mm:ss
return data.replace(" ", "T");
} else if (data.length() == 10) {//yyyy-MM-dd
return data;
}
return null;
}
/**
* yyyy-MM-dd HH:mm:ss ->yyyy-MM-ddTHH:mm:ss
* @param data
* @return
*/
public static String esFieldDateMapping(String data) {
if (data == null)
return data;
if (data.length() == 19) {//标准yyyy-MM-dd HH:mm:ss
return data.replace("T", " ");
} else if (data.length() == 10) {//yyyy-MM-dd
return data;
}
return null;
}
}
package com.zzsn.leaderbase.util;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.zzsn.leaderbase.vo.SubjectInfoVo;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.MainResponse;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.lang.reflect.Field;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
/**
* Description: es数据操作工具类
* Author: EDY
* Date: 2023/7/28
*/
@Component
@Slf4j
public class EsUtil {
/*es客户端*/
@Autowired
@Qualifier("elasticsearch1Client")
RestHighLevelClient client;
/**
* 获取节点相关信息
*
* @return
*/
public Map<String, Object> getEsInfo() {
try {
Map<String, Object> map = new HashMap<>();
//获取Es相关集群信息
MainResponse response = client.info(RequestOptions.DEFAULT);
String clusterName = response.getClusterName();
String clusterUuid = response.getClusterUuid();
String nodeName = response.getNodeName();
MainResponse.Version version = response.getVersion();
String buildDate = version.getBuildDate();
String buildFlavor = version.getBuildFlavor();
String buildHash = version.getBuildHash();
String buildType = version.getBuildType();
String luceneVersion = version.getLuceneVersion();
String minimumIndexCompatibilityVersion = version.getMinimumIndexCompatibilityVersion();
String minimumWireCompatibilityVersion = version.getMinimumWireCompatibilityVersion();
String number = version.getNumber();
map.put("clusterName", clusterName);
map.put("clusterUuid", clusterUuid);
map.put("nodeName", nodeName);
map.put("version", version);
map.put("buildDate", buildDate);
map.put("buildFlavor", buildFlavor);
map.put("buildHash", buildHash);
map.put("buildType", buildType);
map.put("luceneVersion", luceneVersion);
map.put("minimumIndexCompatibilityVersion", minimumIndexCompatibilityVersion);
map.put("minimumWireCompatibilityVersion", minimumWireCompatibilityVersion);
map.put("number", number);
return map;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
/**
* 获取低级客户端
*
* @return
*/
public RestClient getLowLevelClient() {
return client.getLowLevelClient();
}
/**
* 创建索引
*
* @param index
* @return
*/
public boolean indexCreate(String index) {
try {
if (!indexExist(index)) {
CreateIndexRequest request = new CreateIndexRequest(index);
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
log.info(createIndexResponse.isAcknowledged() ? "创建索引[{}]成功" : "创建索引[{}]失败", index);
return createIndexResponse.isAcknowledged();
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
/**
* 判断索引是否存在
*
* @param indices
* @return
*/
@SneakyThrows
public boolean indexExist(String... indices) {
GetIndexRequest request = new GetIndexRequest(indices);
return client.indices().exists(request, RequestOptions.DEFAULT);
}
/**
* 删除索引
*
* @param index
* @return
*/
@SneakyThrows
public boolean deleteIndex(String index) {
DeleteIndexRequest request = new DeleteIndexRequest(index);
AcknowledgedResponse delete = client.indices().delete(request, RequestOptions.DEFAULT);
return delete.isAcknowledged();
}
/**
* 判断doc是否存在
*
* @param index
* @param id
* @return
*/
@SneakyThrows
public boolean docExists(String index, String id) {
GetRequest request = new GetRequest(index, id);
//禁用提取_source
request.fetchSourceContext(new FetchSourceContext(false));
//禁用获取存储的字段
request.storedFields("_none_");
boolean exists = client.exists(request, RequestOptions.DEFAULT);
return exists;
}
/**
* 插入数据
*
* @param index
* @param id
* @param object
* @return
*/
public String docSaveByEntity(String index, String id, Object object) {
return docSaveByJson(index, id, JSON.toJSONString(object, SerializerFeature.WriteMapNullValue));
}
public void docSaveByEntityAsync(String index, String id, Object object) {
docSaveByJsonAsync(index, id, JSON.toJSONString(object, SerializerFeature.WriteMapNullValue));
}
public String docSaveByMap(String index, String id, Map<String, Object> map) {
try {
IndexRequest request = new IndexRequest(index).id(id)
.source(map);
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
return indexResponse.getId();
} catch (IOException e) {
e.printStackTrace();
}
return index;
}
/**
* 批量插入数据
*
* @param index 索引库
* @param list List<ESBaseData> 批量保存的list,根据实际需求实体集成ESBaseData
*/
public <T> void docSaveBulkany(String index, List<T> list) {
BulkRequest request = new BulkRequest();
request.timeout(TimeValue.timeValueMinutes(10));
for (int i = 0; i < list.size(); i++) {
T b = list.get(i);
String id = null;
try {
Field field = b.getClass().getDeclaredField("id");
field.setAccessible(true);
id = (String)field.get(b);
} catch (NoSuchFieldException e) {
log.info("实体没有id字段");
continue;
} catch (IllegalAccessException e) {
log.info("无权限访问id字段");
continue;
}
request.add(new IndexRequest(index).id(id).source(
JSON.toJSONString(list.get(i)), XContentType.JSON
));
}
try {
BulkResponse bulk = client.bulk(request, RequestOptions.DEFAULT);
BulkItemResponse[] bulkItemResponses = bulk.getItems();
int length = bulkItemResponses.length;
for (int i = 0; i < length; ++i) {
BulkItemResponse response = bulkItemResponses[i];
if (response.isFailed()) {
log.info("批量保存[{}]过程中,id为[{}]的保存失败,失败原因[{}]", response.getIndex(), response.getId(), response.getFailureMessage());
} else {
// log.info("批量保存[{}]过程中,id为[{}]的保存成功,状态[{}],version[{}]", response.getIndex(), response.getId(), response.status(), response.getVersion());
}
}
} catch (IOException e) {
e.printStackTrace();
log.warn("批量[{}]保存失败", index);
}
}
/**
* 批量插入数据(异步)
* @param index 索引库
* @param list List<ESBaseData> 批量保存的list,根据实际需求实体集成ESBaseData
*/
public <T> void docSaveBulkAsync(String index,List<T> list){
BulkRequest request = new BulkRequest();
request.timeout(TimeValue.timeValueMinutes(10));
for (int i = 0; i < list.size(); i++) {
T b = list.get(i);
String id = null;
try {
Field field = b.getClass().getDeclaredField("id");
field.setAccessible(true);
id = (String)field.get(b);
} catch (NoSuchFieldException e) {
log.info("实体没有id字段");
continue;
} catch (IllegalAccessException e) {
log.info("无权限访问id字段");
continue;
}
request.add(new IndexRequest(index).id(id).source(
JSON.toJSONString(list.get(i)), XContentType.JSON
));
}
client.bulkAsync(request, RequestOptions.DEFAULT, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
BulkItemResponse[] bulkItems = bulkItemResponses.getItems();
int length = bulkItems.length;
for(int i = 0; i < length; ++i) {
BulkItemResponse response = bulkItems[i];
if (response.isFailed()) {//查看所有请求失败结果
log.info("批量保存[{}]过程中,id为[{}]的保存失败,失败原因[{}]",response.getIndex(),response.getId(),response.getFailureMessage());
}else{//请求成功的
log.info("批量保存[{}]过程中,id为[{}]的保存成功,状态[{}],version[{}]",response.getIndex(),response.getId(),response.status(),response.getVersion());
}
}
}
@Override
public void onFailure(Exception e) {
log.warn("批量[{}]保存失败,失败原因[{}]",index,e.getMessage());
}
});
}
/**
* 批量更新操作,根据指定的查询条件和字段信息,更新符合条件的文档的指定字段值。
* 方法名:updataBatchByQuery,表示批量更新文档的方法。
* 参数:index,指定要更新的索引名称。
* 参数:boolQuery,BoolQueryBuilder对象,表示更新文档的查询条件。
* 参数:modifyColum,表示要更新的字段名。
* 参数:modifyColumValue,表示要更新的字段值。
* 方法抛出了IOException和InterruptedException异常。
* 创建了一个UpdateByQueryRequest对象,传入了要更新的索引名称。
* 设置查询条件,使用setQuery方法,传入BoolQueryBuilder对象。
* 设置更新脚本,使用setScript方法,传入Script对象。脚本使用painless语言,通过ctx._source.字段名 = 字段值的方式来更新文档的指定字段值。
* 调用client.updateByQuery方法,传入UpdateByQueryRequest对象和默认的RequestOptions,执行批量更新操作,并返回BulkByScrollResponse对象。
* */
public void updataBatchByQuery(String index, BoolQueryBuilder boolQuery, String modifyColum, String modifyColumValue) throws IOException, InterruptedException {
UpdateByQueryRequest request = new UpdateByQueryRequest(index);
request.setQuery(boolQuery);
request.setScript(new Script(ScriptType.INLINE, "painless", "ctx._source."+modifyColum+" = '"+modifyColumValue+"'", Collections.emptyMap()));
BulkByScrollResponse response = client.updateByQuery(request, RequestOptions.DEFAULT);
long updated = response.getUpdated();
log.info("更新条数{}",updated);
}
/**
* 批量更新操作,根据指定的查询条件和多个字段的映射关系,更新符合条件的文档的多个字段的值。
* 方法名:updataMoreColumBatchByQuery,表示批量更新文档的多个字段的方法。
* 参数:index,指定要更新的索引名称。
* 参数:boolQuery,BoolQueryBuilder对象,表示更新文档的查询条件。
* 参数:modifyColum_value,表示要更新的多个字段名和对应的字段值的映射关系。
* 方法抛出了IOException和InterruptedException异常。
* 创建了一个UpdateByQueryRequest对象,传入了要更新的索引名称。
* 设置查询条件,使用setQuery方法,传入BoolQueryBuilder对象。
* 调用getIdOrCode方法,传入多个字段名和字段值的映射关系,获取更新脚本。
* 设置更新脚本,使用setScript方法,传入Script对象。脚本使用painless语言,通过ctx._source.字段名 = 字段值的方式来更新文档的多个字段的值。
* 调用client.updateByQuery方法,传入UpdateByQueryRequest对象和默认的RequestOptions,执行批量更新操作,并返回BulkByScrollResponse对象。
* Collections.emptyMap()是setScript脚本的参数,此方法没有设置其他参数,使用一个空的map
* */
public void updataMoreColumBatchByQuery(String index, BoolQueryBuilder boolQuery, Map<String,String> modifyColum_value) throws IOException, InterruptedException {
UpdateByQueryRequest request = new UpdateByQueryRequest(index);
request.setQuery(boolQuery);
request.setScript(new Script(ScriptType.INLINE, "painless", getIdOrCode(modifyColum_value), Collections.emptyMap()));
BulkByScrollResponse response = client.updateByQuery(request, RequestOptions.DEFAULT);
long updated = response.getUpdated();
log.info("更新条数{}",updated);
}
private String getIdOrCode(Map<String,String> colum_value){
StringBuffer script = new StringBuffer();
colum_value.forEach((colum,value)->{
script.append("ctx._source."+colum+" = '"+value+"';");
});
return script.toString();
}
/**
* 这段代码是一个用于获取Elasticsearch索引中某个字段的最大值的方法。
* getColumMax方法:
* 参数:index表示索引名称,boolQuery表示查询条件,colum表示要获取最大值的字段名。
* 创建一个搜索请求SearchRequest,并将index作为参数传入。
* 创建一个搜索源构建器SearchSourceBuilder,并设置其大小为0(即只返回聚合结果,不返回具体文档)。
* 如果boolQuery不为空,则将其设置为搜索源构建器的查询条件。
* 创建一个TermsAggregationBuilder聚合器,用于按照字段colum进行聚合。
* 设置聚合器的大小为1,表示只返回一个聚合结果。
* 设置聚合器的排序方式为按照聚合桶的键值降序排序。
* 将聚合器添加到搜索源构建器中。
* 将搜索源构建器设置为搜索请求的源。
* 使用client.search方法执行搜索请求,返回一个SearchResponse对象。
* 从SearchResponse中获取聚合结果Aggregations。
* 根据聚合结果中的聚合器名称"groupByColum"获取到对应的Terms聚合器。
* 从Terms聚合器中获取聚合桶codeBuckets,即按照字段colum聚合后的结果。
* 如果聚合桶不为空,则返回第一个聚合桶的键值转换为字符串;否则返回null。
* 如果执行搜索请求过程中发生异常,则打印异常堆栈并返回null。
* 该方法的作用是执行一个查询请求,按照指定字段进行聚合,并获取聚合结果中的最大值。返回的最大值是一个字符串类型。
* */
public String getColumMax(String index, BoolQueryBuilder boolQuery,String colum){
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.size(0);
if (ObjectUtil.isNotEmpty(boolQuery)){
searchSourceBuilder.query(boolQuery);
}
TermsAggregationBuilder aggregation = AggregationBuilders.terms("groupByColum")
.field(colum)
.size(1)
.order(BucketOrder.key(false));
searchSourceBuilder.aggregation(aggregation);
searchRequest.source(searchSourceBuilder);
try {
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
Aggregations aggregations = response.getAggregations();
Terms groupCode = aggregations.get("groupByColum");
List<? extends Terms.Bucket> codeBuckets = groupCode.getBuckets();
return CollectionUtil.isNotEmpty(codeBuckets)? codeBuckets.get(0).getKeyAsString() : null;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 根据文档id更新
* 参数:index表示索引名称,id表示文档的id,args表示要更新的字段和对应的值。
* 首先判断args是否为空,如果为空则直接返回。
* 创建一个更新请求UpdateRequest,并将index和id作为参数传入。
* 使用XContentBuilder构建要更新的内容,将args中的字段和值添加到contentBuilder中。
* 将contentBuilder设置为更新请求的内容。
* 使用client.update方法执行更新请求,返回一个UpdateResponse对象。
* 根据UpdateResponse的结果进行处理,如果更新成功,则打印日志;如果没有进行任何更改,则打印日志;如果更新失败,则打印日志。*/
public void updateById(String index,String id , Map<String,Object> args) throws IOException {
if (CollectionUtil.isEmpty(args)){
return;
}
// 执行更新请求
UpdateResponse response = client.update(createUpdateRequest(index,id,args), RequestOptions.DEFAULT);
// 处理更新的响应结果
if (response.getResult() == DocWriteResponse.Result.UPDATED) {
// log.info("{}修改操作处理成功",id);
} else if (response.getResult() == DocWriteResponse.Result.NOOP) {
// log.info("{}没有进行任何更改",id);
} else {
log.info("{}更新失败eeeeeeeeeeeeeeeeeeeeeeeee",id);
}
}
/**整体更新*/
public boolean updateById(String index, String id, JSONObject jsonStr) {
UpdateRequest request = new UpdateRequest(index, id);
//刷新策略,默认
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
request.setRefreshPolicy("true");
request.doc(jsonStr, XContentType.JSON);
try {
UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
return response.status() == RestStatus.OK;
} catch (IOException e) {
e.printStackTrace();
log.warn("更新doc失败, _index=[{}], _id=[{}],_jsonStr=[{}]", index, id, jsonStr);
}
return false;
}
/**
* 根据id更新,批量更新
* 参数:index表示索引名称,batch表示要批量更新的文档,其中batch是一个Map,key为文档的id,value为要更新的字段和对应的值。
* 首先判断batch是否为空,如果为空则直接返回。
* 创建一个批量请求BulkRequest。
* 遍历batch中的每个文档,将每个文档的id和要更新的字段和值创建一个更新请求UpdateRequest,并将其添加到bulkRequest中。
* 使用client.bulk方法执行批量更新请求,返回一个BulkResponse对象。
* 根据BulkResponse的结果进行处理,如果有更新失败的情况,则打印日志;如果全部更新成功,则打印日志。
* */
public void bulkUpdateDocuments(String index, Map<String , Map<String,Object> > batch) throws IOException {
if(CollectionUtil.isEmpty(batch)){
return;
}
BulkRequest bulkRequest = new BulkRequest();
// 添加批量更新的请求
batch.forEach((id,args)->{
try {
bulkRequest.add(createUpdateRequest(index,id,args));
} catch (IOException e) {
log.info("添加更新请求异常");
}
});
// 添加更多的更新请求
// 执行批量更新请求
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
// 处理批量更新的响应结果
if (bulkResponse.hasFailures()) {
// 处理失败的情况
log.info("批量更新失败{}",batch);
} else {
// 处理成功的情况
log.info("批量更新成功{}",batch);
}
}
/**
* 创建更新请求对象
* 参数:index表示索引名称,documentId表示文档的id,args表示要更新的字段和对应的值。
* 创建一个更新请求UpdateRequest,并将index和documentId作为参数传入。
* 使用XContentBuilder构建要更新的内容,将args中的字段和值添加到contentBuilder中。
* 将contentBuilder设置为更新请求的内容。
* 返回更新请求UpdateRequest对象。*/
private UpdateRequest createUpdateRequest(String index,String documentId, Map<String,Object> args) throws IOException {
UpdateRequest request = new UpdateRequest(index, documentId);
// 创建要更新的内容
XContentBuilder contentBuilder = XContentFactory.jsonBuilder();
contentBuilder.startObject();
args.forEach((fieldName,fieldValue) ->
{
try {
contentBuilder.field(fieldName, fieldValue);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
);
contentBuilder.endObject();
// 设置更新请求的内容
request.doc(contentBuilder);
return request;
}
/**
* 根据id删除doc
*
* @param index
* @param id
* @return
*/
public Boolean delById(String index, String id) {
try {
DeleteRequest deleteRequest = new DeleteRequest(index, id);
DeleteResponse delete = client.delete(deleteRequest, RequestOptions.DEFAULT);
if (delete.status() == RestStatus.OK) {
log.info("DELETE /{}/_doc/{}/\r\n", index, id);
return true;
}
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
public void delByIdAsync(String index, String id) {
DeleteRequest request = new DeleteRequest(index, id);
try {
client.deleteAsync(request, RequestOptions.DEFAULT, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
log.info("删除doc成功, _index=[{}], _id=[{}]", index, deleteResponse.getId());
}
@Override
public void onFailure(Exception e) {
e.printStackTrace();
log.warn("删除doc失败, _index=[{}], _id=[{}]", index, id);
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 根据id批量删除
*
* */
public void delByIds(String index,List<String> ids){
if(CollectionUtil.isEmpty(ids)){
return;
}
// 创建BulkRequest对象
BulkRequest bulkRequest = new BulkRequest();
ids.forEach(e -> {
// 添加多个DeleteRequest对象
bulkRequest.add(new DeleteRequest(index, e));
});
// 执行批量请求
try {
BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
// 处理批量更新的响应结果
if (bulk.hasFailures()) {
// 处理失败的情况
log.info("批量删除失败{}",ids);
} else {
// 处理成功的情况
log.info("批量删除成功{}",ids);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 根据id批量删除,List中的字符串需要满足 索引库名称-id格式,如 subjectdatabase2024==76578897678687656785
*
* */
public void delByIds(List<String> index_id){
if(CollectionUtil.isEmpty(index_id)){
return;
}
// 创建BulkRequest对象
BulkRequest bulkRequest = new BulkRequest();
index_id.forEach(e -> {
String[] split = e.split("==");
String index = split[0];
String id = split[1];
// 添加多个DeleteRequest对象
bulkRequest.add(new DeleteRequest(index, id));
});
// 执行批量请求
try {
BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
// 处理批量更新的响应结果
if (bulk.hasFailures()) {
// 处理失败的情况
log.info("批量删除失败{}",index_id);
} else {
// 处理成功的情况
log.info("批量删除成功{}",index_id);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 根据条件删除
*
* @param index
* @param query
* @return
*/
public Long delByQuery(final String index, QueryBuilder query) {
try {
DeleteByQueryRequest request = new DeleteByQueryRequest(index);
request.setQuery(query).setRefresh(true);
BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(request, RequestOptions.DEFAULT);
return bulkByScrollResponse.getDeleted();
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
/**索引库所有数据分页查询**/
public <T> List<T> getAllPageList (String index,Integer page, Integer pagesize, Class<T> entry) throws Exception {
List<T> list = new ArrayList<>();
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
//创建查询对象
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
//排序createDate
searchSourceBuilder.sort("publishDate", SortOrder.ASC);
searchSourceBuilder.size(pagesize);
searchSourceBuilder.from((page - 1)*pagesize);
searchRequest.source(searchSourceBuilder);
SearchResponse search = client.search(searchRequest, RequestOptions.DEFAULT);
SearchHit[] hits = search.getHits().getHits();
if (hits.length>0){
List<SearchHit> collect = Arrays.stream(hits).collect(Collectors.toList());
collect.forEach( e -> {
T t = JSON.parseObject(e.getSourceAsString(), entry);
list.add(t);
});
return list;
}
return list;
}
/**
* 根据index,id索引文件
*
* @param index
* @param id
* @return
*/
public <T> T getInfoByid(String index, String id, Class<T> entry) {
T res = null;
try {
GetRequest searchRequest = new GetRequest(index, id);
GetResponse documentFields = client.get(searchRequest, RequestOptions.DEFAULT);
res = JSON.parseObject(documentFields.getSourceAsString(), entry);
return res;
} catch (IOException e) {
log.warn("查询doc异常,index=[{}],id=[{}], ex=[{}]", index, id, e.getMessage());
}
return res;
}
/**
* 根据id查询各类资讯详情
* */
public <T> T getInfoByid1(String index, String id, Class<T> entry){
try {
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("id", id);
searchSourceBuilder.query(termQueryBuilder);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
SearchHit[] hits = searchResponse.getHits().getHits();
T res = null;
if (hits.length>0){
res = JSON.parseObject(hits[0].getSourceAsString(), entry);
}
return res;
} catch (IOException e) {
log.info("查询异常{}",e.getMessage(),e);
return null;
}
}
/**
* 通用查询,
* */
public <T> List<T> queryList(String index, QueryBuilder queryBuilder, Class<T> entry,int size) {
List<T> list = new ArrayList<>();
try {
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.size(size);
searchSourceBuilder.query(queryBuilder);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
// 处理搜索结果
SearchHit[] hits = searchResponse.getHits().getHits();
if (hits.length>0){
Arrays.stream(hits).forEach(e -> {
T t = JSON.parseObject(e.getSourceAsString(), entry);
list.add(t);
});
}
return list;
} catch (IOException e) {
log.info("查询异常{}",e.getMessage(),e);
return list;
}
}
/**
* 通用分页查询
* */
public <T> Page<T> queryPage(String index, QueryBuilder queryBuilder, Class<T> entry, Integer pageNo, Integer pageSize) {
List<T> list = new ArrayList<>();
Page<T> pageData = new Page<>(pageNo, pageSize);
try {
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
//设置分页参数
searchSourceBuilder.size(pageSize);
searchSourceBuilder.from((pageNo - 1) * pageSize);
//默认最大数量是10000,设置为true后,显示准确数量
searchSourceBuilder.sort("id", SortOrder.ASC);
searchSourceBuilder.trackTotalHits(true);
searchSourceBuilder.query(queryBuilder);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
// 处理搜索结果
SearchHit[] hits = searchResponse.getHits().getHits();
if (hits.length>0){
Arrays.stream(hits).forEach(e -> {
T t = JSON.parseObject(e.getSourceAsString(), entry);
list.add(t);
});
}
pageData.setTotal(searchResponse.getHits().getTotalHits().value);
pageData.setRecords(list);
return pageData;
} catch (IOException e) {
log.info("查询异常{}",e.getMessage(),e);
return pageData;
}
}
/**
* 通用分页查询
* */
public <T> Page<T> queryPage(String index, SearchSourceBuilder searchSourceBuilder, Class<T> entry, Integer pageNo, Integer pageSize) throws IOException {
List<T> list = new ArrayList<>();
Page<T> pageData = new Page<>(pageNo, pageSize);
SearchRequest searchRequest = new SearchRequest(index);
searchSourceBuilder.size(pageSize);
searchSourceBuilder.from((pageNo - 1) * pageSize);
//默认最大数量是10000,设置为true后,显示准确数量
searchSourceBuilder.trackTotalHits(true);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
// 处理搜索结果
SearchHit[] hits = searchResponse.getHits().getHits();
if (hits.length>0){
Arrays.stream(hits).forEach(e -> {
T t = JSON.parseObject(e.getSourceAsString(), entry);
list.add(t);
});
}
pageData.setTotal(searchResponse.getHits().getTotalHits().value);
pageData.setRecords(list);
return pageData;
}
/**
* 非聚合类查询,通用结果集封装
* */
public <T> List<T> queryPageBase(String index, Class<T> entry,SearchSourceBuilder searchSourceBuilder) throws IOException {
List<T> list = new ArrayList<>();
SearchRequest searchRequest = new SearchRequest(index);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
// 处理搜索结果
SearchHit[] hits = searchResponse.getHits().getHits();
if (hits.length>0){
Arrays.stream(hits).forEach(e -> {
T t = JSON.parseObject(e.getSourceAsString(), entry);
list.add(t);
});
}
return list;
}
/**
* 保存json
*
* @param index
* @param id
* @param jsonStr
* @return
*/
public String docSaveByJson(String index, String id, String jsonStr) {
try {
IndexRequest request = new IndexRequest(index)
.id(id)
.source(jsonStr, XContentType.JSON);
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
return indexResponse.getId();
} catch (IOException e) {
log.warn("同步保存doc失败, _index=[{}], _id=[{}]", index, id);
}
return index;
}
/**
* 异步创建doc
*
* @param index
* @param id
* @param jsonStr
* @return
*/
public void docSaveByJsonAsync(String index, String id, String jsonStr) {
IndexRequest request = new IndexRequest(index);
request.id(id);
request.source(jsonStr, XContentType.JSON);
client.indexAsync(request, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
log.info("异步保存doc, _index=[{}], _id=[{}]成功, _version=[{}], _result=[{}]", index, indexResponse.getId(), indexResponse.getVersion(), indexResponse.getResult());
}
@Override
public void onFailure(Exception e) {
e.printStackTrace();
log.warn("异步保存失败,尝试同步方式保存doc, ex=[{}]", e.getMessage());
try {
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
DocWriteResponse.Result result = response.getResult();
if (!(result == DocWriteResponse.Result.UPDATED || result == DocWriteResponse.Result.CREATED)) {
log.warn("同步保存doc失败,_index=[{}], _id=[{}], _body=[{}]", index, id, jsonStr);
}
} catch (IOException io) {
io.printStackTrace();
}
}
});
}
/**
* 根据索引名称获取带有当前年的索引名称 subjectdatabase --> subjectdatabase_2023
* @param index
* @return
*/
public String getIndexYear(String index){
return index + "_" + LocalDateTime.now().getYear();
}
/**
* 根据id更新,批量更新
* 参数:index表示索引名称,batch表示要批量更新的文档,其中batch是一个Map,key为文档的id,value为要更新的字段和对应的值。
* 首先判断batch是否为空,如果为空则直接返回。
* 创建一个批量请求BulkRequest。
* 遍历batch中的每个文档,将每个文档的id和要更新的字段和值创建一个更新请求UpdateRequest,并将其添加到bulkRequest中。
* 使用client.bulk方法执行批量更新请求,返回一个BulkResponse对象。
* 根据BulkResponse的结果进行处理,如果有更新失败的情况,则打印日志;如果全部更新成功,则打印日志。
* */
public void bulkUpdateDocuments(Map<String , Map<String,Object> > batch) throws IOException {
if(CollectionUtil.isEmpty(batch)){
return;
}
BulkRequest bulkRequest = new BulkRequest();
// 添加批量更新的请求
batch.forEach((idIndex,args)->{
String[] split = idIndex.split("-");
String id = split[0];
String index = split[1];
try {
bulkRequest.add(createUpdateRequest(index,id,args));
} catch (IOException e) {
log.info("添加更新请求异常");
}
});
// 添加更多的更新请求
// 执行批量更新请求
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
// 处理批量更新的响应结果
if (bulkResponse.hasFailures()) {
// 处理失败的情况
log.info("批量更新失败{}",batch);
} else {
// 处理成功的情况
log.info("批量更新成功{}",batch);
}
}
public boolean docUpdateById(String index, String id, String jsonStr) {
UpdateRequest request = new UpdateRequest(index, id);
//刷新策略,默认
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
request.setRefreshPolicy("true");
request.doc(jsonStr, XContentType.JSON);
try {
UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
return response.status() == RestStatus.OK;
} catch (IOException e) {
e.printStackTrace();
log.warn("更新doc失败, _index=[{}], _id=[{}],_jsonStr=[{}]", index, id, jsonStr);
}
return false;
}
public boolean docEditByEntity(String index, String id, Object object) {
return docUpdateById(index, id, JSON.toJSONString(object));
}
/**
* 通用分页查询
* */
public <T> Page<T> queryPage1(String index, QueryBuilder queryBuilder, Class<T> entry, Integer pageNo, Integer pageSize) {
List<SubjectInfoVo> list = new ArrayList<>();
Page<T> pageData = new Page<>(pageNo, pageSize);
try {
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
//设置分页参数
searchSourceBuilder.size(pageSize);
searchSourceBuilder.from((pageNo - 1) * pageSize);
//默认最大数量是10000,设置为true后,显示准确数量
searchSourceBuilder.sort("processDate", SortOrder.ASC);
searchSourceBuilder.trackTotalHits(true);
searchSourceBuilder.query(queryBuilder);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
// 处理搜索结果
SearchHit[] hits = searchResponse.getHits().getHits();
if (hits.length>0){
for (SearchHit hit : hits) {
SubjectInfoVo subjectInfoVo = JSON.parseObject(hit.getSourceAsString(), SubjectInfoVo.class);
String hitIndex = hit.getIndex();
subjectInfoVo.setIndex(hitIndex);
list.add(subjectInfoVo);
}
}
pageData.setTotal(searchResponse.getHits().getTotalHits().value);
pageData.setRecords((List<T>) list);
return pageData;
} catch (IOException e) {
log.info("查询异常{}",e.getMessage(),e);
return pageData;
}
}
}
package com.zzsn.leaderbase.util;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.*;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.TrustStrategy;
import org.apache.http.util.EntityUtils;
import org.springframework.util.CollectionUtils;
import javax.net.ssl.SSLContext;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.Charset;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.*;
/**
* @Description: Http工具类
* @Author: zhangshuo
* @Date: 2021-06-08
* @Version: V1.0
*/
public class HttpUtil {
private static final CloseableHttpClient httpClient;
private static final String CHARSET = "utf-8";
// 采用静态代码块,初始化超时时间配置,再根据配置生成默认httpClient对象
static {
RequestConfig config = RequestConfig.custom().setConnectTimeout(10000).setSocketTimeout(10000).build();
httpClient = HttpClientBuilder.create().setDefaultRequestConfig(config).build();
}
/**
* HTTP Get 获取内容
*
* @param url 请求的url地址
* @param params 请求的参数
* @param charset 编码格式
* @return 页面内容
*/
public static String doGet(String url, Map<String, String> params, String charset) {
if (StringUtils.isBlank(url)) {
return null;
}
try {
if (params != null && !params.isEmpty()) {
List<NameValuePair> pairs = new ArrayList<NameValuePair>(params.size());
for (Map.Entry<String, String> entry : params.entrySet()) {
String value = entry.getValue();
if (value != null) {
pairs.add(new BasicNameValuePair(entry.getKey(), value));
}
}
// 将请求参数和url进行拼接
url += "?" + EntityUtils.toString(new UrlEncodedFormEntity(pairs, charset));
}
HttpGet httpGet = new HttpGet(url);
CloseableHttpResponse response = httpClient.execute(httpGet);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != 200) {
httpGet.abort();
throw new RuntimeException("HttpClient,error status code :" + statusCode);
}
HttpEntity entity = response.getEntity();
String result = null;
if (entity != null) {
result = EntityUtils.toString(entity, "utf-8");
}
EntityUtils.consume(entity);
response.close();
return result;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* HTTP Post 获取内容
*
* @param url 请求的url地址
* @return 页面内容
* @throws IOException
*/
public static String doPost(String url, JSONObject jsonObject, int ExTime)
throws IOException {
HttpClient httpclient = HttpClients.createDefault();
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(ExTime).setConnectTimeout(ExTime).build();
HttpPost httpPost = new HttpPost(url);
httpPost.setHeader("Content-Type", "application/json;charset=UTF-8");
httpPost.setHeader("Accept", "application/json");
httpPost.setConfig(requestConfig);
StringEntity se = new StringEntity(jsonObject.toJSONString(), "utf-8");
se.setContentType("application/json");
httpPost.setEntity(se);
HttpResponse response = httpclient.execute(httpPost);
String result = EntityUtils.toString(response.getEntity());
return result;
}
public static String doPost(String url, String jsonString, int ExTime)
throws IOException {
HttpClient httpclient = HttpClients.createDefault();
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(ExTime).setConnectTimeout(ExTime).build();
HttpPost httpPost = new HttpPost(url);
httpPost.setHeader("Content-Type", "application/json;charset=UTF-8");
httpPost.setHeader("Accept", "application/json");
httpPost.setConfig(requestConfig);
StringEntity se = new StringEntity(jsonString, "utf-8");
se.setContentType("application/json");
httpPost.setEntity(se);
HttpResponse response = httpclient.execute(httpPost);
String result = EntityUtils.toString(response.getEntity());
return result;
}
/**
* HTTP Post 获取内容
*
* @param url 请求的url地址
* @return 页面内容
* @throws IOException
*/
public static String doPostWithHeader(String url, JSONObject jsonObject, int ExTime , Map<String, String> headers)
throws IOException {
HttpClient httpclient = HttpClients.createDefault();
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(ExTime).setConnectTimeout(ExTime).build();
HttpPost httpPost = new HttpPost(url);
if (!CollectionUtils.isEmpty(headers)) {
for (Map.Entry<String, String> entry : headers.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
httpPost.setHeader(key, value);
}
}
httpPost.setConfig(requestConfig);
StringEntity se = new StringEntity(jsonObject.toJSONString(), "utf-8");
se.setContentType("application/json");
httpPost.setEntity(se);
HttpResponse response = httpclient.execute(httpPost);
String result = EntityUtils.toString(response.getEntity());
return result;
}
/**
* HTTP Post 获取内容
*
* @param url 请求的url地址
* @return 页面内容
* @throws IOException
*/
public static String doPost(String url, JSONArray data, int ExTime)
throws IOException {
HttpClient httpclient = HttpClients.createDefault();
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(ExTime).setConnectTimeout(ExTime).build();
HttpPost httpPost = new HttpPost(url);
httpPost.setHeader("Content-Type", "application/json;charset=UTF-8");
httpPost.setHeader("Accept", "application/json");
httpPost.setConfig(requestConfig);
StringEntity se = new StringEntity(data.toJSONString(), "utf-8");
se.setContentType("application/json");
httpPost.setEntity(se);
HttpResponse response = httpclient.execute(httpPost);
String result = EntityUtils.toString(response.getEntity());
return result;
}
/**
* HTTPS Get 获取内容(无SSL证书验证)
*
* @param url 请求的url地址
* @param params 请求的参数
* @param charset 编码格式
* @return 页面内容
*/
public static CloseableHttpResponse doGetSSL(String url, Map<String, String> params, String charset) {
if (StringUtils.isBlank(url)) {
return null;
}
try {
if (params != null && !params.isEmpty()) {
List<NameValuePair> pairs = new ArrayList<NameValuePair>(params.size());
for (Map.Entry<String, String> entry : params.entrySet()) {
String value = entry.getValue();
if (value != null) {
pairs.add(new BasicNameValuePair(entry.getKey(), value));
}
}
url += "?" + EntityUtils.toString(new UrlEncodedFormEntity(pairs, charset));
}
HttpGet httpGet = new HttpGet(url);
httpGet.setHeader("Content-Type", "application/x-www-form-urlencoded;charset=utf-8");
httpGet.setHeader(HttpHeaders.CONNECTION, "close");
// https 注意这里获取https内容,使用了忽略证书的方式,当然还有其他的方式来获取https内容
CloseableHttpClient httpsClient = createSSLClientDefault();
CloseableHttpResponse response = httpsClient.execute(httpGet);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != 200) {
httpGet.abort();
throw new RuntimeException("HttpClient,error status code :" + statusCode);
}
return response;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 这里创建了忽略整数验证的CloseableHttpClient对象
*
* @return SSLClientDefault
*/
public static CloseableHttpClient createSSLClientDefault() {
try {
SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {
// 信任所有
public boolean isTrusted(X509Certificate[] chain, String authType) throws CertificateException {
return true;
}
}).build();
SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext);
return HttpClients.custom().setSSLSocketFactory(sslsf).build();
} catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException e) {
e.printStackTrace();
}
return HttpClients.createDefault();
}
/**
* ip代理请求
*
* @throws Exception
*/
public static CloseableHttpResponse getProxyHttpClient(String url) {
//获取代理ip信息
//TODO
String proxyHost = "";
int proxyPort = 0;
String userName = "";
String password = "";
try {
//设置代理IP和端口并设置链接、传输时间
HttpHost proxy = new HttpHost(proxyHost, proxyPort);
RequestConfig config = RequestConfig.custom().setProxy(proxy).setConnectTimeout(60000).setSocketTimeout(60000).build();
//设置账号密码
CredentialsProvider provider = new BasicCredentialsProvider();
provider.setCredentials(new AuthScope(proxy), new UsernamePasswordCredentials(userName, password));
CloseableHttpClient httpClient = HttpClients.custom().setDefaultRequestConfig(config)
.setDefaultCredentialsProvider(provider)
.build();
HttpGet httpGet = new HttpGet(url);
httpGet.setHeader("Content-Type", "application/x-www-form-urlencoded;charset=utf-8");
httpGet.setHeader(HttpHeaders.CONNECTION, "close");
CloseableHttpResponse response = httpClient.execute(httpGet);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != 200) {
httpGet.abort();
throw new RuntimeException("HttpClient,error status code :" + statusCode);
}
return response;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public static String sendPost1(String url, String data, Map<String, String> header) {
String response = null;
try {
CloseableHttpClient httpclient = null;
CloseableHttpResponse httpresponse = null;
try {
httpclient = HttpClients.createDefault();
HttpPost method = new HttpPost(url);
StringEntity stringentity = new StringEntity(data, Charset.forName("UTF-8"));
stringentity.setContentEncoding("UTF-8");
for(Map.Entry<String, String> item : header.entrySet()){
method.setHeader(item.getKey(), item.getValue());
}
method.setEntity(stringentity);
httpresponse = httpclient.execute(method);
response = EntityUtils.toString(httpresponse.getEntity());
} finally {
if (httpclient != null) {
httpclient.close();
}
if (httpresponse != null) {
httpresponse.close();
}
}
} catch (Exception e) {
// throw new Exception("http link fail", e);
e.printStackTrace();
}
return response;
}
/**
*
* @param httpUrl 请求的url
* @param param form表单的参数(key,value形式)
* @return
*/
public static String doPostForm(String httpUrl, Map param, Integer expire) {
HttpURLConnection connection = null;
InputStream is = null;
OutputStream os = null;
BufferedReader br = null;
String result = null;
try {
URL url = new URL(httpUrl);
// 通过远程url连接对象打开连接
connection = (HttpURLConnection) url.openConnection();
// 设置连接请求方式
connection.setRequestMethod("POST");
// 设置连接主机服务器超时时间:15000毫秒
connection.setConnectTimeout(expire);
// 设置读取主机服务器返回数据超时时间:60000毫秒
connection.setReadTimeout(expire);
// 默认值为:false,当向远程服务器传送数据/写数据时,需要设置为true
connection.setDoOutput(true);
// 默认值为:true,当前向远程服务读取数据时,设置为true,该参数可有可无
connection.setDoInput(true);
// 设置传入参数的格式:请求参数应该是 name1=value1&name2=value2 的形式。
connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
// 设置鉴权信息:Authorization: Bearer da3efcbf-0845-4fe3-8aba-ee040be542c0
//connection.setRequestProperty("Authorization", "Bearer da3efcbf-0845-4fe3-8aba-ee040be542c0");
// 通过连接对象获取一个输出流
os = connection.getOutputStream();
// 通过输出流对象将参数写出去/传输出去,它是通过字节数组写出的(form表单形式的参数实质也是key,value值的拼接,类似于get请求参数的拼接)
os.write(createLinkString(param).getBytes());
// 通过连接对象获取一个输入流,向远程读取
if (connection.getResponseCode() == 200) {
is = connection.getInputStream();
// 对输入流对象进行包装:charset根据工作项目组的要求来设置
br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
StringBuffer sbf = new StringBuffer();
String temp = null;
// 循环遍历一行一行读取数据
while ((temp = br.readLine()) != null) {
sbf.append(temp);
sbf.append("\r\n");
}
result = sbf.toString();
}
} catch (MalformedURLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
// 关闭资源
if (null != br) {
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (null != os) {
try {
os.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (null != is) {
try {
is.close();
} catch (IOException e) {
e.printStackTrace();
}
}
// 断开与远程地址url的连接
connection.disconnect();
}
return result;
}
/**
* 把数组所有元素排序,并按照“参数=参数值”的模式用“&”字符拼接成字符串
* @param params 需要排序并参与字符拼接的参数组
* @return 拼接后字符串
*/
public static String createLinkString(Map<String, String> params) {
List<String> keys = new ArrayList<String>(params.keySet());
Collections.sort(keys);
StringBuilder prestr = new StringBuilder();
for (int i = 0; i < keys.size(); i++) {
String key = keys.get(i);
String value = params.get(key);
if (i == keys.size() - 1) {// 拼接时,不包括最后一个&字符
prestr.append(key).append("=").append(value);
} else {
prestr.append(key).append("=").append(value).append("&");
}
}
return prestr.toString();
}
public static String sendPost(String url, Map<String, Object> params, String charset, int ExTime) {
String content = "";
CloseableHttpClient httpClient = HttpClients.createDefault();
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(ExTime).setConnectTimeout(ExTime).build();
HttpPost httpPost = new HttpPost(url);
JSONObject jsonObject = new JSONObject();
// 通过map集成entrySet方法获取entity
Set<Map.Entry<String, Object>> entrySet = params.entrySet();
// 循环遍历,获取迭代器
for (Map.Entry<String, Object> mapEntry : entrySet) {
if(mapEntry.getValue()!=null){
jsonObject.put(mapEntry.getKey(), mapEntry.getValue());
}
}
try {
if (null != params) {
//解决中文问题。
httpPost.addHeader("Content-type","application/json; charset=utf-8");
httpPost.setHeader("Accept", "application/json");
//System.out.println("urlEncodedFormEntity:" + urlEncodedFormEntity);
httpPost.setEntity(new StringEntity(jsonObject.toString(),"UTF-8"));
httpPost.setConfig(requestConfig);
}
System.out.println("execurting request:" + httpPost.getURI());
HttpResponse httpResponse = null;
httpResponse = httpClient.execute(httpPost);
HttpEntity httpEntity = httpResponse.getEntity();
if (httpEntity != null) {
content = EntityUtils.toString(httpEntity, charset);
}
} catch (ClientProtocolException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
// 关闭连接,释放资源
try {
httpClient.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return content;
}
}
package com.zzsn.leaderbase.vo;
import lombok.Data;
@Data
public class InfoExtractionParam {
private String title;
private String content;
private String id;
private String publishDate;
private String origin;
}
package com.zzsn.leaderbase.vo;
import lombok.Data;
import java.util.List;
@Data
public class SubjectInfoVo {
private String id;
//专题id
private String subjectId;
//文章链接
private String url;
//专题标题
private String title;
//专题摘要
private String summary;
//内容
private String content;
//作者
private String author;
//发布时间
private String publishDate;
//来源
private String origin;
//得分
private String score;
//企业名称
private String enterpriseName;
//倾向性
private String orientation;
//风险类型
private String riskType;
//企业性质
private String enterpriseNature;
//区域
private String area;
//行业
private String trade;
//信息类型
private String InfoType;
//专题库类型
private String libraryType;
//ids
private List<String> ids;
//置顶标识(0取消置顶 1置顶)
private String type;
private String sourceAddress;
//开始时间
private String startTime;
//结束时间
private String endTime;
//倾向性(前端交互参数约定 0:中性 1:负面 2:正面)
private Integer tendency;
//专题库类型(1:政策;2:领导讲话;3:专家观点;4:企业案例)
private Integer classificationType;
//审核操作(0:未审核 1:审核通过 2:审核未通过 3:暂定 默认值为0)
private List<Integer> checkStatusList;
private Integer checkStatus;
//删除标记(1:删除;0:保留)
private Integer deleteFlag = 0;
//国家
private String countryIds;
//城市
private String cityIds;
//企业
private String enterpriseIds;
//人物
private String characterIds;
//字段
private String fields;
//操作类型 add .update
private String action;
//日期最大值
private String maxValue;
//关键词检索范围 1标题 2正文 3 全部
private Integer searchScope;
//精确度("精确"| --)
private String searchAccuracy;
private String lang;
private String contentWithTag;
private String index;
}
...@@ -11,3 +11,21 @@ spring: ...@@ -11,3 +11,21 @@ spring:
uris: ["114.116.90.53:9200"] uris: ["114.116.90.53:9200"]
username: elastic username: elastic
password: elastic password: elastic
redis:
database: 0
host: 114.115.236.206
password: clbzzsn
port: 6379
python:
# getInfoUrl: http://114.115.130.239:1818/update_extraction/
getInfoUrl: http://192.168.1.116:1818/update_extraction/
es1:
endpoint1: 114.115.215.250
endpoint1port: 9700
endpoint2: 114.116.19.92
endpoint2port: 9700
endpoint3: 114.116.54.108
endpoint3port: 9200
username: elastic
password: zzsn9988
\ No newline at end of file
...@@ -6,18 +6,18 @@ import org.springframework.boot.test.context.SpringBootTest; ...@@ -6,18 +6,18 @@ import org.springframework.boot.test.context.SpringBootTest;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
//
@SpringBootTest //@SpringBootTest
class LeaderBaseApplicationTests { //class LeaderBaseApplicationTests {
//
@Test // @Test
void contextLoads() { // void contextLoads() {
} // }
//
//
@Test // @Test
void translate(){ // void translate(){
//
} // }
//
} //}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论