提交 498019c9 作者: wangwenqi

待审核人物kafka手动提交偏移量

上级 6d4ce5d9
package com.zzsn.leaderbase.config;
/**
* @Author: wangwenqi
* @CreateTime: 2025-03-13
* @Description: TODO
* @Version: 1.0
*/
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String servers;
@Value("${spring.kafka.properties.security.protocol}")
private String protocol;
@Value("${spring.kafka.properties.sasl.mechanism}")
private String mechanism;
@Value("${spring.kafka.properties.sasl.jaas.config}")
private String jaas;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String reset;
@Bean
public ConsumerFactory<String, String> commitConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, reset);
props.put("security.protocol", protocol);
props.put("sasl.mechanism", mechanism);
props.put("sasl.jaas.config", jaas);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> commitListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(commitConsumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
...@@ -15,6 +15,7 @@ import org.apache.commons.lang3.StringUtils; ...@@ -15,6 +15,7 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.*; import java.util.*;
...@@ -41,10 +42,12 @@ public class PersonOfJobChangeKafkaConsumer { ...@@ -41,10 +42,12 @@ public class PersonOfJobChangeKafkaConsumer {
private String USERSERVER_BASEDATA = "userserver_basedata"; private String USERSERVER_BASEDATA = "userserver_basedata";
@KafkaListener(topics = "personOfJobChangeTopic", groupId = "personGroup") @KafkaListener(topics = "personOfJobChangeTopic", groupId = "personGroup",
public void listen(String message) { containerFactory = "commitListenerContainerFactory")
public void listen(String message, Acknowledgment ack) {
log.info("kafka pull personOfJobChangeTopic: {} ......", message.substring(0,30)); log.info("kafka pull personOfJobChangeTopic: {} ......", message.substring(0,30));
getData(message); getData(message);
ack.acknowledge();
} }
public void getData(String message) { public void getData(String message) {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论