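根据 Flink 官方文档，Async I/O 算子不支持广播状态：`RichAsyncFunction` 的运行时上下文会对 `getState`、`getMapState` 等托管状态访问直接抛出 `UnsupportedOperationException`；而且 Async I/O 算子只有一个输入，没有广播输入，也就没有可重写的 `processBroadcastElement` 回调。广播状态只能在 `BroadcastProcessFunction` / `KeyedBroadcastProcessFunction` 中通过 `ctx.getBroadcastState(descriptor)` 使用；即便在那里，每个并行实例也都持有自己的一份副本，不会共享——Flink 是通过把同样的广播元素发送给所有实例来保持各副本一致的。因此，如果 Async I/O 需要用到“广播”数据，要么在上游用广播流把这些数据关联到每条记录上，再交给 Async I/O 处理；要么在异步函数内部为每个实例手动维护一份本地缓存（例如 `ConcurrentHashMap`）。
以下是示例代码：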
/**
 * Descriptor for a map-shaped state with keys of type {@code K} and values of type {@code V}.
 *
 * <p>This is a thin typed subclass of {@link MapStateDescriptor}; it adds no behavior of its own.
 * To use it as broadcast state, pass it to {@code DataStream#broadcast(descriptor)} and read it in a
 * {@code BroadcastProcessFunction} / {@code KeyedBroadcastProcessFunction} via
 * {@code ctx.getBroadcastState(descriptor)}. It cannot be used from a {@code RichAsyncFunction},
 * whose runtime context rejects all state access.
 *
 * @param <K> type of the map keys
 * @param <V> type of the map values
 */
public class MyBroadcastStateDescriptor<K, V> extends MapStateDescriptor<K, V> {

    // State descriptors are Serializable; pin the version so recompilation does not change it.
    private static final long serialVersionUID = 1L;

    /**
     * Creates the descriptor.
     *
     * @param name unique name of the state
     * @param keyTypeInfo type information for the map keys
     * @param valueTypeInfo type information for the map values
     */
    public MyBroadcastStateDescriptor(
            String name, TypeInformation<K> keyTypeInfo, TypeInformation<V> valueTypeInfo) {
        super(name, keyTypeInfo, valueTypeInfo);
    }
}
/**
 * Async I/O function that consults a small key-to-value lookup table while processing records.
 *
 * <p>Why this does not use Flink state: the runtime context of a {@link RichAsyncFunction} rejects
 * managed-state access ({@code getMapState}, {@code getState}, ...) with
 * {@link UnsupportedOperationException}. The async I/O operator also has a single input, so there
 * is no broadcast side and no {@code processBroadcastElement} callback to override. The table is
 * therefore a plain in-memory map owned by each parallel subtask. It is not checkpointed and not
 * shared between instances. If the values must come from a broadcast stream, join them onto the
 * records upstream in a {@code BroadcastProcessFunction} and feed the enriched stream into async I/O.
 *
 * @param <IN> type of the input records
 * @param <OUT> type of the results emitted through the {@link ResultFuture}
 */
public class MyAsyncFunction<IN, OUT> extends RichAsyncFunction<IN, OUT> {

    /**
     * Per-subtask lookup table. Despite its name, this is not Flink broadcast state. It is created
     * in {@link #open}, so it is rebuilt (not restored) after a restart. A concurrent map is used
     * because async-client callbacks may read it off the task thread. NOTE(review): confirm whether
     * the elided async code actually does.
     */
    private transient java.util.Map<String, Long> broadcastState;

    /**
     * Creates and seeds this subtask's lookup table.
     *
     * @param parameters operator configuration, forwarded to {@code super.open}
     * @throws Exception if the superclass initialization fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        broadcastState = new java.util.concurrent.ConcurrentHashMap<>();
        // Seed on every subtask. Each parallel instance has its own copy, so seeding only on
        // subtask 0 would leave every other instance with an empty table.
        broadcastState.put("key1", 1L);
        broadcastState.put("key2", 2L);
        broadcastState.put("key3", 3L);
    }

    /**
     * Processes one input record asynchronously, using the lookup table.
     *
     * <p>The implementation must eventually call {@code resultFuture.complete(...)} or
     * {@code resultFuture.completeExceptionally(...)}. Otherwise the record is only released by the
     * operator's timeout handling.
     *
     * @param input the record to process
     * @param resultFuture sink for this record's results
     * @throws Exception propagated to the operator if issuing the request fails
     */
    @Override
    public void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        Long value = broadcastState.get("key1");
        // ... issue the async request using `value` and complete `resultFuture` in its callback
    }

    /**
     * Updates this subtask's lookup table.
     *
     * <p>This is not a Flink callback. {@link RichAsyncFunction} has no broadcast input, so nothing
     * in the async I/O operator invokes it. A call only affects the instance it is made on, and
     * only after {@link #open} has run.
     *
     * @param value the update payload (not yet interpreted by the placeholder logic below)
     */
    public void processBroadcastElement(String value) {
        // ... derive the new entries from `value`
        broadcastState.put("key1", 10L);
        // ...
    }
}