如何解决如何展开多个 Flux 并保留原件?
我想生成两个对象类型的交错流。
我有两个辅助方法
-
Aaa genAaa()
(或Mono<Aaa>
?)创建一个Aaa
和 -
Flux<Bbb> genBbbs(Aaa a)
创建一组从Bbb
到a
的Flux<JsonNode>
。
总体结果应该是 [ {'name':'Aaa-X'},{'name':'Bbb-x1'},{'name':'Bbb-x2'},{'name':'Aaa-Y'},{'name':'Bbb-y1'},{'name':'Bbb-y2'},{'name':'Bbb-y3'}
]
以适应两种对象类型的混合。
所以,结果将类似于
final ObjectMapper om = new ObjectMapper();
public Flux<JsonNode> create() {
return Flux.range(0,2) // create 2
.map( idx -> genAaa() ) // bare Aaa's
.flatMap( a -> genBbbs(a) ) // bare Aaa to Flux<Bbb> ???
.map( om::valueToTree ); // anything to JsonNode
}
作为一个粗略的草图,我试过这个:
Aaa
但我这里有几个大问题:
因为我转换了 JsonNodes
对象(并因此消耗了它们),所以它们不再出现在结果中。我不知道如何“使用”并将它们保留在这种情况下。
我在想是否可以将“flux in progress”作为参数传递给 generate 函数,所以它们在创建时都添加了 1) Tests\Unit\HandShake\ConfirmApplicationHandShakeActionTest::BasedOnAnExistentAppIConfirmTheHandShake
TypeError: Argument 1 passed to Illuminate\Support\Testing\Fakes\EventFake::__construct() must implement interface Illuminate\Contracts\Events\Dispatcher,null given,called in /Users/pablo/Workspace/xxxx/vendor/laravel/framework/src/Illuminate/Support/Facades/Event.php on line 38
,但这感觉不对(完全不是异步的),我反正现在不会怎么样。我想 Fluxes 中有一个概念让我难以理解。
解决方法
您可以在传递给 genBbbs
的函数内将 Flux#concat
与 flatMap
方法一起使用:
private static Flux<JsonNode> combine() {
ObjectMapper objectMapper = new ObjectMapper();
return Flux.concat(getAaa("a0"),getAaa("a1"))// Flux<Aaa>
.flatMap(aaa -> Flux.concat(Mono.just(aaa),getBbbs(aaa))) // Flux<Object>
.map(objectMapper::valueToTree); // Flux<JsonNode>
}
concat
方法只是连接两个源:
- 使用
Mono.just
人工创建 - 来自
Flux<B>
调用的getBbbs(aaa)
示例输出:
{"name":"a0"}
{"name":"B1-a0"}
{"name":"B2-a0"}
{"name":"a1"}
{"name":"B1-a1"}
{"name":"B2-a1"}
完整列表:
public class Main {
@AllArgsConstructor
@Data
private static class Aaa {
private String name;
}
@AllArgsConstructor
@Data
private static class Bbb {
private String name;
}
private static Mono<Aaa> getAaa(String name) {
return Mono.just(new Aaa(name));
}
private static Flux<Bbb> getBbbs(Aaa aaa) {
return Flux.just(new Bbb("B1-" + aaa.getName()),new Bbb("B2-" + aaa.getName()));
}
public static void main(String[] args) {
combine().subscribe(System.out::println);
}
private static Flux<JsonNode> combine() {
ObjectMapper objectMapper = new ObjectMapper();
return Flux.concat(getAaa("a0"),getAaa("a1"))// Flux<Aaa>
.flatMap(aaa -> Flux.concat(Mono.just(aaa),getBbbs(aaa))) // Flux<Object>
.map(objectMapper::valueToTree); // Flux<JsonNode>
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。