该错误通常是由于使用了较旧的 Pulsar 版本所致，解决方法是将 Pulsar 升级到较新的版本。同时，建议使用最新的 Pulsar API 来操作 schema。以下示例展示了在 Pulsar 2.8.1 中使用 Schema 的推荐方式：
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.*;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
/**
 * Minimal example of publishing a message with a hand-written Pulsar {@link Schema}.
 *
 * <p>{@link #main(String[])} connects to a local broker, creates a typed producer backed by
 * {@link CustomerSchema}, sends one {@link Customer} and releases the producer and the client.
 */
public class PulsarSchemaDemo {

    /**
     * Sends a single {@link Customer} to the demo topic.
     *
     * @param args command-line arguments (unused)
     * @throws PulsarClientException if connecting, creating the producer, sending or closing fails
     */
    public static void main(String[] args) throws PulsarClientException {
        // try-with-resources closes the producer first and the client second, and does so even
        // when send() throws.
        try (PulsarClient client = PulsarClient.builder()
                        .serviceUrl("pulsar://localhost:6650")
                        .build();
                Producer<Customer> producer = client.newProducer(new CustomerSchema())
                        .topic("persistent://my-tenant/my-namespace/my-topic")
                        .create()) {
            Customer myCustomer = new Customer();
            myCustomer.setName("John");
            myCustomer.setAge(35);
            producer.send(myCustomer);
        }
    }

    /** Simple mutable payload carried by the demo messages. */
    private static class Customer {
        private String name;
        private int age;

        /** Returns the customer name; may be {@code null} if it was never set. */
        public String getName() {
            return name;
        }

        /** Sets the customer name. */
        public void setName(String name) {
            this.name = name;
        }

        /** Returns the customer age. */
        public int getAge() {
            return age;
        }

        /** Sets the customer age. */
        public void setAge(int age) {
            this.age = age;
        }
    }

    /**
     * Custom binary schema for {@link Customer}.
     *
     * <p>Wire layout (big-endian, the {@link ByteBuffer} default):
     * <pre>
     *   [age: 4-byte int][name length: 4-byte int, -1 when name is null][name: UTF-8 bytes]
     * </pre>
     */
    private static class CustomerSchema implements Schema<Customer> {

        /** Size of the fixed header: age plus name length. */
        private static final int HEADER_BYTES = Integer.BYTES * 2;

        /** Marker written in the length field when the name is {@code null}. */
        private static final int NULL_NAME = -1;

        // The payload is an opaque, application-defined layout, hence the BYTES type and the
        // empty schema definition.
        private static final SchemaInfo SCHEMA_INFO = SchemaInfo.builder()
                .name("Customer")
                .type(SchemaType.BYTES)
                .schema(new byte[0])
                .properties(Collections.<String, String>emptyMap())
                .build();

        /**
         * Serializes a customer into the layout described on this class.
         *
         * @param message the customer to encode; must not be {@code null}
         * @return the encoded bytes
         * @throws SchemaSerializationException if {@code message} is {@code null}
         */
        @Override
        public byte[] encode(Customer message) throws SchemaSerializationException {
            if (message == null) {
                throw new SchemaSerializationException("Cannot encode a null Customer");
            }
            String name = message.getName();
            byte[] nameBytes = name == null ? new byte[0] : name.getBytes(StandardCharsets.UTF_8);

            ByteBuffer buffer = ByteBuffer.allocate(HEADER_BYTES + nameBytes.length);
            buffer.putInt(message.getAge());
            buffer.putInt(name == null ? NULL_NAME : nameBytes.length);
            buffer.put(nameBytes);
            return buffer.array();
        }

        /**
         * Deserializes bytes produced by {@link #encode(Customer)}.
         *
         * @param bytes the encoded customer
         * @return the decoded customer
         * @throws SchemaSerializationException if {@code bytes} is {@code null}, shorter than the
         *     fixed header, or carries a name length that does not fit the remaining payload
         */
        @Override
        public Customer decode(byte[] bytes) throws SchemaSerializationException {
            if (bytes == null || bytes.length < HEADER_BYTES) {
                throw new SchemaSerializationException(
                        "Customer payload must contain at least " + HEADER_BYTES + " bytes");
            }
            ByteBuffer buffer = ByteBuffer.wrap(bytes);
            int age = buffer.getInt();
            int nameLength = buffer.getInt();
            // Validate before allocating so a corrupt length cannot trigger a huge allocation.
            if (nameLength < NULL_NAME || nameLength > buffer.remaining()) {
                throw new SchemaSerializationException(
                        "Invalid Customer name length: " + nameLength);
            }

            Customer customer = new Customer();
            customer.setAge(age);
            if (nameLength != NULL_NAME) {
                byte[] nameBytes = new byte[nameLength];
                buffer.get(nameBytes);
                customer.setName(new String(nameBytes, StandardCharsets.UTF_8));
            }
            return customer;
        }

        /** Returns the schema descriptor for this schema. */
        @Override
        public SchemaInfo getSchemaInfo() {
            return SCHEMA_INFO;
        }

        /** Returns this instance; the schema holds no mutable state, so no copy is made. */
        @Override
        public Schema<Customer> clone() {
            return this;
        }
    }
}