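How do I resolve the deprecation warning in Flink when using assignTimestampsAndWatermarks() with AscendingTimestampExtractor?
When I call assignTimestampsAndWatermarks() with an AscendingTimestampExtractor in Flink, I get a deprecation warning. What is the non-deprecated alternative to this code? I also tried the assignTimeStamp interface, but that gave me an error.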
DataStream<Tuple2<Long, String>> sum = data.map(new MapFunction<String, Tuple2<Long, String>>()
        {
            public Tuple2<Long, String> map(String s)
            {
                // Each input line has the form "timestampMillis,number".
                String[] words = s.split(",");
                return new Tuple2<Long, String>(Long.parseLong(words[0]), words[1]);
            }
        })
        // Deprecated: this call triggers the deprecation warning.
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<Long, String>>()
        {
            public long extractAscendingTimestamp(Tuple2<Long, String> t)
            {
                return t.f0;
            }
        })
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
        .reduce(new ReduceFunction<Tuple2<Long, String>>()
        {
            public Tuple2<Long, String> reduce(Tuple2<Long, String> t1, Tuple2<Long, String> t2)
            {
                int num1 = Integer.parseInt(t1.f1);
                int num2 = Integer.parseInt(t2.f1);
                int sum = num1 + num2;
                Timestamp t = new Timestamp(System.currentTimeMillis());
                return new Tuple2<Long, String>(t.getTime(), "" + sum);
            }
        });
Solution
You need to use WatermarkStrategy.forMonotonousTimestamps():
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.sql.Timestamp;

public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment flink = StreamExecutionEnvironment.getExecutionEnvironment();
        // Sample input in the "timestampMillis,number" format that the map function parses.
        DataStreamSource<String> data = flink.fromElements("1000,1", "2000,2");
        data.map((MapFunction<String, Tuple2<Long, String>>) s -> {
                    String[] words = s.split(",");
                    return new Tuple2<>(Long.parseLong(words[0]), words[1]);
                })
                // A lambda loses Tuple2's generic parameters to type erasure, so declare the result type.
                .returns(Types.TUPLE(Types.LONG, Types.STRING))
                // Replacement for the deprecated AscendingTimestampExtractor.
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Long, String>>forMonotonousTimestamps()
                        .withTimestampAssigner((SerializableTimestampAssigner<Tuple2<Long, String>>) (element, recordTimestamp) -> element.f0))
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce((ReduceFunction<Tuple2<Long, String>>) (t1, t2) -> {
                    int num1 = Integer.parseInt(t1.f1);
                    int num2 = Integer.parseInt(t2.f1);
                    int sum = num1 + num2;
                    Timestamp t = new Timestamp(System.currentTimeMillis());
                    return new Tuple2<>(t.getTime(), "" + sum);
                })
                .print();
        // The pipeline only runs once execute() is called.
        flink.execute();
    }
}
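To see why forMonotonousTimestamps() is a drop-in replacement for AscendingTimestampExtractor, it can help to work through the arithmetic by hand. The sketch below is plain Java with no Flink dependency, and the class and method names (MonotonousWatermarkSketch, windowStart, watermarkAfter, canFire) are made up for illustration; they are not Flink APIs. It assumes non-negative timestamps, a window offset of zero, and that the monotonous strategy's watermark trails the highest timestamp seen so far by one millisecond, which is my understanding of Flink's behavior.

```java
/** Plain-Java illustration only; none of these names are Flink classes. */
final class MonotonousWatermarkSketch {

    /** Start of the tumbling window that contains a non-negative timestamp (offset 0). */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    /** Watermark once maxTimestampSeenMillis has been observed, assuming timestamps never decrease. */
    static long watermarkAfter(long maxTimestampSeenMillis) {
        return maxTimestampSeenMillis - 1;
    }

    /** A window [start, start + size) can fire once the watermark reaches its last millisecond. */
    static boolean canFire(long windowStartMillis, long windowSizeMillis, long watermarkMillis) {
        return watermarkMillis >= windowStartMillis + windowSizeMillis - 1;
    }

    public static void main(String[] args) {
        long size = 5_000L; // same size as Time.seconds(5)
        long[] timestamps = {1_000L, 4_999L, 5_000L};
        for (long ts : timestamps) {
            long start = windowStart(ts, size);
            long watermark = watermarkAfter(ts);
            System.out.println("ts=" + ts
                    + " window=[" + start + "," + (start + size) + ")"
                    + " watermark=" + watermark
                    + " firstWindowCanFire=" + canFire(0L, size, watermark));
        }
    }
}
```

Under these assumptions, the window [0, 5000) only becomes eligible to fire after an element with timestamp 5000 or later has been seen. With a bounded source such as fromElements, Flink also emits a final watermark when the input ends, so the last window is still flushed.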