In Apache Camel, using non-Java consumers with ActiveMQ can raise the following error during bridging:
java.lang.UnsupportedOperationException: Not supported for non-Java consumers: destinationType=TOPIC
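This happens because ActiveMQ supports many kinds of consumers, but only Java consumers can use the Apache Camel bridge. To use non-Java consumers with Camel, you need to write custom code that bridges the messages.
The following sample code shows one way to handle this. First, create an ActiveMQ component with custom message-bridging logic: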
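import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;
// Assumption: Camel 3.x camel-activemq; older releases ship this class as
// org.apache.activemq.camel.component.ActiveMQComponent.
import org.apache.camel.component.activemq.ActiveMQComponent;

public class MyBridgeComponent extends ActiveMQComponent {
    private final String uri; // the ActiveMQ broker URI

    public MyBridgeComponent(String uri) {
        setBrokerURL(uri);
        this.uri = uri;
    }

    public void bridgeMessage(ActiveMQDestination from, ActiveMQDestination to) throws JMSException {
        // create a new connection to ActiveMQ
        Connection connection = new ActiveMQConnectionFactory(uri).createConnection();
        try {
            // a durable subscription needs a client ID, set before start()
            connection.setClientID("MyBridgeComponent");
            connection.start();
            // create a session and producer for the target destination
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(to);
            // create a consumer for the source destination;
            // if this is a topic, create a durable subscriber with a stable name
            MessageConsumer consumer = from.isTopic()
                    ? session.createDurableSubscriber((Topic) from, "bridge-" + from.getPhysicalName())
                    : session.createConsumer(from);
            // get the message and copy its contents to a new message
            Message received = consumer.receive();
            if (!(received instanceof TextMessage)) {
                throw new JMSException("Only TextMessage is handled here");
            }
            TextMessage newMessage = session.createTextMessage(((TextMessage) received).getText());
            // copy message properties and headers
            // assumption: the original listing is truncated at this point; what follows is a reconstruction
            Enumeration<?> propertyNames = received.getPropertyNames();
            while (propertyNames.hasMoreElements()) {
                String name = (String) propertyNames.nextElement();
                newMessage.setObjectProperty(name, received.getObjectProperty(name));
            }
            newMessage.setJMSCorrelationID(received.getJMSCorrelationID());
            producer.send(newMessage);
        } finally {
            connection.close(); // also closes the session, producer, and consumer
        }
    }
}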
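The receive, copy, send sequence that bridgeMessage performs can be tried out without a broker. The following is only a minimal sketch under stated assumptions: BridgeSketch, SimpleMessage, and bridgeOne are hypothetical names invented for illustration, and plain in-memory queues stand in for the source and target destinations. None of this is ActiveMQ or Camel API.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical stand-ins for broker destinations and messages; not ActiveMQ or Camel API.
final class BridgeSketch {

    /** A message reduced to a text body plus string-keyed properties. */
    static final class SimpleMessage {
        final String body;
        final Map<String, Object> properties;

        SimpleMessage(String body, Map<String, Object> properties) {
            this.body = body;
            // defensive copy so the bridged message does not share state with the original
            this.properties = new LinkedHashMap<>(properties);
        }
    }

    /**
     * Moves one message from source to target, copying body and properties
     * into a new message. Returns false when the source is empty.
     */
    static boolean bridgeOne(Queue<SimpleMessage> source, Queue<SimpleMessage> target) {
        SimpleMessage received = source.poll();
        if (received == null) {
            return false;
        }
        target.add(new SimpleMessage(received.body, received.properties));
        return true;
    }

    public static void main(String[] args) {
        Queue<SimpleMessage> source = new ArrayDeque<>();
        Queue<SimpleMessage> target = new ArrayDeque<>();
        source.add(new SimpleMessage("hello", Map.of("priority", 4)));

        while (bridgeOne(source, target)) {
            // keep draining until the source is empty
        }
        SimpleMessage out = target.peek();
        System.out.println(out.body + " " + out.properties);
    }
}
```

Unlike a blocking receive on a real consumer, bridgeOne returns immediately when nothing is waiting, which makes the loop easy to stop; a real bridge would more likely use a receive timeout or a message listener.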