Apache Beam KafkaIO提及主题分区而不是主题名
创始人
2024-11-10 01:00:11
0

使用Apache Beam KafkaIO时,可以通过指定主题分区而不是主题名来读取或写入消息。以下是一个使用KafkaIO读取消息的示例代码:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class KafkaIOTopicPartitionExample {
  public static void main(String[] args) {
    // 创建Pipeline
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);

    // 从Kafka主题的分区0读取消息
    String topic = "my-topic";
    int partition = 0;
    PCollection messages = pipeline.apply(
        KafkaIO.read()
            .withBootstrapServers("localhost:9092")
            .withTopicPartitions(KafkaIO.TopicPartition.of(topic, partition))
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata()
            .commitOffsetsInFinalize()
            .withConsumerConfigUpdates(ImmutableMap.of("group.id", "my-consumer-group"))
    ).apply(Values.create());

    // 处理读取到的消息
    messages.apply(ParDo.of(new MyDoFn()));

    // 运行Pipeline
    pipeline.run().waitUntilFinish();
  }
}

上述代码中,通过KafkaIO.TopicPartition.of(topic, partition)方法指定了要读取的主题分区。

类似地,可以使用类似的方法写入消息到指定的主题分区。以下是一个使用KafkaIO写入消息的示例代码:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class KafkaIOTopicPartitionExample {
  public static void main(String[] args) {
    // 创建Pipeline
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);

    // 从PCollection中写入消息到Kafka主题的分区0
    String topic = "my-topic";
    int partition = 0;
    PCollection> messages = ... // 从其他来源获取消息

    messages.apply(
        KafkaIO.write()
            .withBootstrapServers("localhost:9092")
            .withTopic(topic)
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(StringSerializer.class)
            .withProducerConfigUpdates(ImmutableMap.of("acks", "all"))
            .values()
    );

    // 运行Pipeline
    pipeline.run().waitUntilFinish();
  }
}

在上述代码中,通过withTopic(topic)方法指定了要写入的主题,然后将消息写入到指定的主题分区。

这样,就可以通过指定主题分区而不是主题名来读取或写入消息。

相关内容

热门资讯

透视模拟器!wpk辅助器下载,... 透视模拟器!wpk辅助器下载,途游四川小程序脚本辅助,攻略教程(有挂技巧);科技安装教程;13670...
透视软件!wepoker辅助透... 1、透视软件!wepoker辅助透视软件,随意玩透视辅助,微扑克教程(有挂辅助)。2、随意玩透视辅助...
透视ai代打!红龙poker作... 透视ai代打!红龙poker作弊指令,老k麻将有挂吗,攻略方法(有挂技巧)1、点击下载安装,微扑克w...
辅助透视!德普之星透视辅助软件... 1、辅助透视!德普之星透视辅助软件下载,中至赣牌圈祈福有用吗,玩家教程(有挂教程)。2、中至赣牌圈祈...
透视app!wpk辅助购买,欢... 透视app!wpk辅助购买,欢乐对决辅助,黑科技教程(有挂软件);小薇(透视辅助)致您一封信;亲爱欢...
透视好牌!智星菠萝辅助怎么买,... 这是一款非常优秀的丽水都来大菠萝脚本辅助 ia辅助检测软件,能够让你了解到丽水都来大菠萝脚本辅助中牌...
透视实锤!hhpoker辅助器... 透视实锤!hhpoker辅助器,闲来山水广西辅助,2025新版技巧(有挂软件);实战中需综合运用上述...
透视好牌!智星菠萝辅助,心悦填... 透视好牌!智星菠萝辅助,心悦填大坑辅助器,可靠技巧(有挂神器);最新版2024是一款经典耐玩的益智游...
透视最新!智星德州菠萝安装,越... 透视最新!智星德州菠萝安装,越乡游义乌辅助,新2025教程(有挂技巧);科技安装教程;1367043...
透视中牌率!wpk辅助软件,雀... 透视中牌率!wpk辅助软件,雀友会广东潮汕辅助透视,爆料教程(有挂方法);小薇(透视辅助)致您一封信...