微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

AWS kinesis getRecords 返回空的 Records[]

如何解决AWS kinesis getRecords 返回空的 Records[]

我在玩 kinesis,我尝试了一个非常简单的例子。 我已经执行的步骤: 执行Producer放置一些成功的记录。

在执行 getRecords 时在消费者端面临错误。 我试图更改所有这些方法以从 api 获取记录: 'AT_SEQUENCE_NUMBER | AFTER_SEQUENCE_NUMBER | TRIM_HORIZON |最新'。

回复

Aws\Result Object

( [数据:Aws\结果:私有] => 数组 ( [记录] => 数组 ( )

        [NextShardIterator] => AAAAAAAAAA.....
        [MillisBehindLatest] => 0
        [@Metadata] => Array

代码

$streamName = 'test';
$numberOfRecordsPerBatch = 10000;

require_once 'vendor/autoload.PHP';
$sdk = new \Aws\Sdk();
$kinesisClient = $sdk->createKinesis([
    'region' => '{REGION}','version' => '2013-12-02','credentials' => [
        'key' => '{API_KEY}','secret' => '{API_SECRET}'
    ]
]);

// get all shard ids
$res = $kinesisClient->describeStream([ 'StreamName' => $streamName ]);
$shardIds = $res->search('StreamDescription.Shards[].ShardId');

$count = 0;
$startTime = microtime(true);

foreach ($shardIds as $shardId) {
    echo "ShardId: $shardId\n";

    // get initial shard iterator
    $res = $kinesisClient->getShardIterator([
        'ShardId' => $shardId,'ShardIteratorType' => 'LATEST',// 'AT_SEQUENCE_NUMBER| AFTER_SEQUENCE_NUMBER | TRIM_HORIZON|LATEST'
        // 'StartingSequenceNumber' => '<string>','StreamName' => $streamName,]);
    $shardIterator = $res->get('ShardIterator');

    do {
        echo "Get Records\n";
        $res = $kinesisClient->getRecords([
            'Limit' => $numberOfRecordsPerBatch,'ShardIterator' => $shardIterator
        ]);

        $shardIterator = $res->get('NextShardIterator');
        $localCount = 0;

        foreach ($res->search('Records[].[SequenceNumber,Data]') as $data) {
            list($sequenceNumber,$item) = $data;
            echo "- [$sequenceNumber] $item\n";
            $count++;
            $localCount++;
        }
        echo "Processed $localCount records in this batch\n";
        sleep(1);
    } while ($localCount>0);
}

我也浏览了 AWS 文档,发现我们发送的所有详细信息都是正确的,但我们仍然没有收到任何记录作为回应。

谢谢

解决方法

当您将记录摄取到 kinesis 数据流时,您将收到每条记录的 sequence number。相同分区键的序列号通常会随着时间的推移而增加。写请求之间的时间间隔越长,序列号就越大。

当您执行 GetShardIterator 时,您基本上指向该分片中的特定序列号。无法保证摄取的数据在当前指针可用。因此,第一个 GetRecords 可能不返回任何记录。您必须循环运行 GetRecords。当前,如果第一个 GetRecords 没有返回任何结果,则您的 while 条件将失败。相反,您可以有条件来检查“NextShardIterator”是否在 while 中不为空以连续从分片读取。

如果您想在第一个 GetRecords 调用中获取记录,则

  1. 保存作为 PutRecord 调用响应返回的序列号。
  2. 在 GetShardIterator 中使用“AT_SEQUENCE_NUMBER”分片迭代器类型,并将保存的序列号提供给 StartingSequenceNumber 字段。
  3. 使用步骤 2 中返回的分片迭代器运行 GetRecords

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。