要在Apache Kafka中使用Apache NIFI推送消息,你可以使用KafkaProducer类来发送消息。下面是一个使用NIFI和KafkaProducer的示例代码:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaNIFIPusher {
public static void main(String[] args) {
// 设置Kafka生产者的配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka broker的地址
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建Kafka生产者
KafkaProducer producer = new KafkaProducer<>(props);
// 构造消息
String topic = "your_topic";
String message = "Hello Kafka!";
// 发送消息到Kafka
ProducerRecord record = new ProducerRecord<>(topic, message);
producer.send(record);
// 关闭Kafka生产者
producer.close();
}
}
以上代码会发送一条消息到指定的Kafka主题。你需要根据你的实际情况修改以下参数:
确保你的NIFI流程正确配置了Kafka Producer处理器,并且使用了正确的Kafka主题和相应的序列化器。这样,当NIFI流程运行时,它将会将消息推送到Kafka中。