以下是使用Apache Camel和Kafka组件实现单个生产者多个消费者的代码示例:
import org.apache.camel.builder.RouteBuilder;
public class KafkaRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
// 配置Kafka生产者
from("direct:kafkaProducer")
.to("kafka:myTopic");
// 配置Kafka消费者
from("kafka:myTopic")
.split().tokenize("\n")
.to("direct:kafkaConsumer");
// 配置多个消费者
from("direct:kafkaConsumer")
.process(exchange -> {
String message = exchange.getIn().getBody(String.class);
// 处理消息
System.out.println("Received message: " + message);
});
}
}
import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;
public class Application {
public static void main(String[] args) throws Exception {
// 创建Camel Context
CamelContext context = new DefaultCamelContext();
// 添加Kafka路由构建器
context.addRoutes(new KafkaRouteBuilder());
// 启动Camel Context
context.start();
// 在这里可以发送消息到Kafka主题
context.createProducerTemplate().sendBody("direct:kafkaProducer", "Hello Kafka!");
// 等待一段时间
Thread.sleep(5000);
// 停止Camel Context
context.stop();
}
}
pom.xml
文件中添加Apache Camel和Kafka组件的依赖项。
org.apache.camel
camel-core
x.x.x
org.apache.camel
camel-kafka
x.x.x
org.apache.kafka
kafka-clients
x.x.x
确保将x.x.x
替换为所需的Apache Camel和Kafka版本号。
以上代码示例演示了如何使用Apache Camel和Kafka组件实现单个生产者多个消费者模式。您可以根据需要修改代码以适应您的应用程序。