微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

将 Flink 动态表转换为 Pandas 数据框

如何解决将 Flink 动态表转换为 Pandas 数据框

我正在使用 pyflink 表 api 从 Kafka 读取数据。现在我想将结果表转换为 Pandas 数据框。这是我的代码

exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = StreamTableEnvironment \
    .create(exec_env,environment_settings=EnvironmentSettings
            .new_instance()
            .in_streaming_mode()
            .use_blink_planner().build())

INPUT_TABLE = "source"

t_env \
    .connect(  # declare the external system to connect to
    Kafka()
        .version("universal")
        .topic("Rides")
        .start_from_earliest()
        .property("zookeeper.connect","zookeeper:2181")
        .property("bootstrap.servers","kafka:9092")) \
    .with_format(  # declare a format for this system
    Json()
        .fail_on_missing_field(True)
        .schema(DataTypes.ROW([
        DataTypes.FIELD("rideId",DataTypes.BIGINT()),DataTypes.FIELD("isstart",DataTypes.BOOLEAN()),DataTypes.FIELD("eventTime",DataTypes.STRING()),DataTypes.FIELD("lon",DataTypes.FLOAT()),DataTypes.FIELD("lat",DataTypes.FIELD("psgCnt",DataTypes.INT()),DataTypes.FIELD("taxiId",DataTypes.BIGINT())]))) \
    .with_schema(  # declare the schema of the table
    Schema()
        .field("rideId",DataTypes.BIGINT())
        .field("taxiId",DataTypes.BIGINT())
        .field("isstart",DataTypes.BOOLEAN())
        .field("lon",DataTypes.FLOAT())
        .field("lat",DataTypes.FLOAT())
        .field("psgCnt",DataTypes.INT())
        .field("eventTime",DataTypes.STRING())) \
    .in_append_mode() \
    .create_temporary_table(INPUT_TABLE)

table = t_env.from_path(INPUT_TABLE)
df = table.to_pandas()

在这里我既没有得到错误也没有得到结果。我正在使用 Flink 1.11.3。有没有办法将这个动态表转换成静态表或使 table.to_pandas() 工作的东西?

解决方法

我们不能在无界流表中调用 to_pandasto_pandas 只能在有界表中调用。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。