微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark 日志到数据框,从日志文件创建数据框

如何解决Spark 日志到数据框,从日志文件创建数据框

#Spark #Python

目标:

读取日志文件的位置,从日志中提取csv文本表格数据并打印表格数据的json(表格列(CSV检索到的表格列+序列号+时间戳)

  1. 数据库中读取serial_no、time、s3_path
  2. s3_path 包含 csv 文件
  3. 输出需要作为表中列的数据框 + primary_key + 时间戳

当前代码伪:

df = sparkSession.read \
            .format("com.databricks.spark.redshift") \
            .option("url","some url with id{}&password={}".format(
                        redshift_user,redshift_pass)) \
            .option("query",query) \
            .option("tempdir",s3_redshift_temp_dir) \
            .option("forward_spark_s3_credentials",True)
            df = df_context.load()
+-------------+-------------------+--------------------+
|serial_number|          test_date|             s3_path|
+-------------+-------------------+--------------------+
|     A0123456|2019-07-10 04:11:52|s3://test-bucket-...|
|     A0123456|2019-07-24 23:48:03|s3://test-bucket-...|
|     A0123456|2019-07-22 20:56:57|s3://test-bucket-...|
|     A0123456|2019-07-22 20:56:57|s3://test-bucket-...|
|     A0123456|2019-07-22 20:58:36|s3://test-bucket-...|
+-------------+-------------------+--------------------+

由于我们无法将 spark 上下文传递给工作节点,因此使用 boto3 读取文本文件并处理文本以获取 csv 表结构。 此处不共享用于从日志中检索表的专有代码

spark.udf.register("read_s3_file",read_s3_file)
df_with_string_csv = df.withColumn('files_dataframes',read_s3_file(drive_event_tab.s3_path))

df_with_string_csv 现在包含以下示例

+-------------+-------------------+--------------------+----------------------+
|serial_number|          test_date|             s3_path|    table_csv_data    |
+-------------+-------------------+--------------------+----------------------+
|     1050D1B0|2019-05-07 15:41:11|s3://test-bucket-...|col1,col2,col3,col4...|
|     1050D1B0|2019-05-07 15:41:11|s3://test-bucket-...|col1,col4...|
|     1050D1BE|2019-05-08 09:26:55|s3://test-bucket-...|col1,col4...|
|     A0123456|2019-07-25 06:54:28|s3://test-bucket-...|col1,col4...|
|     A0123456|2019-07-22 21:07:21|s3://test-bucket-...|col1,col4...|
|     A0123456|2019-07-25 00:19:52|s3://test-bucket-...|col1,col4...|
|     A0123456|2019-07-24 22:24:40|s3://test-bucket-...|col1,col4...|
|     A0123456|2019-09-12 22:15:19|s3://test-bucket-...|col1,col4...|
|     A0123456|2019-07-22 21:27:56|s3://test-bucket-...|col1,col4...|
+-------------+-------------------+--------------------+----------------------+

示例 table_csv_data 列包含:

timestamp,partition,offset,key,value
1625218801350,97,33009,2CKXTKAT_20210701193302_6400_UCMP,458969040
1625218801349,41,33018,3FGW9S6T_20210701193210_6400_UCMP,17569160

尝试实现如下最终数据框,请帮忙

+-------------+-------------------+--------------------+-----------------+-----------+-----------------------------------+--------------+
|serial_number|          test_date|           timestamp|    partition    |  offset   |    key                            |    value     |
+-------------+-------------------+--------------------+-----------------+-----------+-----------------------------------+--------------+
|     1050D1B0|2019-05-07 15:41:11| 1625218801350      |    97           |  33009    | 2CKXTKAT_20210701193302_6400_UCMP |  458969040   |
|     1050D1B0|2019-05-07 15:41:11| 1625218801349      |    41           |  33018    | 3FGW9S6T_20210701193210_6400_UCMP |  17569160    |               
..
..
..

+-------------+-------------------+--------------------+----------------------+

解决方法

对于 Spark 2.4.0+,您只需要拆分、爆炸和 array_except

的一些组合

请使用repartition进行优化,因为explode可能会创建很多行。

from pyspark.sql.session import SparkSession
from pyspark.sql.functions import split,explode,col,array_except,array,trim
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()

df = df \
    .withColumn('table_csv_data',split(col('table_csv_data'),'\n')) \
    .withColumn('table_csv_data',array_except(col('table_csv_data'),array([col('table_csv_data')[0]]))) \
    .withColumn('table_csv_data',explode(col('table_csv_data'))) \
    .withColumn('table_csv_data',',')) \
    .withColumn('timestamp',trim(col('table_csv_data')[0])) \
    .withColumn('partition',trim(col('table_csv_data')[1])) \
    .withColumn('offset',trim(col('table_csv_data')[2])) \
    .withColumn('key',trim(col('table_csv_data')[3])) \
    .withColumn('value',trim(col('table_csv_data')[4])) \
    .drop('table_csv_data')


df.show(truncate=False)


+-------------+-------------------+-----------------+-------------+---------+------+---------------------------------+---------+
|serial_number|test_date          |s3_path          |timestamp    |partition|offset|key                              |value    |
+-------------+-------------------+-----------------+-------------+---------+------+---------------------------------+---------+
|1050D1B0     |2019-05-07 15:41:11|s3://test-bucket-|1625218801350|97       |33009 |2CKXTKAT_20210701193302_6400_UCMP|458969040|
|1050D1B0     |2019-05-07 15:41:11|s3://test-bucket-|1625218801349|41       |33018 |3FGW9S6T_20210701193210_6400_UCMP|17569160 |
|1050D1B0     |2019-05-07 15:41:11|s3://test-bucket-|1625218801350|97       |33009 |2CKXTKAT_20210701193302_6400_UCMP|458969040|
|1050D1B0     |2019-05-07 15:41:11|s3://test-bucket-|1625218801349|41       |33018 |3FGW9S6T_20210701193210_6400_UCMP|17569160 |
+-------------+-------------------+-----------------+-------------+---------+------+---------------------------------+---------+

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?