微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在 flink 中,当我将assignTimestampsAndWatermarks() 与接口 AscendingTimestampExtractor() 一起使用时,我收到了弃用消息

如何解决在 flink 中,当我将assignTimestampsAndWatermarks() 与接口 AscendingTimestampExtractor() 一起使用时,我收到了弃用消息

代码的替代方法是什么以防止弃用。 我什至也使用了assignTimeStamp接口,但我遇到了错误

DataStream<Tuple2<Long,String>> sum = data.map(new MapFunction<String,Tuple2<Long,String>>()
                {
                    public Tuple2<Long,String> map(String s)
                    {
                        String[] words = s.split(",");
                        return new Tuple2<Long,String>(Long.parseLong(words[0]),words[1]);
                    }
                })
                
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<Long,String>>()
                        
                {
                    public long extractAscendingTimestamp(Tuple2<Long,String> t)
                    {
                        return t.f0;
                    }
                })
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<Tuple2<Long,String>>()
                        {
                    public Tuple2<Long,String> reduce(Tuple2<Long,String> t1,String> t2)
                    {
                        int num1 = Integer.parseInt(t1.f1);
                        int num2 = Integer.parseInt(t2.f1);
                        int sum = num1 + num2;
                        Timestamp t = new Timestamp(System.currentTimeMillis());
                        return new Tuple2<Long,String>(t.getTime(),"" + sum);
                    }
                });

解决方法

您需要使用WatermarkStrategy.forMonotonousTimestamps

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.sql.Timestamp;

public class Test {
    public static void main(String[] args) {
        StreamExecutionEnvironment flink = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> data = flink.fromElements("hello","world");
        data.map((MapFunction<String,Tuple2<Long,String>>) s -> {
                    String[] words = s.split(",");
                    return new Tuple2<>(Long.parseLong(words[0]),words[1]);
                }
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Long,String>>forMonotonousTimestamps()
                .withTimestampAssigner((SerializableTimestampAssigner<Tuple2<Long,String>>) (element,recordTimestamp) -> element.f0))
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce((ReduceFunction<Tuple2<Long,String>>) (t1,t2) -> {
                    int num1 = Integer.parseInt(t1.f1);
                    int num2 = Integer.parseInt(t2.f1);
                    int sum = num1 + num2;
                    Timestamp t = new Timestamp(System.currentTimeMillis());
                    return new Tuple2<>(t.getTime(),"" + sum);
                });
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。