提交 056629a1 作者: 925993793@qq.com

事件脉络消费处理逻辑修改

上级 38495c56
...@@ -79,15 +79,23 @@ public class KafkaConsumer { ...@@ -79,15 +79,23 @@ public class KafkaConsumer {
public void eventContext(ConsumerRecord<String, String> record) { public void eventContext(ConsumerRecord<String, String> record) {
String value = record.value(); String value = record.value();
if (StringUtils.isNotEmpty(value)) { if (StringUtils.isNotEmpty(value)) {
String subjectId;
Integer category = 2;
try { try {
List<SubjectAnalysis> subjectAnalyses = JSON.parseArray(value, SubjectAnalysis.class); List<SubjectAnalysis> subjectAnalyses = JSON.parseArray(value, SubjectAnalysis.class);
for (SubjectAnalysis e : subjectAnalyses) { subjectId = subjectAnalyses.get(0).getSubjectId();
if (exist(e)) { subjectAnalyses.forEach(e -> {
e.setTitle(removeNonBmpUniCodes(e.getTitle())); e.setCategory(category);
subjectAnalysisService.save(e); e.setTitle(removeNonBmpUniCodes(e.getTitle()));
} });
LambdaQueryWrapper<SubjectAnalysis> queryWrapper = Wrappers.lambdaQuery();
queryWrapper.eq(SubjectAnalysis::getSubjectId, subjectId).eq(SubjectAnalysis::getCategory, category);
int count = subjectAnalysisService.count(queryWrapper);
if (count > 0) {
subjectAnalysisService.remove(queryWrapper);
} }
log.info("id为-{}-的专题,此次-事件脉络-数据更新完成", subjectAnalyses.get(0).getSubjectId()); subjectAnalysisService.saveBatch(subjectAnalyses);
log.info("id为-{}-的专题,此次-事件脉络-数据更新完成", subjectId);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
...@@ -103,7 +111,7 @@ public class KafkaConsumer { ...@@ -103,7 +111,7 @@ public class KafkaConsumer {
public void eventContext_fake(ConsumerRecord<String, String> record) { public void eventContext_fake(ConsumerRecord<String, String> record) {
String value = record.value(); String value = record.value();
if (StringUtils.isNotEmpty(value)) { if (StringUtils.isNotEmpty(value)) {
String subjectId = null; String subjectId;
Integer category = 3; Integer category = 3;
try { try {
List<SubjectAnalysis> subjectAnalyses = JSON.parseArray(value, SubjectAnalysis.class); List<SubjectAnalysis> subjectAnalyses = JSON.parseArray(value, SubjectAnalysis.class);
...@@ -119,10 +127,10 @@ public class KafkaConsumer { ...@@ -119,10 +127,10 @@ public class KafkaConsumer {
subjectAnalysisService.remove(queryWrapper); subjectAnalysisService.remove(queryWrapper);
} }
subjectAnalysisService.saveBatch(subjectAnalyses); subjectAnalysisService.saveBatch(subjectAnalyses);
log.info("id为-{}-的专题,此次-伪事件脉络-数据更新完成", subjectId);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
log.info("id为-{}-的专题,此次-伪事件脉络-数据更新完成", subjectId);
} }
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论