在使用 Avro 序列化并配合 Schema Registry 时，我们可能会遇到如下异常：
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40401
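这个异常通常意味着 Schema Registry 中找不到所请求的 Schema（或其所属的 subject）。排查时可以先确认 schema.registry.url 指向了正确的 Schema Registry，再确认对应的 subject（默认按 topic 命名，即 <topic>-key 或 <topic>-value）下确实已经注册了所需的 Schema。以下示例代码演示了如何在 Kafka 中使用 Avro 进行序列化和反序列化：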
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import java.util.HashMap;
import java.util.Map;
public class AvroSerializerDeserializerExample {
// Kafka topic name used by this example.
// NOTE(review): its use is not visible in this part of the file — presumably
// passed to the serializer/deserializer calls; confirm.
private static final String TOPIC = "test-topic";
// Base URL of the Schema Registry; main() stores it in the configuration map
// under the "schema.registry.url" key.
private static final String SCHEMA_REGISTRY_URL = "http://localhost:8081";
// Avro schema, as JSON text, for the record "User" in namespace "example.avro".
// Fields:
//   name            - string
//   favorite_number - union of int and null
//   favorite_color  - union of string and null
// The embedded "\n" characters only format the JSON text across lines.
private static final String USER_SCHEMA = "{\"namespace\": \"example.avro\",\n" +
" \"type\": \"record\",\n" +
" \"name\": \"User\",\n" +
" \"fields\": [\n" +
" {\"name\": \"name\", \"type\": \"string\"},\n" +
" {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},\n" +
" {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n" +
" ]\n" +
"}";
public static void main(String[] args) throws Exception {
Map props = new HashMap<>();
props.put("schema.registry.url", SCHEMA_REGISTRY_URL);
Serializer serializer = getAvroSerializer(props);
Deserializer deserializer = getAvroDeserializer(props);
User user = createUser();
System.out.println("User Before Serialization: " +