org.apache.camel
camel-rabbitmq
x.x.x
from("rabbitmq:{{rabbitmq.queue}}?autoAck=false&queue=q.foo&routingKey=k.bar")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
// Your code goes here
}
});
其中,queue参数指定要消费的RabbitMQ队列名称,routingKey参数指定用于消费的RabbitMQ Routing Key。
autoAck参数设置为false以禁用自动确认。
Processor每次从队列中接收到消息时,都会调用它。您可以在此处执行任何操作,例如将消息存储在数据库中或将其转发到其他系统。
from("rabbitmq:{{rabbitmq.queue}}?autoAck=false&queue=q.foo&routingKey=k.bar")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
// Your code goes here
exchange.getIn().getHeaders().put(RabbitMQConstants.CHANNEL, exchange.getProperty(RabbitMQConstants.CHANNEL));
exchange.getIn().getHeaders().put(RabbitMQConstants.DELIVERY_TAG, exchange.getProperty(RabbitMQConstants.DELIVERY_TAG));
}
})
.to("rabbitmq:{{rabbitmq.queue}}?queue=q.foo&routingKey=k.bar&autoAck=false")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
Channel channel = exchange.getIn().getHeader(RabbitMQConstants.CHANNEL, Channel.class);
Long deliveryTag = exchange.getIn().getHeader(RabbitMQConstants.DELIVERY_TAG, Long.class);
channel.basicAck(deliveryTag, false);
}
});
其中,我们通过设置autoAck