Manually Pulling and Consuming a Kafka Topic
In Spring Boot, Kafka consumption is usually driven by the @KafkaListener annotation. Sometimes, however, we do not want a listener at all and instead want to consume by pulling on a schedule. This article is a short note on how to implement that.
// batch size (max records per poll)
private static Integer batchSize = 3;
// batch window in seconds
private static Integer batchTime = 5;

@Resource
private KafkaProperties kafkaProperties;

@Test
void kafkaTest() {
    // configure the consumer
    Map<String, Object> properties = kafkaProperties.buildConsumerProperties();
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // consumer group
    properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, batchSize); // max records per poll
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // disable auto-commit
    // create the consumer
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    // fetch partition info for the topic
    List<PartitionInfo> partitionList = kafkaConsumer.partitionsFor("test-consumer");
    // map each partition to its desired starting offset (0 = consume from the beginning)
    Map<TopicPartition, Long> topicPartitionMap = MapUtil.newHashMap();
    partitionList.forEach(item
            -> topicPartitionMap.put(new TopicPartition(item.topic(), item.partition()), 0L));
    // assign the partitions and seek to the starting offsets
    kafkaConsumer.assign(topicPartitionMap.keySet());
    topicPartitionMap.forEach(kafkaConsumer::seek);
    // run the consume loop (inline Runnable, for demonstration only)
    ((Runnable) () -> {
        Duration duration = Duration.ofSeconds(batchTime);
        long batchTimeMs = batchTime * 1000L;
        // smallest uncommitted offset seen per partition, used to roll back on failure
        Map<Integer, ConsumerRecord<String, String>> recordMap = MapUtil.newHashMap();
        while (true) {
            try {
                TimeInterval interval = DateUtil.timer();
                ConsumerRecords<String, String> records = kafkaConsumer.poll(duration);
                int count = records.count();
                log.info("test consume fetched => {} records", count);
                if (count > 0) {
                    // collect the payloads
                    List<String> values = CollUtil.newArrayList();
                    // record the smallest offset per partition in this batch
                    for (ConsumerRecord<String, String> item : records) {
                        values.add(item.value());
                        if (recordMap.containsKey(item.partition())) {
                            ConsumerRecord<String, String> original = recordMap.get(item.partition());
                            if (item.offset() < original.offset()) {
                                recordMap.put(item.partition(), item);
                            }
                        } else {
                            recordMap.put(item.partition(), item);
                        }
                    }
                    // business logic goes here; uncomment the next line to simulate a failure
                    // throw new RuntimeException("simulated error");
                    // synchronously commit the offsets
                    kafkaConsumer.commitSync();
                    // clear the rollback bookkeeping after a successful commit
                    recordMap.clear();
                }
                // if the batch hit the size limit, skip the sleep and poll again immediately
                if (batchSize == count) {
                    continue;
                }
                // sleep for the remainder of the batch window
                long used = interval.intervalMs();
                if (used < batchTimeMs) {
                    ThreadUtil.safeSleep(batchTimeMs - used);
                }
            } catch (Exception e) {
                log.error("consume failed => {}", e.getMessage());
                // roll back each partition to the smallest offset of the failed batch so it is re-polled
                recordMap.forEach((k, v) -> kafkaConsumer.seek(new TopicPartition(v.topic(), v.partition()), v.offset()));
                log.error(ExceptionUtil.stacktraceToString(e));
                ThreadUtil.safeSleep(batchTimeMs);
            }
        }
    }).run();
}
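The per-partition minimum-offset bookkeeping inside the loop can also be factored into a small standalone helper, which makes the rollback rule easy to unit-test. A minimal sketch (OffsetTracker is a hypothetical name, not part of the original code):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Tracks the smallest offset seen per partition in the current
 * uncommitted batch, so a failed batch can be replayed from its start.
 */
class OffsetTracker {
    private final Map<Integer, Long> minOffsets = new HashMap<>();

    /** Record an offset; keeps only the smallest per partition. */
    void record(int partition, long offset) {
        minOffsets.merge(partition, offset, Math::min);
    }

    /** Positions to seek back to when the batch fails. */
    Map<Integer, Long> rollbackPositions() {
        return new HashMap<>(minOffsets);
    }

    /** Call after a successful commit. */
    void clear() {
        minOffsets.clear();
    }
}
```

On failure, each entry of `rollbackPositions()` would be passed to `kafkaConsumer.seek(...)`, exactly as the catch block above does with `recordMap`.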
(Note: the main dependencies involved are spring-kafka and hutool.)
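For reference, those two dependencies can be declared in a Maven pom.xml roughly as follows (a sketch only; version numbers are omitted and should match your Spring Boot setup):

```xml
<dependencies>
    <!-- Spring for Apache Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Hutool utilities (MapUtil, CollUtil, DateUtil, ThreadUtil, ExceptionUtil) -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <!-- set a version here; hutool is not managed by the Spring Boot parent -->
    </dependency>
</dependencies>
```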
Reposted from my personal blog: https://blog.fordes.top — you are welcome to visit and discuss, and please do point out any mistakes in the article.