• springboot集成kafka消费手动启动停止


    项目场景:

    在月结,或者某些时候,我们需要停掉kafka所有的消费端,让其暂时停止消费,而后等月结完成,再从新对消费监听恢复,进行消费,此动作不需要重启服务,最后源码下载


    解决分析

    KafkaListenerEndpointRegistry这是kafka与spring集成的监听注册bean,可以通过它获取监听容器对象,然后对监听容器对象实行启动,暂停,恢复等操作
    1. /**
    2. * kafka服务操作类
    3. * @author liangxi.zeng
    4. */
    5. @Service
    6. @Slf4j
    7. public class KafkaService {
    8. @Autowired
    9. private KafkaListenerEndpointRegistry registry;
    10. /**
    11. * 开启消费
    12. * @param listenerId
    13. */
    14. public void start(String listenerId) {
    15. MessageListenerContainer messageListenerContainer = registry
    16. .getListenerContainer(listenerId);
    17. if(Objects.nonNull(messageListenerContainer)) {
    18. if(!messageListenerContainer.isRunning()) {
    19. messageListenerContainer.start();
    20. } else {
    21. if(messageListenerContainer.isContainerPaused()) {
    22. log.info("listenerId:{},恢复",listenerId);
    23. messageListenerContainer.resume();
    24. }
    25. }
    26. }
    27. }
    28. /**
    29. * 停止消费
    30. * @param listenerId
    31. */
    32. public void pause(String listenerId) {
    33. MessageListenerContainer messageListenerContainer = registry
    34. .getListenerContainer(listenerId);
    35. if(Objects.nonNull(messageListenerContainer) && !messageListenerContainer.isContainerPaused()) {
    36. log.info("listenerId:{},暂停",listenerId);
    37. messageListenerContainer.pause();
    38. }
    39. }
    40. }


    kafka启动,停止,恢复触发场景

    1.通过定时任务自动触发,通过@Scheduled,在某个时间点暂停kafka某个监听的消费,也可以在某个时间点恢复kafka某个监听的消费

    1. /**
    2. * 卡夫卡配置类
    3. * @author liangxi.zeng
    4. */
    5. @Configuration
    6. @EnableScheduling
    7. public class KafkaConfigure {
    8. @Autowired
    9. private KafkaService kafkaService;
    10. @Autowired
    11. private KafkaConfigParam kafkaConfigParam;
    12. @Scheduled(cron = "0/10 * * * * ?")
    13. public void startListener() {
    14. List topics = kafkaConfigParam.getStartTopics();
    15. System.out.println("开启。。。"+topics);
    16. Optional.ofNullable(topics).orElse(new ArrayList<>()).forEach(topic -> {
    17. kafkaService.start(topic);
    18. });
    19. }
    20. @Scheduled(cron = "0/10 * * * * ?")
    21. public void pauseListener() {
    22. List topics = kafkaConfigParam.getPauseTopics();
    23. System.out.println("暂停。。。"+topics);
    24. Optional.ofNullable(topics).orElse(new ArrayList<>()).forEach(topic -> {
    25. kafkaService.pause(topic);
    26. });
    27. }
    28. }

    2.通过访问接口手动触发kafka消费的启动,暂停,恢复

    1. @RequestMapping("/start/{kafkaId}")
    2. public String start(@PathVariable String kafkaId) {
    3. if(!registry.getListenerContainer(kafkaId).isRunning()) {
    4. registry.getListenerContainer(kafkaId).start();
    5. } else {
    6. registry.getListenerContainer(kafkaId).resume();
    7. }
    8. return "ok";
    9. }
    10. @RequestMapping("/pause/{kafkaId}")
    11. public String pause(@PathVariable String kafkaId) {
    12. registry.getListenerContainer(kafkaId).pause();
    13. return "ok";
    14. }

    3.监听nacos配置文件,完成动态的启停操作

    1. /**
    2. * nacos配置变更监听
    3. * @author liangxi.zeng
    4. */
    5. @Component
    6. @Slf4j
    7. public class NacosConfigListener {
    8. @Autowired
    9. private NacosConfigManager nacosConfigManager;
    10. @Autowired
    11. private KafkaService kafkaService;
    12. @Autowired
    13. private KafkaStartPauseParam kafkaStartPauseParam;
    14. /**
    15. * 分隔符
    16. */
    17. private static final String SPLIT=",";
    18. private static final String GROUP = "DEFAULT_GROUP";
    19. /**
    20. * nacos 配置文件监听
    21. * @throws NacosException
    22. */
    23. @PostConstruct
    24. private void reloadConfig() throws NacosException {
    25. nacosConfigManager.getConfigService().addListener(kafkaStartPauseParam.getDataId(), GROUP, new AbstractConfigChangeListener() {
    26. @Override
    27. public void receiveConfigChange(final ConfigChangeEvent event) {
    28. ConfigChangeItem pauseListeners = event.getChangeItem(KafkaStartPauseParam.PREFIX+".pause-listener");
    29. ConfigChangeItem startListeners = event.getChangeItem(KafkaStartPauseParam.PREFIX+".start-listener");
    30. if(Objects.nonNull(pauseListeners)) {
    31. pause(pauseListeners);
    32. }
    33. if(Objects.nonNull(startListeners)) {
    34. start(startListeners);
    35. }
    36. }
    37. });
    38. }
    39. /**
    40. * 暂停消费
    41. * @param pauseListeners
    42. */
    43. private void pause(ConfigChangeItem pauseListeners) {
    44. String pauseValue = pauseListeners.getNewValue();
    45. log.info("暂停listener:{}",pauseValue);
    46. if(!StringUtils.isEmpty(pauseValue)) {
    47. String[] pauseListenerIds = pauseValue.split(SPLIT);
    48. for(String pauseListenerId:pauseListenerIds) {
    49. kafkaService.pause(pauseListenerId);
    50. }
    51. }
    52. }
    53. /**
    54. * 恢复消费
    55. * @param startListeners
    56. */
    57. private void start(ConfigChangeItem startListeners) {
    58. String startValue = startListeners.getNewValue();
    59. log.info("启动listener:{}",startValue);
    60. if(!StringUtils.isEmpty(startValue)) {
    61. String[] startListenerIds = startValue.split(SPLIT);
    62. for(String startListenerId:startListenerIds) {
    63. kafkaService.start(startListenerId);
    64. }
    65. }
    66. }
    67. }

    配置类

    1. /**
    2. * kafka配置参数
    3. * @author liangxi.zeng
    4. */
    5. @ConfigurationProperties(prefix = KafkaStartPauseParam.PREFIX)
    6. @Data
    7. @Component
    8. @RefreshScope
    9. public class KafkaStartPauseParam {
    10. public static final String PREFIX = "tcl.kafka";
    11. private String pauseListener;
    12. private String startListener;
    13. private String dataId;
    14. }

    源码分析

    1.springboot集成kafka,集成配置类org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration

    2.@Import({KafkaAnnotationDrivenConfiguration.class})

    1. @Configuration(
    2. proxyBeanMethods = false
    3. )
    4. @EnableKafka
    5. @ConditionalOnMissingBean(
    6. name = {"org.springframework.kafka.config.internalKafkaListenerAnnotationProcessor"}
    7. )
    8. static class EnableKafkaConfiguration {
    9. EnableKafkaConfiguration() {
    10. }
    11. }
    1. @Target(ElementType.TYPE)
    2. @Retention(RetentionPolicy.RUNTIME)
    3. @Documented
    4. @Import(KafkaListenerConfigurationSelector.class)
    5. public @interface EnableKafka {
    6. }
    1. @Order
    2. public class KafkaListenerConfigurationSelector implements DeferredImportSelector {
    3. @Override
    4. public String[] selectImports(AnnotationMetadata importingClassMetadata) {
    5. return new String[] { KafkaBootstrapConfiguration.class.getName() };
    6. }
    7. }
    1. public class KafkaBootstrapConfiguration implements ImportBeanDefinitionRegistrar {
    2. @Override
    3. public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
    4. if (!registry.containsBeanDefinition(
    5. KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
    6. registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
    7. new RootBeanDefinition(KafkaListenerAnnotationBeanPostProcessor.class));
    8. }
    9. if (!registry.containsBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
    10. registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
    11. new RootBeanDefinition(KafkaListenerEndpointRegistry.class));
    12. }
    13. }
    14. }

    3.KafkaListenerAnnotationBeanPostProcessor这个类,就是消费监听的解析类,在此类中,将监听的方法放入了监听容器MessageListenerContainer

    4.监听容器中有ListenerConsumer监听消费者的属性,此内部内实现了SchedulingAwareRunnable接口,此接口继承了Runnable接口,完成了定时异步消费等操作

    1. @Override
    2. public void run() {
    3. while (isRunning()) {
    4. try {
    5. pollAndInvoke();
    6. }
    7. }
    8. wrapUp();
    9. }
    10. protected void pollAndInvoke() {
    11. if (!this.autoCommit && !this.isRecordAck) {
    12. processCommits();
    13. }
    14. idleBetweenPollIfNecessary();
    15. if (this.seeks.size() > 0) {
    16. processSeeks();
    17. }
    18. pauseConsumerIfNecessary();
    19. this.lastPoll = System.currentTimeMillis();
    20. this.polling.set(true);
    21. ConsumerRecords records = doPoll();
    22. if (!this.polling.compareAndSet(true, false) && records != null) {
    23. /*
    24. * There is a small race condition where wakeIfNecessary was called between
    25. * exiting the poll and before we reset the boolean.
    26. */
    27. if (records.count() > 0) {
    28. this.logger.debug(() -> "Discarding polled records, container stopped: " + records.count());
    29. }
    30. return;
    31. }
    32. resumeConsumerIfNeccessary();
    33. debugRecords(records);
    34. if (records != null && records.count() > 0) {
    35. if (this.containerProperties.getIdleEventInterval() != null) {
    36. this.lastReceive = System.currentTimeMillis();
    37. }
    38. invokeListener(records);
    39. }
    40. else {
    41. checkIdle();
    42. }
    43. }


    遗留问题

    在对kafka消费监听启停的过程中,发现当暂停消费的时候,对于存量的topic还是会消费完,不会立即停止,只是对于新产生的topic不会再消费了

    源码地址 

    kafka消费热启停组件下载

  • 相关阅读:
    react 中setState 的三种写法
    【开发技术】2万字详细介绍Docker 和 web项目的部署监控,docker部署,拉取kafana,prometheus镜像监控
    Pandas 操作数据(三)
    Linux学习记录——이십구 网络基础(2)
    洛谷_P1339 [USACO09OCT]Heat Wave G_最短路
    new`是如何创建对象实例的?
    Web前端:如何为Web应用程序开发找到合适的技术堆栈?
    Java“牵手”淘宝商品评论数据采集+淘宝商品评价接口,淘宝商品追评数据接口,行业商品质检接口,API接口申请指南
    Presto资源管理之Resource Groups And Selector
    pytorch训练错误记录
  • 原文地址:https://blog.csdn.net/zengliangxi/article/details/126711032