不重新启动消费者的情况下重置Kafka偏移量。
创始人
2025-01-12 14:00:15
0

在Kafka中,可以使用seek()方法来重置消费者的偏移量,而无需重新启动消费者。下面是一个示例代码:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Properties;

public class OffsetResetWithoutRestart {
    public static void main(String[] args) {
        // Kafka集群的地址
        String bootstrapServers = "localhost:9092";
        // 消费者配置
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // 创建消费者
        KafkaConsumer consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Arrays.asList("test-topic"));

        // 消费者在某个分区上重置偏移量
        long newOffset = 12345L;
        TopicPartition partition = new TopicPartition("test-topic", 0);
        consumer.seek(partition, newOffset);

        // 消费消息
        while (true) {
            ConsumerRecords records = consumer.poll(100);
            for (ConsumerRecord record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

在上面的示例中,我们首先创建了一个KafkaConsumer实例,并订阅了test-topic主题。然后,我们使用seek()方法将消费者在分区0上的偏移量重置为12345。最后,我们通过循环调用poll()方法来消费消息。注意,我们没有重新启动消费者,而是直接使用seek()方法来重置偏移量。

相关内容

热门资讯

透视肯定(we辅助poker德... 1、透视肯定(we辅助poker德之星)wpk辅助器是真的(详细辅助扑克教程)一向有挂;该软件可以轻...
wepoke辅助插件!微扑克辅... wepoke辅助插件!微扑克辅助,(来玩德州app)切实有挂(详细透视辅助教程);支持多人共享记分板...
透视最新(wepoke有挂)德... 透视最新(wepoke有挂)德扑之星开房间教程(详细辅助详细教程)原先是真的有挂1、下载好辅助软件之...
wpk真的有外挂!wpk到底有... WePoker透视辅助版本解析‌,wpk真的有外挂!wpk到底有没有辅助,(wEPoke)都是真的是...
透视有挂(微扑克德州专用辅助器... 透视有挂(微扑克德州专用辅助器)来玩app辅助(详细辅助分享教程)真是是真的有挂您好,,确实是有挂的...
微扑克辅助机器人!EV扑克辅助... 微扑克辅助机器人!EV扑克辅助软件,(aapOker)切实是真的有挂(详细辅助细节方法);精心打造了...
透视ai(wpk ai辅助)w... 透视ai(wpk ai辅助)wpk辅助(详细辅助安装教程)好像真的是有挂1)辅助挂:进一步探索辅助透...
微扑克ai辅助!AA POKE... 微扑克ai辅助!AA POKER下载软件,(AAPOKer)一向是真的有挂(详细辅助玩家教你);超受...
透视了解(wpk提高胜率)微扑... 透视了解(wpk提高胜率)微扑克模拟器是什么(详细辅助德州教程)先前是真的有挂;一、有挂的是的,亲,...
微扑克辅助软件!德州wpk德州... 微扑克辅助软件!德州wpk德州真的,(AApoker)原来真的有挂(详细透视教你攻略)1、许多玩家不...