Apache Beam: reading from multiple Kafka topics, including stored (historical) data
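I need to read from multiple Kafka topics, starting at offsets computed from a point in time, sort the records by timestamp, and send them to another Kafka topic. All Kafka topics have 1 partition.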
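Here is an example that describes the use case more precisely.
We have inputTopic1, inputTopic2 and outputTopic. We need to consume the data from the inputTopics for the last 12 hours and then keep consuming live data. All consumed data needs to be sorted and published to outputTopic.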
I tried to create a custom WindowFn to handle this use case, but the data in the output topic is out of order.
I have two questions:
Have I chosen the right approach to this problem? Am I implementing it correctly?
Pipeline
Instant nowAsInstant = Instant.now();
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("Read from Topics", KafkaIO.<String, String>read()
        .withTopics(List.of("topic1", "topic2"))
        .withBootstrapServers("localhost:9092")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // Use the Kafka CreateTime as the event timestamp; Duration.ZERO is the max expected delay for the watermark.
        .withCreateTime(Duration.ZERO)
        // Start each partition at the first offset whose timestamp is at or after (now - 12 hours).
        .withStartReadTime(nowAsInstant.minus(Duration.standardHours(12)))
        .withConsumerConfigUpdates(consumerConfig)
        .commitOffsetsInFinalize())
    .apply(Window.into(new CustomWindowFn(nowAsInstant, Duration.millis(500))))
    // Collect all records of each window into a single list.
    .apply(Combine.globally(new ListCombiner()).withoutDefaults())
    .apply("Sort", MapElements.via(
        new SimpleFunction<Iterable<KafkaRecord<String, String>>, List<KafkaRecord<String, String>>>() {
            @Override
            public List<KafkaRecord<String, String>> apply(Iterable<KafkaRecord<String, String>> input) {
                return StreamSupport.stream(input.spliterator(), false)
                        .sorted(KAFKA_RECORD_COMPARATOR)
                        .collect(Collectors.toUnmodifiableList());
            }
        }))
    // Emit the records of each sorted list as individual elements.
    .apply(Flatten.iterables())
    .apply("mapToProducerRecord", MapElements.via(
        new SimpleFunction<KafkaRecord<String, String>, ProducerRecord<String, String>>() {
            @Override
            public ProducerRecord<String, String> apply(KafkaRecord<String, String> input) {
                return new ProducerRecord<>("outputTopic", null, input.getTimestamp(),
                        input.getKV().getKey(), input.getKV().getValue(), input.getHeaders());
            }
        }))
    .setCoder(ProducerRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
    .apply("Write to Kafka", KafkaIO.<String, String>writeRecords()
        .withBootstrapServers("localhost:9092")
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class));
LOG.info("Starting pipeline...");
pipeline.run();
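The Sort step relies on a comparator (KAFKA_RECORD_COMPARATOR) that the question does not show. The following is a minimal sketch of what it presumably does, assuming records are ordered by timestamp with the topic name as a tie-breaker so equal timestamps sort deterministically. Rec is a hypothetical stand-in for KafkaRecord<String, String> so the snippet runs with the JDK alone; with Beam, the same comparator could be built from KafkaRecord::getTimestamp and KafkaRecord::getTopic.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class RecordOrdering {

    // Hypothetical stand-in for KafkaRecord<String, String>: only the fields the ordering needs.
    public static final class Rec {
        private final String topic;
        private final long timestamp;
        private final String value;

        public Rec(String topic, long timestamp, String value) {
            this.topic = topic;
            this.timestamp = timestamp;
            this.value = value;
        }

        public String getTopic() { return topic; }
        public long getTimestamp() { return timestamp; }
        public String getValue() { return value; }
    }

    // Assumed ordering: by timestamp, ties broken by topic name.
    public static final Comparator<Rec> BY_TIMESTAMP =
            Comparator.comparingLong(Rec::getTimestamp).thenComparing(Rec::getTopic);

    public static List<Rec> sort(List<Rec> records) {
        return records.stream().sorted(BY_TIMESTAMP).collect(Collectors.toUnmodifiableList());
    }

    public static void main(String[] args) {
        List<Rec> sorted = sort(List.of(
                new Rec("topic2", 30L, "c"),
                new Rec("topic1", 10L, "a"),
                new Rec("topic2", 10L, "b")));
        // Prints the records as "timestamp topic value", one per line.
        sorted.forEach(r -> System.out.println(r.getTimestamp() + " " + r.getTopic() + " " + r.getValue()));
    }
}
```

Note that this comparator orders records only within the list it is given; in the pipeline that list is one window's worth of records.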
CustomWindowFn
import static java.util.Objects.isNull;

import java.util.Objects;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class CustomWindowFn extends PartitioningWindowFn<KafkaRecord<String, String>, IntervalWindow> {
    private final Instant startingTime;
    private final Duration size;
    // First timestamp before startingTime that this WindowFn instance has seen.
    private Instant intervalStartTime;

    public CustomWindowFn(Instant startingTime, Duration size) {
        this.startingTime = startingTime;
        this.size = size;
    }

    @Override
    public IntervalWindow assignWindow(Instant timestamp) {
        // Records older than startingTime go to [first such timestamp seen, startingTime).
        if (timestamp.isBefore(startingTime)) {
            Instant firstRecordTimestamp = getIntervalStartTime(timestamp);
            return new IntervalWindow(firstRecordTimestamp, startingTime);
        }
        // Newer records go to fixed windows of length size.
        Instant start =
                new Instant(timestamp.getMillis()
                        - timestamp.plus(size).getMillis() % size.getMillis());
        return new IntervalWindow(start, start.plus(size));
    }

    private Instant getIntervalStartTime(Instant timestamp) {
        if (isNull(intervalStartTime)) {
            intervalStartTime = timestamp;
        }
        return intervalStartTime;
    }

    @Override
    public boolean isCompatible(WindowFn<?, ?> other) {
        return this.equals(other);
    }

    @Override
    public Coder<IntervalWindow> windowCoder() {
        return IntervalWindow.getCoder();
    }

    @Override
    public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
        if (!this.isCompatible(other)) {
            throw new IncompatibleWindowException(
                    other, String.format(
                            "Only %s objects with the same size and startingTime are compatible.",
                            CustomWindowFn.class.getSimpleName()));
        }
    }

    public Instant getStartingTime() {
        return startingTime;
    }

    public Duration getSize() {
        return size;
    }

    @Override
    public boolean equals(@Nullable Object object) {
        if (!(object instanceof CustomWindowFn)) {
            return false;
        }
        CustomWindowFn other = (CustomWindowFn) object;
        return getStartingTime().equals(other.getStartingTime())
                && getSize().equals(other.getSize());
    }

    @Override
    public int hashCode() {
        return Objects.hash(size, startingTime);
    }
}
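The fixed-size branch of assignWindow appears to use the same arithmetic as Beam's FixedWindows with a zero offset: start = t - ((t + size) mod size). The plain-Java sketch below (class and method names are hypothetical) works that formula through on millisecond timestamps with the 500 ms size used in the pipeline. It covers only the branch for timestamps at or after startingTime and only non-negative timestamps.

```java
public class WindowMath {

    // Same arithmetic as the fixed-size branch of CustomWindowFn.assignWindow, on raw millis.
    // For non-negative t this equals t - (t % size).
    public static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis + sizeMillis) % sizeMillis;
    }

    // Windows are half-open: [start, start + size).
    public static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = 500L; // Duration.millis(500) in the pipeline
        for (long t : new long[] {0L, 499L, 500L, 1234L}) {
            System.out.println(t + " -> [" + windowStart(t, size) + ", " + windowEnd(t, size) + ")");
        }
    }
}
```

For example, 1234 lands in [1000, 1500) and 500 starts a new window [500, 1000), so two records 1 ms apart can end up in different windows that are sorted independently.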
ListCombiner
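import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.Combine;

// Collects all records of a window into one list; accumulator and output are the same List type.
public class ListCombiner extends Combine.CombineFn<
        KafkaRecord<String, String>, List<KafkaRecord<String, String>>, List<KafkaRecord<String, String>>> {
    @Override
    public List<KafkaRecord<String, String>> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<KafkaRecord<String, String>> addInput(
            List<KafkaRecord<String, String>> mutableAccumulator, KafkaRecord<String, String> input) {
        if (input != null) {
            mutableAccumulator.add(input);
        }
        return mutableAccumulator;
    }

    @Override
    public List<KafkaRecord<String, String>> mergeAccumulators(
            Iterable<List<KafkaRecord<String, String>>> accumulators) {
        // Concatenates the partial lists; this does not preserve timestamp order.
        return StreamSupport.stream(accumulators.spliterator(), false)
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
    }

    @Override
    public List<KafkaRecord<String, String>> extractOutput(List<KafkaRecord<String, String>> accumulator) {
        return accumulator;
    }
}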
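mergeAccumulators concatenates the partial lists, so timestamp order is restored only by the later Sort step. If each accumulator were kept sorted (an assumption; ListCombiner does not do this), partial results could instead be combined with a k-way merge. The sketch below shows that technique on plain Long timestamps; SortedMerge and mergeSorted are hypothetical names, not part of Beam.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class SortedMerge {

    // k-way merge: combines lists that are each already sorted into one sorted list.
    // A heap entry {listIndex, position} points at the next unconsumed element of one list.
    public static List<Long> mergeSorted(List<List<Long>> sortedLists) {
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                Comparator.comparingLong((int[] e) -> sortedLists.get(e[0]).get(e[1])));
        for (int i = 0; i < sortedLists.size(); i++) {
            if (!sortedLists.get(i).isEmpty()) {
                heap.add(new int[] {i, 0});
            }
        }
        List<Long> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] e = heap.poll();
            List<Long> source = sortedLists.get(e[0]);
            merged.add(source.get(e[1]));
            // Advance within the same list if it has more elements.
            if (e[1] + 1 < source.size()) {
                heap.add(new int[] {e[0], e[1] + 1});
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Timestamps of two partial accumulators, each already sorted.
        List<Long> merged = mergeSorted(List.of(List.of(1L, 4L, 9L), List.of(2L, 3L)));
        System.out.println(merged); // [1, 2, 3, 4, 9]
    }
}
```

This only changes how a single window's list is built; it does not by itself order records across windows.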
I also read the article on timely and stateful processing, but I'm not sure how to apply it to this task.
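The core idea of timely, stateful processing for this task can be modeled without Beam: buffer every record, and release records only once the watermark (the runner's estimate that no earlier-timestamped records are still to come) has passed their timestamp, emitting them in timestamp order. The class below is a hypothetical plain-Java model, not Beam code. In a Beam pipeline the buffer would presumably live in per-key state of a DoFn (for example a BagState declared with @StateId) and the release would happen in an @OnTimer callback for an event-time timer, which requires keyed input (for example a single constant key, since there is one output partition).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class WatermarkSortBuffer {

    // Buffered record timestamps, smallest first; stands in for per-key state in a stateful DoFn.
    private final PriorityQueue<Long> pending = new PriorityQueue<>();
    private long watermark = Long.MIN_VALUE;

    // Buffer one record (only its timestamp is modeled here).
    public void add(long timestamp) {
        pending.add(timestamp);
    }

    // Advance the watermark and release, in timestamp order, every buffered record at or before it.
    // In Beam this step would correspond to an event-time timer firing.
    public List<Long> advanceWatermark(long newWatermark) {
        if (newWatermark < watermark) {
            throw new IllegalArgumentException("watermark must not move backwards");
        }
        watermark = newWatermark;
        List<Long> released = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek() <= watermark) {
            released.add(pending.poll());
        }
        return released;
    }

    public static void main(String[] args) {
        WatermarkSortBuffer buffer = new WatermarkSortBuffer();
        // Records arrive interleaved from two topics, out of timestamp order.
        buffer.add(30L);
        buffer.add(10L);
        buffer.add(20L);
        System.out.println(buffer.advanceWatermark(20L)); // [10, 20]
        buffer.add(25L);
        System.out.println(buffer.advanceWatermark(40L)); // [25, 30]
    }
}
```

A record that arrives with a timestamp already behind the watermark (late data) would be released at the next advance, after records with later timestamps, so output is ordered only as well as the watermark is accurate.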