业务背景

Why? 为什么要在指定时间段消费 ?

在某些公司或单位,希望在9.00-6.00 收到消息通知(不限于钉钉 短信) 其余时间的消息进行积压到明天

大致思路

How to do ?

如果我们能够启动一个线程进行消费,在某个时间点开启线程,某个时间点结束线程。就完成了时间段消费的策略。

具体方案

引入依赖 Spring-kafka

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

Bean Configuration

  1. 自定义 containerFactory
@Bean
    public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(consumerFactory);
		//container.setConcurrency(10);
        //禁止自动启动
        container.setAutoStartup(false);
		//factory.getContainerProperties().setPollTimeout(1500);
        //factory.setBatchListener(true);//设置为批量消费,每个批次数量在Kafka配置参数中设置         
        // factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//设置手动提交ackMode
        return container;
    }

上面包含了一些常见的KafkaContainer的配置,按需开启

  1. 定义消费者
//为listener定义id,containerFactory
  //这里的id是后面获取需要用到的哈
  @KafkaListener(id = "durable", topics = "topic.quick.durable",containerFactory = "delayContainerFactory")
  1. 控制消费者的启动、暂停、恢复
private KafkaListenerEndpointRegistry registry; 

registry.getListenerContainer("id")

	start,resume,pause 方法控制

e.g.

    //定时器,每天凌晨0点开启监听
    @Scheduled(cron = "0 0 0 * * ?")
    public void startListener() {
        log.info("开启监听");
        //判断监听容器是否启动,未启动则将其启动
        if (!registry.getListenerContainer(DURABLE).isRunning()) {
            registry.getListenerContainer(DURABLE).start();
        }
        registry.getListenerContainer(DURABLE).resume();
    }

    //定时器,每天早上10点关闭监听
    @Scheduled(cron = "0 0 10 * * ?")
    public void shutDownListener() {
        log.info("关闭监听");
        registry.getListenerContainer(DURABLE).pause();
    }

写在末尾

相同业务,需要分角色控制时间段的话 可以定制不同的topic进行区分