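How do I handle dynamically typed real-time log streams in PySpark?
I am trying to build a Spark application that processes real-time streaming logs whose format varies from line to line. The log structure is shown below: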
2020-09-24S08:07:54.181Z ip1 Sep 08:07:54 region1 staus1: Deny tcp src outside-myip1/100 dst inside:mydstIP1/80
2020-09-24S08:07:55.181Z ip2 Sep 08:07:54 region2 staus2: Deny tcp src outside-myip2/101 dst inside:mydstIP2/80
2020-09-24S08:07:56.181Z ip3 Sep 08:07:54 region3 staus3: Deny tcp src outside-myip3/102 dst inside:mydstIP3/80
2020-09-24S08:07:57.181Z ip4 Sep 08:07:54 region4 staus4: other requested to drop TCP packet from outside-myip01/132 to dmz:myip02/443 by the IT Group
2020-09-24S08:07:58.181Z ip5 Sep 08:07:54 region5 staus5: Deny tcp src outside-myip4/103 dst inside:mydstIP4/80
2020-09-24S08:07:59.181Z ip6 Sep 08:07:54 region6 staus6: Deny tcp src outside-myip5/104 dst inside:mydstIP5/80
2020-09-24S08:07:57.181Z ip4 Sep 08:07:54 region4 staus04: other requested to drop TCP packet from outside-myip04/132 to dmz:myip02/443 by the IT Group
2020-09-24S08:08:00.181Z ip7 Sep 08:07:54 region7 staus7: Deny tcp src outside-myip6/105 dst inside:mydstIP6/80
I created the following schema to convert the logs above into a structured DataFrame.
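from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DateType

session = SparkSession.builder.getOrCreate()

# The field types after "Source IP" were lost from the post; StringType() is assumed.
schemaDf = StructType([
    StructField(" Date", DateType()),
    StructField("Source IP", StringType()),
    StructField("Month", StringType()),
    StructField("Time Stamp", StringType()),
    StructField("Region", StringType()),
    StructField("status", StringType()),
    StructField("Action", StringType()),
    StructField("Protocol", StringType()),
    StructField("From", StringType()),
    StructField("Source Value", StringType()),
    StructField("To", StringType()),
    StructField("Destincation value", StringType()),
])

# header=true makes Spark treat the first log line as a header row, so the ip1 record is skipped.
df = session.read.option("header", "true").option("delimiter", " ").csv("F:mypath\\firewall.txt", schema=schemaDf)
df.show()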
Result:
+----------+---------+-----+----------+-------+--------+------+---------+----+-----------------+---+------------------+
|      Date|Source IP|Month|Time Stamp| Region|  status|Action| Protocol|From|     Source Value| To|Destincation value|
+----------+---------+-----+----------+-------+--------+------+---------+----+-----------------+---+------------------+
|2020-09-24|      ip2|  Sep|  08:07:54|region2| staus2:|  Deny|      tcp| src|outside-myip2/101|dst|inside:mydstIP2/80|
|2020-09-24|      ip3|  Sep|  08:07:54|region3| staus3:|  Deny|      tcp| src|outside-myip3/102|dst|inside:mydstIP3/80|
|2020-09-24|      ip4|  Sep|  08:07:54|region4| staus4:| other|requested|  to|             drop|TCP|            packet|
|2020-09-24|      ip5|  Sep|  08:07:54|region5| staus5:|  Deny|      tcp| src|outside-myip4/103|dst|inside:mydstIP4/80|
|2020-09-24|      ip6|  Sep|  08:07:54|region6| staus6:|  Deny|      tcp| src|outside-myip5/104|dst|inside:mydstIP5/80|
|2020-09-24|      ip4|  Sep|  08:07:54|region4|staus04:| other|requested|  to|             drop|TCP|            packet|
|2020-09-24|      ip7|  Sep|  08:07:54|region7| staus7:|  Deny|      tcp| src|outside-myip6/105|dst|inside:mydstIP6/80|
+----------+---------+-----+----------+-------+--------+------+---------+----+-----------------+---+------------------+
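The shifted values in the "other" rows can be reproduced without Spark. The sketch below splits two of the sample lines on single spaces and keeps only as many tokens as the schema has fields, which is consistent with the table above (Spark's CSV reader in its default PERMISSIVE mode drops tokens beyond the schema length). `SCHEMA_WIDTH` and `split_like_csv` are illustrative names, not part of any Spark API.

```python
# Two sample lines copied from the log excerpt above.
deny_line = ("2020-09-24S08:07:54.181Z ip1 Sep 08:07:54 region1 staus1: "
             "Deny tcp src outside-myip1/100 dst inside:mydstIP1/80")
other_line = ("2020-09-24S08:07:57.181Z ip4 Sep 08:07:54 region4 staus4: "
              "other requested to drop TCP packet from outside-myip01/132 "
              "to dmz:myip02/443 by the IT Group")

SCHEMA_WIDTH = 12  # number of StructField entries in schemaDf


def split_like_csv(line, width=SCHEMA_WIDTH):
    """Split on single spaces; return (tokens that fit the schema, overflow tokens)."""
    tokens = line.split(" ")
    return tokens[:width], tokens[width:]


kept, dropped = split_like_csv(other_line)
print(kept[6:])  # ['other', 'requested', 'to', 'drop', 'TCP', 'packet']
print(dropped)   # ['from', 'outside-myip01/132', 'to', 'dmz:myip02/443', 'by', 'the', 'IT', 'Group']
```

The "Deny" lines split into exactly 12 tokens, so nothing is lost there; the "other" lines split into 20, and the source and destination addresses end up in the discarded tail.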
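The schema created here cannot handle logs in which the layout of the message varies from line to line. For example, when the "Action" column is "other", the rest of the line is a longer free-form string that does not fit the defined schema.
So I would like to know the right way to handle cases like this.
Do I need to create a separate schema for the "other" records?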
Any help would be appreciated.
Thanks
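One possible direction, sketched here in plain Python under assumptions drawn only from the sample lines: parse the fixed prefix (timestamp through status) with one regular expression, keep the remainder as a single free-form message, and then try one pattern per known message shape. Every row ends up with the same keys, so a single schema with nullable columns may be enough and a second schema for "other" is not needed. The names `PREFIX`, `DENY`, `OTHER`, `parse_line`, and the field names are hypothetical.

```python
import re

# Fixed prefix shared by every sample line; the rest is kept as one message string.
PREFIX = re.compile(
    r"^(?P<timestamp>\S+)\s+(?P<source_ip>\S+)\s+(?P<month>\S+)\s+"
    r"(?P<time>\S+)\s+(?P<region>\S+)\s+(?P<status>[^:\s]+):\s+(?P<message>.*)$"
)

# One pattern per message shape seen in the sample (an assumption, not a full grammar).
DENY = re.compile(
    r"^(?P<action>Deny)\s+(?P<protocol>\S+)\s+src\s+(?P<source>\S+)"
    r"\s+dst\s+(?P<destination>\S+)$"
)
OTHER = re.compile(
    r"^(?P<action>other)\s+requested to drop\s+(?P<protocol>\S+)\s+packet from\s+"
    r"(?P<source>\S+)\s+to\s+(?P<destination>\S+)\s+by\s+(?P<requested_by>.+)$"
)


def parse_line(line):
    """Return a dict with a fixed set of keys, or None if the prefix does not match."""
    head = PREFIX.match(line.strip())
    if head is None:
        return None
    row = head.groupdict()
    # Defaults so every row has the same columns regardless of message shape.
    row.update(action=None, protocol=None, source=None,
               destination=None, requested_by=None)
    for pattern in (DENY, OTHER):
        body = pattern.match(row["message"])
        if body:
            row.update(body.groupdict())
            break
    # Unknown shapes keep their raw text in row["message"] with the other fields None.
    return row


sample = ("2020-09-24S08:07:57.181Z ip4 Sep 08:07:54 region4 staus4: other requested "
          "to drop TCP packet from outside-myip01/132 to dmz:myip02/443 by the IT Group")
print(parse_line(sample)["requested_by"])  # the IT Group
```

In Spark, the same idea could be applied by reading each line as a single text column (for example with `spark.read.text`) and extracting fields with `pyspark.sql.functions.regexp_extract`, using numbered groups because Spark uses Java regex syntax. That variant is not tested here.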