微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

是否可以在Spark结构化流中使用foreachBatch将两个不相交的数据集写入数据同步中?

如何解决是否可以在Spark结构化流中使用foreachBatch将两个不相交的数据集写入数据同步中?

我正在尝试将数据从单一来源写入多个DataSink(Mongo和Postgres DB)。 传入数据

Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers","localhost:9092")
        .option("subscribe","topic1")
        .load();

Dataset<Row> personalDetails = df.selectExpr("name","id","age");


personalDetails.writeStream()
    .outputMode(OutputMode.Update())
    .foreachBatch((dataframe,bachId) -> {
            dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
                    .option("uri","mongodb://localhost/employee")
                    .option("database","employee")
                    .option("collection","PI").save();
    }).start();

Dataset<Row> salDetails = df.selectExpr("basicSal","bonus");
salDetails.writeStream()
    .outputMode(OutputMode.Update())
    .foreachBatch((dataframe,"SAL").save();
    }).start();

问题是,我可以看到Spark正在打开两个Stream,并且两次读取相同的事件。 是否可以读取一次并应用不同的转换并写入不同的集合?

解决方法

您应该缓存DataFrame。 参见here

写入多个位置-如果要将流查询的输出写入多个位置,则只需将输出DataFrame / Dataset多次写入即可。但是,每次写入尝试都可能导致重新计算输出数据(包括可能重新读取输入数据)。为了避免重新计算,您应该缓存输出DataFrame / Dataset,将其写入多个位置,然后取消缓存。

及其示例:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame,batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}

您可以将所有代码放在一个foreachBatch中,并将数据帧写入两个接收器。您可以通过缓存数据帧,然后在此缓存的数据帧上执行selectExpr并保存它来完成此操作。

作为旁注-请注意,无论如何,如果您想要“全有或全无”(即您不希望自己写信给mongo而不是写postgres的情况),您只能使用一个foreachBatch,因为否则(如果您有2个foreachBatch,例如您的问题),则有2个独立的批次-对于相同的数据,一个批次可能会失败,而另一个批次可能会失败。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。