问题描述:在使用Apache Flink的FlinkKinesisConsumer时,无法使用本地Kinesis进行测试和开发。
解决方法:
org.apache.flink
flink-connector-kinesis_${scala.binary.version}
${flink.version}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
public class KinesisSourceExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置Kinesis配置
env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
env.getConfig().setBoolean(ConsumerConfigConstants.STREAM_INITIAL_POSITION_LATEST, true);
// 创建FlinkKinesisConsumer
FlinkKinesisConsumer consumer = new FlinkKinesisConsumer<>(
"my-stream",
new SimpleStringSchema(),
env.getConfig().getProperties());
// 添加数据源
env.addSource(consumer)
.print();
// 执行作业
env.execute("Kinesis Source Example");
}
}
src/main/resources
目录下创建一个flink-conf.yaml
文件,并添加以下内容:kinesis.endpoint: localhost:4567
启动本地Kinesis服务,可以使用Amazon Kinesis Local或者LocalStack等。
运行程序,即可使用本地Kinesis进行测试和开发。
注意:在生产环境中,需要修改Kinesis的配置为实际的AWS Kinesis服务。