我们可以使用 AWS CDK 以基础设施即代码的方式构建资源，包括为 Kinesis Data Analytics 应用程序配置 VPC 设置。以下是一些示例代码：
import * as cdk from 'aws-cdk-lib';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as kda from 'aws-cdk-lib/aws-kinesisanalytics';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
// Root CDK application and the stack that owns the resources declared in this file.
const app = new cdk.App();
const stack = new cdk.Stack(app, 'my-stack');

// VPC created with the construct's default settings.
const vpc = new ec2.Vpc(stack, 'my-vpc');

// Single-shard Kinesis data stream.
const inputStreamProps = {
  streamName: 'my-input-stream',
  shardCount: 1,
};
const inputStream = new kinesis.Stream(stack, 'my-input-stream', inputStreamProps);
// Create the Kinesis Data Analytics application (SQL runtime) that reads from
// the Kinesis data stream.
//
// Notes on the AWS::KinesisAnalyticsV2::Application schema:
//  - Streaming inputs are declared under
//    applicationConfiguration.sqlApplicationConfiguration.inputs, not directly
//    under applicationConfiguration.
//  - Each input has a single streaming source, so only kinesisStreamsInput is
//    configured here.
//  - The V2 source properties take only a resource ARN; access is granted via
//    serviceExecutionRole, which therefore needs read permission on the stream.
const kdaApp = new kda.CfnApplicationV2(stack, 'my-kda-app', {
  applicationName: 'my-kda-app',
  runtimeEnvironment: 'SQL-1_0',
  serviceExecutionRole: 'arn:aws:iam::123456789012:role/my-kda-app-service-role',
  applicationConfiguration: {
    sqlApplicationConfiguration: {
      inputs: [
        {
          // Prefix used when naming the in-application stream(s).
          namePrefix: 'input',
          inputSchema: {
            recordColumns: [
              {
                name: 'value',
                mapping: '$.value.N',
                sqlType: 'DOUBLE',
              },
            ],
            recordFormat: {
              recordFormatType: 'JSON',
              mappingParameters: {
                jsonMappingParameters: {
                  recordRowPath: '$',
                },
              },
            },
          },
          kinesisStreamsInput: {
            resourceArn: inputStream.streamArn,
          },
          inputParallelism: {
            count: 1,
          },
        },
      ],
    },
  },
});
// 更新Kinesis Data Analytics应用的VPC配置
kdaApp.addPropertyOverride('vpcConfiguration', {
subnetIds: vpc.privateSubnets.map(subnet => subnet.subnetId),
securityGroupIds: [vpc.vpcDefaultSecurityGroup