微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Kinesis Stream 未获取日志

如何解决Kinesis Stream 未获取日志

我正在 Kinesis 数据流中接收 cloudtrail 日志。我正在调用流处理 lambda 函数,如 here 所述。然后将返回到流的最终结果存储到 S3 存储桶中。截至目前,处理失败并在 S3 存储桶中创建以下错误文件

{"attemptsMade":4,"arrivalTimestamp":1619677225356,"errorCode":"Lambda.FunctionError","errorMessage":"Check your function and make sure the output is in required format. In addition to that,make sure the processed records contain valid result status of Dropped,Ok,or ProcessingFailed","attemptEndingTimestamp":1619677302684,

在此处添加 Python lambda 函数以供参考:

import base64
import gzip
import json
import logging

# Setup logging configuration
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

def unpack_kinesis_stream_records(event):

    # decode and decompress each base64 encoded data element
    return [gzip.decompress(base64.b64decode(k["data"])).decode('utf-8') for k in event["records"]]

def decode_raw_cloud_trail_events(cloudTrailEventDataList):

    #Convert Raw Event Data List
    eventList =  [json.loads(e) for e in cloudTrailEventDataList]

    #Filter out-non DATA_MESSAGES
    filteredEvents = [e for e in eventList if e["messageType"] == 'DATA_MESSAGE']

    #Covert each indidual log Event Message
    events = []
    for f in filteredEvents:
        for e in f["logEvents"]:
            events.append(json.loads(e["message"]))

    logger.info("{0} Event Logs Decoded".format(len(events)))
    return events

def handle_request(event,context):

    #Log Raw Kinesis Stream Records
    #logger.debug(json.dumps(event,indent=4))

    # Unpack Kinesis Stream Records
    kinesisData = unpack_kinesis_stream_records(event)
    #[logger.debug(k) for k in kinesisData]

    # Decode and filter events
    events = decode_raw_cloud_trail_events(kinesisData)

    ####### INTEGRATION CODE GOES HERE #########

    return f"Successfully processed {len(events)} records."

def lambda_handler(event,context):
    return handle_request(event,context)

谁能帮我理解这里的问题。

解决方法

我相信您使用的是“kinesis firehose”服务而不是“kinesis data stream”。您使用的代码用于直接从 kinesis 数据流中读取并处理 cloudtrail 事件。

kinesis firehose 数据转换 lambda 函数不同。 Firehose 将接收到的 cloudtrail 事件发送到 lambda 函数。 Lambda 处理/转换事件并将这些事件发送回 firehose,以便 firehose 可以将它们传送到目标 S3 存储桶。

您的 lambda 函数应该以与 firehose 期望的完全相同的格式返回记录,并且每个记录都应该具有 [Dropped、Ok 或 ProcessingFailed] 状态之一。您可以在 aws doc

中阅读更多信息

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。