Apache Kafka 的生产者只能连接到指定的 Kafka 代理,并且只能发送消息到其感兴趣的主题。生产者无法直接感知整个网络的存在。
以下是一个使用 Java 语言编写的 Apache Kafka 生产者代码示例:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
// 配置 Kafka 生产者属性
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka 代理地址
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建 Kafka 生产者
Producer producer = new KafkaProducer<>(props);
// 发送消息到指定主题
String topic = "my-topic";
String key = "key1";
String value = "Hello Kafka";
ProducerRecord record = new ProducerRecord<>(topic, key, value);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
// 发送失败处理逻辑
exception.printStackTrace();
} else {
// 发送成功处理逻辑
System.out.println("Message sent successfully. Offset: " + metadata.offset());
}
}
});
// 关闭 Kafka 生产者
producer.close();
}
}
在上述示例中,我们使用了 Kafka 的 Java 客户端库,设置了 Kafka 代理地址和消息序列化器,并创建了一个 Kafka 生产者。然后,我们发送一个包含键、值和主题的消息记录,并通过回调函数处理发送结果。最后,我们关闭了生产者。
请注意,上述代码中的 localhost:9092
是 Kafka 代理的地址,你需要根据你的实际环境进行相应的配置。另外,你需要在 Maven 或 Gradle 等构建工具中添加对 Kafka 客户端库的依赖。
希望以上信息对你有所帮助!