如何解决Spark读取具有多种记录类型的文件
全部
我正在尝试在Spark中读取具有多种记录类型的文件,但不知道如何执行。 有人可以指出是否有办法吗?或一些现有的软件包?
以下示例-我们有一个文本文件,其中包含2个单独的文件(可能超过2个)
记录类型:
00X-record_ind |名|姓氏
00Y-record_ind |帐号#|州|国家
input.txt
------------
00X|Atun|Varma
00Y|0002355|IL|USA
00X|Diya|Reddy
00Y|0002345|FL|USA
sample output :
output.txt
------------
1|ATUL|VARMA|002355|USA
2|Diya|Reddy|0002345|USA
解决方法
假设Y
记录属于其前面的X
记录,并且在文件中保证了该顺序,则可以按以下步骤进行操作。
- 在添加行号值的同时读取文本文件:
注意:我在这里读取一个字符串,您可以为文件修改它。
val inputText = """00X|Atun|Varma
00Y|0002355|IL|USA
00X|Diya|Reddy
00Y|0002345|FL|USA"""
val input = inputText.split("\n").zipWithIndex.map{case (line,i) => s"$i | ${line.trim}"}.mkString("\n")
print(input)
0 | 00X|Atun|Varma
1 | 00Y|0002355|IL|USA
2 | 00X|Diya|Reddy
3 | 00Y|0002345|FL|USA
我现在可以按如下所示将此文本读入数据框。
注意:我要添加显式列名。您不必也可以依靠Spark生成的那些,因为到目前为止它们是毫无意义的。
val df = spark.read
.option("ignoreTrailingWhiteSpace","true")
.option("ignoreLeadingWhiteSpace","true")
.option("delimiter","|")
.option("header","true")
.csv(spark.sparkContext.parallelize(("line_num|record_ind|first|second|third\n"+input).split("\n")).toDS)
df.show
df.createOrReplaceTempView("df")
+--------+----------+-------+------+-----+
|line_num|record_ind| first|second|third|
+--------+----------+-------+------+-----+
| 0| 00X| Atun| Varma| null|
| 1| 00Y|0002355| IL| USA|
| 2| 00X| Diya| Reddy| null|
| 3| 00Y|0002345| FL| USA|
+--------+----------+-------+------+-----+
最后,您可以按如下所示将每一行连接到其前一行:
spark.sql("""select x.first first_name,x.second last_name,y.first,y.second State,y.third Country
from df x
inner join df y
on x.record_ind = '00X'
and y.record_ind = '00Y'
and y.line_num = x.line_num +1""").show
+----------+---------+-------+-----+-------+
|first_name|last_name| first|State|Country|
+----------+---------+-------+-----+-------+
| Atun| Varma|0002355| IL| USA|
| Diya| Reddy|0002345| FL| USA|
+----------+---------+-------+-----+-------+
如果可以连接任意数量的行,则record_ind必须遵循一种模式才能确定所需的自联接的深度。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。