如何解决转换不是1:1而是1:many时如何创建Spark DataSet
我正在编写一个结构化的流Spark应用程序,在这里我从Kafka队列中读取并处理收到的消息。我想要的最终结果是一个DataSet[MyMessage]
(其中MyMessage
是一个自定义对象),我想排队到另一个Kafka主题。事实是,来自使用者Kafka队列的每个输入消息都可以产生多个MyMessage
对象,因此转换不是1:1,1:Many。
所以我在做
val messagesDataSet: DataSet[List[MyMessage]] = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers","server1")
.option("subscribe","topic1")
.option("failOnDataLoss",false)
.option("startingOffsets","offset1")
.load()
.select($"value")
.mapPartitions{r => createMessages(r)}
val createMessages(row: Iterator[Row]): List[MyMessage] = {
// ...
}
很显然,messagesDataSet
是DataSet[List[MyMessage]]
。有没有办法我只能得到DataSet[MyMessage]
?
或者是否可以采用DataSet[List[MyMessage]]
然后将每个MyMessage
对象写入另一个Kafka主题? (毕竟这是我的最终目标)
解决方法
您可以使用mapPartitions创建多个值(因此,其工作方式与flatMap类似),但是您必须返回Iterator:
def createMessages(row: Iterator[Row]): Iterator[MyMessage] = {
row.map(/*...*/) //you need too return iterator here
}
,
尝试
messagesDataSet.flatMap(identity)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。