在使用Spring Cloud Stream处理Kafka CloudEvents的过程中,由于批处理消费者未正确使用,可能会导致无法正常工作。为了解决这个问题,可以按照以下代码示例进行操作:
在application.yml中添加以下配置:
spring:
cloud:
stream:
kafka:
binder:
consumer-properties:
enable.batching: true #开启批处理
max.poll.records: 25 #每次最多拉取25条记录
auto.offset.reset: earliest #从最早的offset开始消费
在代码中使用以下注解:
@EnableBinding(value = { MySink.class })
public class BatchConsumer {
@StreamListener(value = "input", condition="(headers['ce-type']=='test.event.type')")
public void consumeEvents(List> messages) {
for (Message message : messages) {
//处理事件
}
}
}
需要注意的是,当enable.batching设置为true时,需要将@StreamListener的参数从Message
上一篇:BatchConsumer的ConsumerDefinition中使用UseMessageRetry在启动时引发错误。
下一篇:BatchConvertColumnsfromchrtonumwitheitherread_excelordplyr