如何解决集成DataStreamAPI和TableAPI
除了这个question之外,我还创建了这个示例来集成DataStreamAPI
和TableAPI
,这次我没有错误,我有两个工作,而不是一个,一个是为运行完美的DataStreamAPI
创建的,另一个工作也为运行完美的TableAPI
创建的,但是唯一的问题是永远不会从{{1}获得任何值},例如:
DataStreamAPI
这样做,我可以在记录器中看到以下行:
/*FILTERING NULL IDs*/
final SingleOutputStreamOperator<Event> stream_filtered = eventsStream
.filter(new NullidEventsFilterFunction())
.uid("id_filter_operator")
.name("Event Filter");
final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(TableEnvironmentConfiguration.getEnv(),fsSettings);
SingleOutputStreamOperator<String> toTable = stream_filtered.map(x -> x.id).name("Map for table");
Table source = fsTableEnv.fromDataStream(toTable);
source.execute(); /*without this line the TableAPI job is not started,but nothing happens if is not there either*/
DataStream<String> finalRes = fsTableEnv.toAppendStream(source,String.class);
finalRes.map((MapFunction<String,String>) value -> value)
.name("Mapping after table")
.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value) {
LOG.info("Record from table: " + value);
}
}).name("Sink after map from table");
/*STARTING TRANSFORMATIONS*/
Init.init(stream_filtered);
env.execute(job_name);
但没有记录被接收或发送。
查看INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Event Mapper -> Watermarks Added -> Event Filter -> Map for table -> SourceConversion(table=[Unregistered_DataStream_5],fields=[f0]) -> SinkConversionToRow -> Sink: Select table sink (1/1) (0d3cd78d35480c44f09603786bf775e7) switched from DEPLOYING to RUNNING.
作业的图像
有什么主意吗? 提前致谢。 亲切的问候!
解决方法
如果您要编写一个以DataStream API开头和结尾并在中间使用Table API的作业,那么这是您可以建立的一个简单示例。
请注意,所涉及的详细信息在各个发行版之间有所更改,并且此特定示例按Flink 1.11编写。 FLIP-136: Improve interoperability between DataStream and Table API正在努力使这一过程变得更加容易。
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
public class BackAndForth {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
SingleOutputStreamOperator<Tuple2<String,Long>> rawInput = env.fromElements(
new Tuple2<>("u2",0L),new Tuple2<>("u1",5L),new Tuple2<>("u2",1L),new Tuple2<>("u3",3L),2L));
Table events = tableEnv.fromDataStream(rawInput,$("userId"),$("value"));
Table results = events
.select($("userId"),$("value"))
.where($("value").isGreater(0));
tableEnv
.toAppendStream(results,Row.class)
.print();
env.execute();
}
}
您可能会担心在Web UI中显示“已发送记录:0”和“已接收记录:0”。这是非常误导的。这些Flink度量标准仅测量Flink中的记录和字节流,并且不报告与外部系统的任何I / O。这些度量标准也不报告链接在一起的运算符之间的记录和字节。这两个作业中的所有内容都是链接在一起的,因此在这种情况下,发送/接收的记录/字节将始终为零。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。