微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

收到n个元素后如何退出Akka流?

如何解决收到n个元素后如何退出Akka流?

我是Akka的新手,我只是想抓住它。

作为一个实验,我想从Kinesis流中读取并收集n条消息并停止。

我发现唯一会停止读取记录的是Sink.head()。但这只会返回一条记录,我想获得更多。

我不太清楚在收到n条消息后如何停止从流中读取。

这是我到目前为止尝试过的代码

  @Test
  public void testReadingFromKinesisNRecords() throws ExecutionException,InterruptedException {
    final ActorSystem system = ActorSystem.create("foo");
    final Materializer materializer = ActorMaterializer.create(system);

    ProfileCredentialsProvider profileCredentialsProvider = ProfileCredentialsProvider.create();

    final KinesisAsyncclient kinesisClient = KinesisAsyncclient.builder()
        .credentialsProvider(profileCredentialsProvider)
        .region(Region.US_WEST_2)
            .httpClient(AkkaHttpClient.builder()
                .withActorSystem(system).build())
            .build();

    system.registerOnTermination(kinesisClient::close);

    String streamName = "akka-test-stream";
    String shardId = "shardId-000000000000";

    int numberOfRecordsToRead = 3;

    final ShardSettings settings = ShardSettings.create(streamName,shardId)
            .withRefreshInterval(Duration.ofSeconds(1))
            .withLimit(numberOfRecordsToRead) // return a maximum of n records (and quit?!)
            .withShardIterator(ShardIterators.latest());

    final Source<Record,NotUsed> sourceKinesisBasic = KinesisSource.basic(settings,kinesisClient);

    Flow<Record,String,NotUsed> flowMapRecordToString = Flow.of(Record.class).map(record -> extractDataFromrecord(record));
    Flow<String,NotUsed> flowPrinter = Flow.of(String.class).map(s -> debugPrint(s));
//    Flow<String,List<String>,NotUsed> flowGroupedWithinMinute =
//        Flow.of(String.class).groupedWithin(
//            numberOfRecordsToRead,// group size
//            Duration.ofSeconds(60) // group time
//        );

    Source<String,NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
        .via(flowMapRecordToString)
        .via(flowPrinter);
//        .via(flowGroupedWithinMinute); // nope

    // sink to list of strings
//    Sink<String,CompletionStage<List<String>>> sinkToList = Sink.seq();
    Sink<String,CompletionStage<List<String>>> sink10 = Sink.takeLast(10);
//    Sink<String,CompletionStage<String>> sinkHead = Sink.head(); // only gives you one message

    CompletionStage<List<String>> streamCompletion = sourceStringsFromKinesisRecords
        .runWith(sink10,materializer);
    CompletableFuture<List<String>> CompletableFuture = streamCompletion.toCompletableFuture();
    CompletableFuture.join(); // never stops running...
    List<String> result = CompletableFuture.get();
    int foo = 1;
  }

  private String extractDataFromrecord(Record record) {
    String encType = record.encryptionTypeAsstring();
    Instant arrivalTimestamp = record.approximateArrivalTimestamp();
    String data = record.data().asstring(StandardCharsets.UTF_8);
    return data;
  }

  private String debugPrint(String s) {
    System.out.println(s);
    return s;
  }

感谢您提供任何线索

解决方法

我发现答案是在流量级别使用takeN

...
Flow<String,String,NotUsed> flowTakeN = Flow.of(String.class).take(numberOfRecordsToRead);

Source<String,NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
    .via(flowMapRecordToString)
    .via(flowPrinter)
    .via(flowTakeN);
...
,

只需添加您找到的答案,也可以不用via更直接地表达事物:

Source<String,NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
    .map(record -> extractDataFromRecord(record))
    .map(s -> debugPrint(s))
    .take(10)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。