提交 f2addc94 作者: obcy

创建专题,开启专题,发起数据处理调度

上级 4a009e24
...@@ -77,4 +77,5 @@ public class Constants { ...@@ -77,4 +77,5 @@ public class Constants {
public static final String KEY_WORDS_TO_REDIS_PREFIX = "KEY_WORDS_TO_REDIS::"; public static final String KEY_WORDS_TO_REDIS_PREFIX = "KEY_WORDS_TO_REDIS::";
//关键词采集通道 //关键词采集通道
public static final String KEY_WORDS_COLLECT_TOPIC = "eventKeyWordsInfo"; public static final String KEY_WORDS_COLLECT_TOPIC = "eventKeyWordsInfo";
public static final String EVENT_SUBJECT_MODEL = "eventSubjectModel";
} }
...@@ -143,6 +143,7 @@ public class EventDataController { ...@@ -143,6 +143,7 @@ public class EventDataController {
iXxlJobInfoService.keyWordsInsert(redisKeywordDTO); iXxlJobInfoService.keyWordsInsert(redisKeywordDTO);
//为了立即响应,关键词新增时放入一个首次录入消息队列 //为了立即响应,关键词新增时放入一个首次录入消息队列
kafkaTemplate.send(Constants.KEY_WORDS_COLLECT_TOPIC, JSON.toJSONString(redisKeywordDTO)); kafkaTemplate.send(Constants.KEY_WORDS_COLLECT_TOPIC, JSON.toJSONString(redisKeywordDTO));
kafkaTemplate.send(Constants.EVENT_SUBJECT_MODEL, event.getEventCode());
}); });
return Result.OK(); return Result.OK();
} else { } else {
......
...@@ -166,6 +166,7 @@ public class EventManageController { ...@@ -166,6 +166,7 @@ public class EventManageController {
iXxlJobInfoService.keyWordsInsert(redisKeywordDTO); iXxlJobInfoService.keyWordsInsert(redisKeywordDTO);
//为了立即响应,关键词新增时放入消息队列 //为了立即响应,关键词新增时放入消息队列
kafkaTemplate.send(Constants.KEY_WORDS_COLLECT_TOPIC, JSON.toJSONString(redisKeywordDTO)); kafkaTemplate.send(Constants.KEY_WORDS_COLLECT_TOPIC, JSON.toJSONString(redisKeywordDTO));
kafkaTemplate.send(Constants.EVENT_SUBJECT_MODEL, event.getEventCode());
}); });
return Result.OK(); return Result.OK();
...@@ -246,6 +247,9 @@ public class EventManageController { ...@@ -246,6 +247,9 @@ public class EventManageController {
KeywordsVO keywordsVO = keyWordsService.keywordInfoByEventId(eventId); KeywordsVO keywordsVO = keyWordsService.keywordInfoByEventId(eventId);
iXxlJobInfoService.update(Wrappers.<XxlJobInfo>lambdaUpdate().eq(XxlJobInfo::getInfoSourceCode, keywordsVO.getWordsCode()) iXxlJobInfoService.update(Wrappers.<XxlJobInfo>lambdaUpdate().eq(XxlJobInfo::getInfoSourceCode, keywordsVO.getWordsCode())
.set(XxlJobInfo::getTriggerStatus, status)); .set(XxlJobInfo::getTriggerStatus, status));
if (1==status){
kafkaTemplate.send(Constants.EVENT_SUBJECT_MODEL, event.getEventCode());
}
}); });
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论