为业务逻辑类添加序列化方法。Flink 中使用的 DataStream 或 KeyedStream 的算子需要对输入和输出的数据进行序列化和反序列化。如果这些数据包含非序列化对象,则会导致运行时错误。要解决这个问题,可以在业务逻辑类上实现 Serializable 接口,并确保所有的成员变量也都实现了 Serializable 接口。
示例代码:
public class MyBusinessLogic implements Serializable { private String field1; private int field2;
public MyBusinessLogic(String field1, int field2) {
this.field1 = field1;
this.field2 = field2;
}
public String getField1() {
return field1;
}
public void setField1(String field1) {
this.field1 = field1;
}
public int getField2() {
return field2;
}
public void setField2(int field2) {
this.field2 = field2;
}
// 添加序列化方法
private void writeObject(java.io.ObjectOutputStream out) throws IOException {
out.writeObject(field1);
out.writeInt(field2);
}
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
field1 = (String) in.readObject();
field2 = in.readInt();
}
}
然后在 Flink 的 DataStream 或者 KeyedStream 中使用 MyBusinessLogic 时就不会报 NoSerializationException 异常了。