To send messages without a key schema using an Avro producer, follow these steps:
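// Step 1: define the Avro schema for the message value
String schemaString = "{\"type\":\"record\",\"name\":\"Message\",\"fields\":[{\"name\":\"value\",\"type\":\"string\"}]}";
Schema schema = new Schema.Parser().parse(schemaString);
// Step 2: configure the producer; the key uses a plain byte-array serializer, so no key schema is needed
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
// Step 3: create the producer instance
KafkaProducer<byte[], GenericRecord> producer = new KafkaProducer<>(props);
// Step 4: build the Avro message
GenericRecord message = new GenericData.Record(schema);
message.put("value", "Hello Kafka!");
// Step 5: send the message; this constructor takes only topic and value, so the key is null
ProducerRecord<byte[], GenericRecord> record = new ProducerRecord<>("topic-name", message);
producer.send(record);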
Complete example code:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class AvroProducerExample {
    public static void main(String[] args) {
        // Define the Avro schema for the message value
        String schemaString = "{\"type\":\"record\",\"name\":\"Message\",\"fields\":[{\"name\":\"value\",\"type\":\"string\"}]}";
        Schema schema = new Schema.Parser().parse(schemaString);
        // Create the Kafka producer configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // The key is sent as raw bytes, so no key schema is involved
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");
        // Create the Kafka producer instance
        KafkaProducer<byte[], GenericRecord> producer = new KafkaProducer<>(props);
        // Create the Avro message
        GenericRecord message = new GenericData.Record(schema);
        message.put("value", "Hello Kafka!");
        // Send the message to the Kafka cluster; no key is given, so the key is null
        ProducerRecord<byte[], GenericRecord> record = new ProducerRecord<>("topic-name", message);
        producer.send(record);
        // Close the producer; close() waits for previously sent records to complete
        producer.close();
    }
}
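The code above uses the KafkaAvroSerializer provided by Confluent, which serializes Avro records into the byte arrays that Kafka stores. The serializer contacts Schema Registry to register or look up the value schema, so before using KafkaAvroSerializer, make sure Schema Registry is running and that schema.registry.url points to the correct URL.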
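
To make the "byte array" part more concrete, here is a minimal sketch of how the bytes produced by the serializer are laid out. It assumes Confluent's documented wire format: one magic byte with value 0, then a 4-byte big-endian schema ID, then the Avro binary payload. The class name WireFormatHeader and its methods are hypothetical helpers written for this illustration, not part of any Confluent or Kafka library, and the sample bytes are made up rather than captured from a real producer.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper for illustration only; not a Confluent or Kafka API.
public class WireFormatHeader {
    // Assumed layout: [magic byte 0][4-byte big-endian schema ID][Avro binary payload]
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_SIZE = 5;

    /** Returns the schema ID stored in bytes 1..4 of a serialized value. */
    public static int schemaId(byte[] serialized) {
        if (serialized == null || serialized.length < HEADER_SIZE) {
            throw new IllegalArgumentException("Payload is shorter than the 5-byte header");
        }
        // ByteBuffer reads multi-byte values in big-endian order by default
        ByteBuffer buffer = ByteBuffer.wrap(serialized);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unexpected magic byte");
        }
        return buffer.getInt();
    }

    /** Returns the bytes that follow the header, i.e. the Avro-encoded record. */
    public static byte[] avroPayload(byte[] serialized) {
        schemaId(serialized); // validates the header before slicing
        return Arrays.copyOfRange(serialized, HEADER_SIZE, serialized.length);
    }

    public static void main(String[] args) {
        // Made-up sample: magic byte, schema ID 42, then two placeholder payload bytes
        byte[] sample = ByteBuffer.allocate(7)
                .put((byte) 0)
                .putInt(42)
                .put((byte) 1)
                .put((byte) 2)
                .array();
        System.out.println("schema ID: " + schemaId(sample));
        System.out.println("payload length: " + avroPayload(sample).length);
    }
}
```

A helper like this can be useful when inspecting raw records with a plain byte-array consumer, for example to check which schema ID a message was written with. In normal use the matching KafkaAvroDeserializer handles this header for you.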