To serialize data with Avro and send it to a Kafka topic, follow these steps:
First, add the Kafka client and Confluent Avro serializer dependencies to your `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>6.2.0</version>
</dependency>
```
Then define the schema, configure the producer, and send a `GenericRecord`:

```java
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

String avroSchema = "{\"type\":\"record\",\"name\":\"MyRecord\",\"fields\":"
        + "[{\"name\":\"field1\",\"type\":\"string\"},{\"name\":\"field2\",\"type\":\"int\"}]}";
Schema schema = new Schema.Parser().parse(avroSchema);

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");

// Type the producer with GenericRecord so the Avro serializer applies to values
Producer<String, GenericRecord> producer = new KafkaProducer<>(props);

// Build a record that conforms to the schema
GenericRecord record = new GenericData.Record(schema);
record.put("field1", "value1");
record.put("field2", 123);

ProducerRecord<String, GenericRecord> producerRecord =
        new ProducerRecord<>("topic-name", "key", record);
producer.send(producerRecord);
producer.flush();
producer.close();
```
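For intuition, `KafkaAvroSerializer` encodes the record with Avro's standard binary encoding (plus a small registry header, discussed below). The same `GenericRecord` can be round-tripped through Avro's encoder and decoder with no Kafka or Schema Registry involved — a standalone sketch using only the `org.apache.avro` library:

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroRoundTrip {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"MyRecord\",\"fields\":"
            + "[{\"name\":\"field1\",\"type\":\"string\"},{\"name\":\"field2\",\"type\":\"int\"}]}");

        GenericRecord record = new GenericData.Record(schema);
        record.put("field1", "value1");
        record.put("field2", 123);

        // Encode the record to Avro binary bytes
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();
        byte[] bytes = out.toByteArray();

        // Decode the bytes back into a GenericRecord using the same schema
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        GenericRecord decoded = new GenericDatumReader<GenericRecord>(schema).read(null, decoder);

        System.out.println("field1=" + decoded.get("field1") + " field2=" + decoded.get("field2"));
    }
}
```

Note that Avro decodes strings as `org.apache.avro.util.Utf8`, which is why the consumer code below calls `.toString()` before treating a field as a `String`.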
To read and deserialize the Avro data back from the Kafka topic:
```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
// false => values come back as GenericRecord rather than generated classes
props.put("specific.avro.reader", "false");
// group.id is required when using subscribe()
props.put("group.id", "avro-demo-group");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
try {
    consumer.subscribe(Collections.singletonList("topic-name"));
    while (true) {
        ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, GenericRecord> record : records) {
            GenericRecord avroRecord = record.value();
            String field1Value = avroRecord.get("field1").toString();
            int field2Value = (int) avroRecord.get("field2");
            System.out.println("field1: " + field1Value + ", field2: " + field2Value);
        }
    }
} finally {
    consumer.close(); // reachable once the poll loop is interrupted
}
```
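On the wire, the Confluent serializers prefix each Avro payload with a 5-byte header: a magic byte `0` followed by a 4-byte big-endian schema ID, which `KafkaAvroDeserializer` uses to fetch the writer schema from Schema Registry. A minimal sketch of parsing that header (the message bytes here are illustrative, not taken from a real topic):

```java
import java.nio.ByteBuffer;

public class WireFormat {
    public static void main(String[] args) {
        // Hypothetical message: magic byte 0, schema ID 42, then the Avro binary payload
        byte[] message = {0, 0, 0, 0, 42, 12, 'v', 'a', 'l', 'u', 'e', '1'};

        ByteBuffer buf = ByteBuffer.wrap(message);
        byte magic = buf.get();      // must be 0 for the Confluent wire format
        int schemaId = buf.getInt(); // big-endian ID of the schema in Schema Registry

        System.out.println("magic=" + magic + " schemaId=" + schemaId);
        // The remaining buf.remaining() bytes are the plain Avro binary payload
    }
}
```

This is why data written by `KafkaAvroSerializer` cannot be decoded by a plain Avro reader without first stripping the 5-byte header.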
The examples above show how to serialize data to, and deserialize data from, a Kafka topic using Avro. Be sure to adjust the configuration (broker address, Schema Registry URL, topic name, consumer group) to match your environment and requirements.