在Kafka中,BATCH和MANUAL是两种不同的处理方式,ack和commit也是两种不同的确认方式。
BATCH是指消费者一次获取多个消息,等到一定数量的消息后再进行处理,可以提高处理效率。而MANUAL是指消费者需要手动确认每一个消息。在进行处理后,消费者需要调用ack()
方法来告诉Kafka这个消息已经被处理完成。
在确认方式上,ack是指消费者在处理完消息后,向Kafka服务器发送确认消息,告诉Kafka这个消息已经被处理完成;而commit则是指消费者向Kafka服务器提交确认,确认这批消息已经处理完毕。
下面给出两种具体的解决方案,分别是使用BATCH和使用MANUAL的:
1.使用BATCH方式:
Properties props = new Properties();
props.put("enable.auto.commit", "false");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
int batchSize = 10; // 一次获取10条消息
List> buffer = new ArrayList<>();
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
buffer.add(record);
}
if (buffer.size() >= batchSize) {
// 处理消息
processRecords(buffer);
// 确认消息已处理完
consumer.commitSync();
// 清空缓冲区
buffer.clear();
}
}
2.使用MANUAL方式:
Properties props = new Properties();
props.put("enable.auto.commit", "false");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
// 处理消息
processRecord(record);
// 手