出现"Not a SubType"异常通常是由于AvroCoder注册器无法正确识别Avro类型导致的。以下是解决方法的代码示例:
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
public class AvroTypeRegistrar {
public static void registerAvroTypes() {
ReflectData.get().getSchema(SomeAvroType.class);
// 注册其他Avro类型
}
}
在Flink job启动前调用AvroTypeRegistrar.registerAvroTypes()
方法,确保所有Avro类型已正确注册。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.java.typeutils.TypeExtractor;
public class FlinkJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 注册Avro类型
env.getConfig().registerTypeWithKryoSerializer(SomeAvroType.class, SpecificAvroSerializer.class);
// 注册其他Avro类型
// 添加其他Flink job代码
env.execute("Flink Job");
}
}
使用env.getConfig().registerTypeWithKryoSerializer()
方法注册Avro类型。
AvroCoder.of()
方法手动注册Avro类型:import org.apache.avro.generic.GenericData;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CoderRegistry;
public class AvroTypeRegistrar {
public static void registerAvroTypes() {
CoderRegistry coderRegistry = CoderRegistry.createDefault();
// 注册Avro类型
coderRegistry.registerCoderForClass(SomeAvroType.class, AvroCoder.of(SomeAvroType.class));
// 注册其他Avro类型
// 设置CoderRegistry到PipelineOptions
GenericData.setCoderRegistry(coderRegistry);
}
}
在Flink job启动前调用AvroTypeRegistrar.registerAvroTypes()
方法,手动注册Avro类型。
通过以上解决方法,您应该能够解决在使用AvroCoderRegistrar的Flink Runner上启动Beam WordCount Docker镜像时出现的"Not a SubType"异常。