业务背景
Why? 为什么要在指定时间段消费 ?
在某些公司或单位,希望在9.00-6.00 收到消息通知(不限于钉钉 短信) 其余时间的消息进行积压到明天
大致思路
How to do ?
如果我们能够启动一个线程进行消费,在某个时间点开启线程,某个时间点结束线程。就完成了时间段消费的策略。
具体方案
引入依赖 Spring-kafka
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Bean Configuration
- 自定义 containerFactory
@Bean
public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
container.setConsumerFactory(consumerFactory);
//container.setConcurrency(10);
//禁止自动启动
container.setAutoStartup(false);
//factory.getContainerProperties().setPollTimeout(1500);
//factory.setBatchListener(true);//设置为批量消费,每个批次数量在Kafka配置参数中设置
// factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//设置手动提交ackMode
return container;
}
上面包含了一些常见的KafkaContainer的配置,按需开启
- 定义消费者
//为listener定义id,containerFactory
//这里的id是后面获取需要用到的哈
@KafkaListener(id = "durable", topics = "topic.quick.durable",containerFactory = "delayContainerFactory")
- 控制消费者的启动、暂停、恢复
private KafkaListenerEndpointRegistry registry;
registry.getListenerContainer("id")
start,resume,pause 方法控制
e.g.
//定时器,每天凌晨0点开启监听
@Scheduled(cron = "0 0 0 * * ?")
public void startListener() {
log.info("开启监听");
//判断监听容器是否启动,未启动则将其启动
if (!registry.getListenerContainer(DURABLE).isRunning()) {
registry.getListenerContainer(DURABLE).start();
}
registry.getListenerContainer(DURABLE).resume();
}
//定时器,每天早上10点关闭监听
@Scheduled(cron = "0 0 10 * * ?")
public void shutDownListener() {
log.info("关闭监听");
registry.getListenerContainer(DURABLE).pause();
}
写在末尾
相同业务,需要分角色控制时间段的话 可以定制不同的topic进行区分