提交 363efb25 作者: JQW

模型开发

上级 2deaf9fb
......@@ -70,7 +70,7 @@ public class MemSubjectController extends JeecgController<MemSubject, IMemSubjec
LoginUser user = (LoginUser) SecurityUtils.getSubject().getPrincipal();
LambdaQueryWrapper<MemSubject> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(MemSubject::getUserId, user.getId());
queryWrapper.select(MemSubject::getSubjectId);
queryWrapper.select(MemSubject::getLabelId);
return Result.OK(memSubjectService.list(queryWrapper).stream().map(m -> m.getLabelId()).collect(Collectors.toList()));
}
......@@ -94,9 +94,11 @@ public class MemSubjectController extends JeecgController<MemSubject, IMemSubjec
memSubjectService.remove(queryWrapper);
List<MemSubject> list = new ArrayList<>();
jsonArray.forEach(x -> {
Map<String,String> jo =(Map<String,String>)x;
MemSubject memSubject = new MemSubject();
memSubject.setUserId(user.getId());
memSubject.setSubjectId(x.toString());
memSubject.setSubjectId(jo.get("subjectId"));
memSubject.setLabelId(jo.get("labelId"));
list.add(memSubject);
});
//添加新数据
......
......@@ -2,20 +2,21 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.zzsn.clb.member.mapper.SubjectMapper">
<select id="getSubjectList" resultType="java.util.Map">
SELECT s.id subjectId,
s.subject_name subjectName,
lb.label_mark as labelMark,
le.id labelId,
le.`name` labelName
SELECT s.id subjectId,
s.subject_name subjectName,
lb.label_name labelTypeName,
lb.label_mark as labelMark,
le.id labelId,
le.`name` labelName
FROM `subject` s
INNER JOIN subject_type_map stm ON s.id = stm.subject_id
INNER JOIN subject_model_map smm ON s.id = smm.subject_id
INNER JOIN python_model pm ON smm.model_id = pm.id
INNER JOIN sys_base_label_type lb ON lb.id = pm.label_id
INNER JOIN sys_base_label_type lb ON lb.top_id = pm.label_id
INNER JOIN sys_base_label_type_map lm ON lb.id = lm.label_id
INNER JOIN label_entity le ON le.id = lm.relation_id
WHERE stm.type_id = '1627954091596574722'
AND le.`status` = 1
WHERE le.`status` = 1
and pm.type = 3
ORDER BY s.id,
le.sort ASC
</select>
......
package com.zzsn.clb.task;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.zzsn.clb.common.util.HttpUtil;
import com.zzsn.clb.member.mapper.SubjectMapper;
import com.zzsn.common.Constants;
import com.zzsn.utils.MapToBeanUtil;
import javafx.beans.binding.ObjectExpression;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.collapse.CollapseBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.jeecg.common.util.DateUtils;
import org.jeecg.common.util.RedisUtil;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
/***
* 标签类的模型处理
*/
@Component
@EnableScheduling
@Slf4j
@DS("multi-clb-project")
public class ModelLabelTask {
// Redis key under which run() stores the end timestamp of the last processed window.
String redisCacheName = "job:model:date:label";
// URL of the labelling model; processInfo() passes it to HttpUtil.doPost for every article.
// NOTE(review): hard-coded plain-HTTP IP address — consider moving it to configuration.
String modelUrl = "http://114.116.9.59:4000/jxyq_label_app";
// Lower bound (gte) of the createDate range query; assigned at the start of every run().
// NOTE(review): mutable state on a @Component instance — assumes scheduled runs never overlap; confirm.
String queryBgDate = null;
// Upper bound (lt) of the createDate range query; assigned at the start of every run().
String queryEndate = null;
// Elasticsearch client used for both the search and the bulk index requests.
@Resource
private RestHighLevelClient client;
// Mapper whose getSubjectList() supplies the subject/label rows.
@Resource
SubjectMapper subjectMapper;
// Redis helper used to read and write the checkpoint stored under redisCacheName.
@Resource
RedisUtil redisUtil;
/**
 * Scheduled entry point: labels every article created since the previous run.
 *
 * <p>The lower bound of the time window is the checkpoint stored in Redis under
 * {@code redisCacheName} (or {@code 2000-01-01T00:00:00} when no checkpoint exists); the upper
 * bound is "now". After processing, the upper bound is written back as the new checkpoint.
 * If {@link #processInfo} throws, the checkpoint is not advanced.
 */
@Scheduled(cron = "0 */10 * * * ?") // every 10 minutes
public void run() {
    // Read the checkpoint exactly once: a second lookup could return null if the key
    // disappeared between the null check and the dereference.
    Object checkpoint = redisUtil.get(redisCacheName);
    queryBgDate = checkpoint != null ? checkpoint.toString() : "2000-01-01T00:00:00";

    Date dateNow = new Date();
    queryEndate = DateUtils.formatDate(dateNow, "yyyy-MM-dd") + "T" + DateUtils.formatDate(dateNow, "HH:mm:ss");

    // Load every subject/label row known to the database.
    List<Map<String, Object>> subjectLabelList = subjectMapper.getSubjectList();
    // Run the collected articles in the window through the model.
    processInfo(subjectLabelList);
    // Remember where this run stopped so the next run starts from here.
    redisUtil.set(redisCacheName, queryEndate);
}
/**
 * Pages through the articles of the current time window, sends each one to the labelling
 * model and bulk-indexes the labelled results page by page.
 *
 * <p>Articles without a title or content are skipped. An I/O failure while calling the model
 * is logged and the article is skipped.
 *
 * @param subjectLabelList subject/label rows used to resolve the label names returned by the model
 * @throws IllegalStateException if a page of articles cannot be fetched; propagating the
 *         failure keeps {@code run()} from storing a checkpoint for a window that was not
 *         fully processed
 */
private void processInfo(List<Map<String, Object>> subjectLabelList) {
    int current = 1;
    int size = 20;
    int total = 0;
    // NOTE(review): from/size paging is limited by index.max_result_window (10000 by default);
    // a large first window may exceed it — confirm the index settings.
    while (true) {
        // 1. Fetch one page of articles.
        IPage<Map<String, Object>> pageList = getSubjectArticleList(current, size);
        if (pageList == null) {
            // The query itself failed (it returns null on IOException); do not treat this as "no more data".
            throw new IllegalStateException("查询es专题资讯数据失败,页码:" + current);
        }
        List<Map<String, Object>> list = pageList.getRecords();
        log.info("数量条数:{}", list == null ? 0 : list.size());
        if (list == null || list.isEmpty()) {
            break;
        }

        // 2. Label every article of the page.
        List<Map<String, Object>> dataList = new ArrayList<>();
        for (Map<String, Object> map : list) {
            Object title = map.get("title");
            Object content = map.get("content");
            if (title == null || content == null) {
                log.warn("资讯缺少标题或正文,跳过,id:{}", map.get("id"));
                continue;
            }
            // A fresh request body per article so no field can leak from the previous one.
            JSONObject jo = new JSONObject();
            jo.put("title", title.toString());
            jo.put("content", content.toString());
            try {
                JSONObject result = JSONObject.parseObject(HttpUtil.doPost(modelUrl, jo, 60000));
                // Null-safe comparisons: the response may lack "code" or "flag".
                if (result != null
                        && Integer.valueOf(200).equals(result.getInteger("code"))
                        && "1".equals(result.getString("flag"))) {
                    JSONArray labelInfo = result.getJSONArray("label_info");
                    if (labelInfo != null) {
                        setEsDataList(dataList, map, labelInfo, subjectLabelList);
                    }
                }
            } catch (IOException e) {
                log.error("调用标签模型失败,id:{}", map.get("id"), e);
            }
        }

        // 3. Persist the labelled documents of this page.
        total += dataList.size();
        log.info("页码:{},本次:{},合计:{}", current, dataList.size(), total);
        addElasticsearchData(dataList);
        current++;
    }
}
/**
 * Converts the label entries returned by the model for one article into documents and
 * appends them to {@code dataList}.
 *
 * <p>Each entry carries {@code level1} (label type names) and {@code level2} (label names);
 * either may hold several values separated by {@code |}. Entries missing one of the two
 * fields are skipped without affecting the remaining entries.
 *
 * @param dataList         accumulator that receives the generated documents
 * @param mapData          source fields of the article being labelled
 * @param jsonArray        {@code label_info} array from the model response; may be {@code null}
 * @param subjectLabelList subject/label rows used to resolve names to ids
 */
private void setEsDataList(List<Map<String, Object>> dataList, Map<String, Object> mapData, JSONArray jsonArray, List<Map<String, Object>> subjectLabelList) {
    if (jsonArray == null) {
        return;
    }
    for (Object x : jsonArray) {
        if (!(x instanceof JSONObject)) {
            continue;
        }
        JSONObject jo = (JSONObject) x;
        String level1 = jo.getString("level1");
        String level2 = jo.getString("level2");
        if (level1 == null || level2 == null) {
            // Skip only this entry; later entries may still be valid.
            continue;
        }
        // split() returns the whole string as a single element when no '|' is present,
        // so no separate "contains" check is needed.
        String[] labelTypeNames = level1.split("\\|");
        String[] labelNames = level2.split("\\|");
        for (String labelTypeName : labelTypeNames) {
            List<Map<String, Object>> returnList = this.setEsDataMap(mapData, labelTypeName, labelNames, subjectLabelList);
            if (returnList != null) {
                dataList.addAll(returnList);
            }
        }
    }
}
/**
 * Builds the documents for one article and one label type.
 *
 * <p>Every name in {@code labelNames} that exists under {@code labelTypeName} in
 * {@code subjectLabelList} becomes one label entry ({@code labelId}, {@code labelMark},
 * {@code labelName}). One document is then produced for every distinct subject that owns
 * the label type.
 *
 * <p>NOTE(review): when several label types of one article belong to the same subject, that
 * subject receives one document per type (the original author flagged this as a known issue).
 *
 * @param mapData          source fields of the article
 * @param labelTypeName    label type name returned by the model
 * @param labelNames       label names returned by the model for this article
 * @param subjectLabelList subject/label rows (keys used: {@code subjectId}, {@code labelTypeName},
 *                         {@code labelId}, {@code labelMark}, {@code labelName})
 * @return the generated documents, or {@code null} when none of {@code labelNames} is known
 *         under {@code labelTypeName}
 */
private List<Map<String, Object>> setEsDataMap(Map<String, Object> mapData, String labelTypeName, String[] labelNames, List<Map<String, Object>> subjectLabelList) {
    // Rows of the requested label type. Objects.equals tolerates rows in which the
    // column is absent (null), which would otherwise raise a NullPointerException.
    List<Map<String, Object>> typeRows = subjectLabelList.stream()
            .filter(row -> Objects.equals(labelTypeName, row.get("labelTypeName")))
            .collect(Collectors.toList());

    // Resolve every label name; an article may carry several labels.
    List<Map<String, Object>> labelList = new ArrayList<>();
    for (String labelName : labelNames) {
        Optional<Map<String, Object>> match = typeRows.stream()
                .filter(row -> Objects.equals(labelName, row.get("labelName")))
                .findFirst();
        if (!match.isPresent()) {
            continue;
        }
        Map<String, Object> row = match.get();
        Map<String, Object> mapLabel = new HashMap<>();
        mapLabel.put("labelId", row.get("labelId"));
        mapLabel.put("labelMark", row.get("labelMark"));
        mapLabel.put("labelName", row.get("labelName"));
        labelList.add(mapLabel);
    }
    // No known label: signal "nothing to index" to the caller.
    if (labelList.isEmpty()) {
        return null;
    }

    // A label type can belong to several subjects; emit one document per subject.
    List<String> subjectIdList = typeRows.stream()
            .map(row -> row.get("subjectId").toString())
            .distinct()
            .collect(Collectors.toList());
    return getMultSubjectData(subjectIdList, mapData, labelList);
}
/**
 * Produces one document per subject id by copying the article fields and overlaying the
 * subject-specific and bookkeeping fields.
 *
 * @param subjectIdList ids of the subjects that should receive the article
 * @param mapData       source fields of the article; copied, never modified
 * @param labelList     label entries stored under {@code labels}; the same list instance is
 *                      shared by all returned documents
 * @return one new map per subject id, in the order of {@code subjectIdList}
 */
private List<Map<String, Object>> getMultSubjectData(List<String> subjectIdList, Map<String, Object> mapData, List<Map<String, Object>> labelList) {
    return subjectIdList.stream()
            .map(subjectId -> {
                Map<String, Object> doc = new HashMap<>(mapData);
                // Fields added on top of the article itself.
                doc.put("subjectId", subjectId);
                doc.put("labels", labelList);
                doc.put("deleteFlag", 0);
                doc.put("checkStatus", 0);
                doc.put("topNum", 0);
                doc.put("flag", "1");
                // Every document gets its own freshly generated id.
                doc.put("id", String.valueOf(IdWorker.getId()));
                return doc;
            })
            .collect(Collectors.toCollection(ArrayList::new));
}
/**
 * Bulk-indexes the given documents into {@code Constants.ES_DATA_FOR_SUBJECT}, using each
 * map's {@code id} entry as the document id.
 *
 * <p>This is best-effort: any exception is logged and swallowed, and per-item failures
 * reported by Elasticsearch are logged as well.
 *
 * @param addEsDataMapList documents to index; {@code null} or empty means nothing to do
 */
public void addElasticsearchData(List<Map<String, Object>> addEsDataMapList) {
    if (addEsDataMapList == null || addEsDataMapList.isEmpty()) {
        return;
    }
    try {
        BulkRequest bulkRequest = new BulkRequest();
        for (Map<String, Object> addEsDataMap : addEsDataMapList) {
            bulkRequest.add(new IndexRequest(Constants.ES_DATA_FOR_SUBJECT)
                    .id(addEsDataMap.get("id").toString())
                    .source(addEsDataMap, XContentType.JSON));
        }
        log.info("es同步数据数量:{}", bulkRequest.numberOfActions());
        // Make the documents searchable as soon as the request returns.
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
        // A bulk call can succeed as a whole while individual items are rejected.
        if (bulkResponse.hasFailures()) {
            log.error("es同步数据存在失败项:{}", bulkResponse.buildFailureMessage());
        }
    } catch (Exception e) {
        log.error("es同步数据执行失败:{}", addEsDataMapList, e);
    }
}
/**
 * Fetches one page of articles whose {@code createDate} lies in
 * [{@code queryBgDate}, {@code queryEndate}), newest first, collapsed on
 * {@code sourceAddress.keyword} so each source address appears once.
 *
 * @param current 1-based page number
 * @param size    page size
 * @return the page, or {@code null} when the search request fails with an I/O error
 */
private IPage<Map<String, Object>> getSubjectArticleList(Integer current, Integer size) {
    String timeField = "createDate";
    // One range clause carries both bounds of the window.
    BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery()
            .must(QueryBuilders.rangeQuery(timeField).gte(queryBgDate).lt(queryEndate));

    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.query(boolBuilder);
    // Paging.
    searchSourceBuilder.from((current - 1) * size).size(size);
    searchSourceBuilder.sort(timeField, SortOrder.DESC);
    // De-duplicate; collapse only accepts keyword or numeric fields.
    searchSourceBuilder.collapse(new CollapseBuilder("sourceAddress.keyword"));
    return getSubjectArticleList(searchSourceBuilder);
}
/**
 * Executes the given search against {@code Constants.ES_BASE_DATA} and wraps the hits in a page.
 *
 * <p>NOTE(review): when the query uses field collapsing, Elasticsearch reports the total of
 * matching documents before collapsing, so {@code getTotal()} may exceed the number of
 * records obtainable by paging — confirm before relying on it.
 *
 * @param searchSourceBuilder fully configured search source; total-hit tracking is switched on here
 * @return a page holding the hit sources, the total hit count, the page size and the 1-based
 *         page number derived from {@code from / size}; {@code null} when the request fails
 *         with an {@link IOException}
 */
private IPage<Map<String, Object>> getSubjectArticleList(SearchSourceBuilder searchSourceBuilder) {
    try {
        SearchRequest searchRequest = new SearchRequest(Constants.ES_BASE_DATA);
        searchSourceBuilder.trackTotalHits(true);
        searchRequest.source(searchSourceBuilder);
        SearchResponse search = client.search(searchRequest, RequestOptions.DEFAULT);

        List<Map<String, Object>> records = new ArrayList<>();
        for (SearchHit hit : search.getHits().getHits()) {
            records.add(hit.getSourceAsMap());
        }

        // from() is a record offset, not a page number; convert it.
        int from = Math.max(searchSourceBuilder.from(), 0);
        int size = searchSourceBuilder.size();

        IPage<Map<String, Object>> resPage = new Page<>();
        resPage.setTotal(search.getHits().getTotalHits().value);
        resPage.setCurrent(size > 0 ? from / size + 1 : 1);
        resPage.setSize(size);
        resPage.setRecords(records);
        return resPage;
    } catch (IOException e) {
        log.error("查询es专题资讯数据失败", e);
    }
    return null;
}
}
......@@ -43,9 +43,9 @@ public class ImportExcelData {
SubjectMapper subjectMapper;
public void importData() {
String path = "C:\\Users\\EDY\\Documents\\WeChat Files\\wxid_7d7tl0zyfdxb21\\FileStorage\\File\\2023-02\\克虏宝汇总审核后数据2次修改.xlsx";
String path = "C:\\Users\\EDY\\Documents\\WeChat Files\\wxid_7d7tl0zyfdxb21\\FileStorage\\File\\2023-02\\管理动态-下属机构工作.xlsx";
try {
impData(401, 10, path, 410);
impData(1, 10, path, 6);
} catch (IOException e) {
e.printStackTrace();
}
......@@ -202,7 +202,7 @@ public class ImportExcelData {
subjectData.setFlag("1");
log.info(JSON.toJSONString(subjectData));
IndexRequest request = new IndexRequest("subjectdatabase1")
IndexRequest request = new IndexRequest("subjectdatabase")
.id(id)
.source(JSON.toJSONString(subjectData), XContentType.JSON);
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
......
......@@ -2,5 +2,7 @@ package com.zzsn.common;
public class Constants {
//处理后的专题资讯信息存储索引
public final static String ES_DATA_FOR_SUBJECT = "subjectdatabase";
public final static String ES_DATA_FOR_SUBJECT = "tmp_subjectdatabase";
//新采集库(22.04.23)
public final static String ES_BASE_DATA = "basedata";
}
package com.zzsn.utils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.beanutils.BeanMap;
import org.apache.commons.beanutils.BeanUtils;
import java.util.Map;
/**
* @author 闫鑫
* @date 2022/8/26 12:51
*/
@Slf4j
public class MapToBeanUtil {
/**
* 把map转为Bean对象
*
* @param map 待转换的map
* @param beanClass class类型
* @param <T> 返回值类型
* @return 返回值
*/
public static <T> T mapToObject(Map<String, Object> map, Class<T> beanClass) {
try {
if (map == null || map.size() == 0) {
return null;
}
T obj = beanClass.newInstance();
BeanUtils.populate(obj, map);
return obj;
} catch (Exception e) {
log.error("map->Bean转换失败,map:{},e:{}", map, e);
}
return null;
}
/**
* 对象转map
* @param obj
* @return
*/
public static Map<?, ?> objectToMap(Object obj) {
if (obj == null) {
return null;
}
return new BeanMap(obj);
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论