在AWS控制台中,创建一个SNS主题。在主题的订阅中添加Lambda函数,并为每个订阅设置适当的协议和端点。
在Lambda函数的代码中,使用aws-sdk Node.js库来发送错误通知到SNS主题。例如,从event对象中获取每个记录的数据并在处理失败时发送SNS消息:
var AWS = require("aws-sdk");
var sns = new AWS.SNS();
exports.handler = function(event, context, callback) {
event.Records.forEach(function(record) {
// Process record ...
if (record.someErrorCondition) {
var params = {
Message: "Error processing Kinesis record: " + JSON.stringify(record),
Subject: "Kinesis Error",
TopicArn: "arn:aws:sns:us-east-1:123456789012:my-topic"
};
sns.publish(params, function(err, data) {
if (err) {
console.log(err, err.stack);
} else {
console.log("SNS message sent: " + JSON.stringify(params));
}
callback(err, null);
});
}
});
};
在上面的示例中,SNS主题的ARN应设置为适当的值。
import json
import boto3
def lambda_handler(event, context):
topic_arn = ""
sns = boto3.client('sns')
for record in event['Records']:
try:
# Process record ...
except:
error_message = "Error processing Kinesis record: {}".format(json.dumps(record))
sns.publish(
TopicArn=topic_arn,
Subject='Kinesis Error',
Message=error_message
)
raise
在上面的示例中,请将topic_arn替换为正确的值。