微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用由键值对组成的字符串创建 DataFrame?

如何解决如何使用由键值对组成的字符串创建 DataFrame?

我从 CEF Format 中的防火墙获取日志作为字符串,如下所示:

ABC|XYZ|F123|1.0|DSE|DSE|4|externalId=e705265d0d9e4d4fcb218b cn2=329160 cn1=3053998 dhost=SRV2019 duser=admin msg=Process accessed NTDS fname=ntdsutil.exe filePath=\\Device\\HarddiskVolume2\\Windows\\System32 cs5="C:\\Windows\\system32\\ntdsutil.exe" "ac i ntds" ifm "create full ntdstest3" q q filehash=80c8b68240a95 dntdom=adminDomain cn3=13311 rt=1610948650000 tactic=Credential Access technique=Credential Dumping objective=Gain Access patterndisposition=Detection. outcome=0

如何从这种字符串创建一个 DataFrame,其中我得到由 = 分隔的键值对?

我的目标是使用键动态地从该字符串推断模式,即从 = 运算符的左侧提取键并使用它们创建模式。

我目前所做的非常蹩脚(恕我直言)并且方法不是很动态。(因为键值对的数量可以根据不同类型的日志而变化)

val a: String = "ABC|XYZ|F123|1.0|DSE|DCE|4|externalId=e705265d0d9e4d4fcb218b cn2=329160 cn1=3053998 dhost=SRV2019 duser=admin msg=Process accessed NTDS fname=ntdsutil.exe filePath=\\Device\\HarddiskVolume2\\Windows\\System32 cs5="C:\\Windows\\system32\\ntdsutil.exe" "ac i ntds" ifm "create full ntdstest3" q q filehash=80c8b68240a95 dntdom=adminDomain cn3=13311 rt=1610948650000 tactic=Credential Access technique=Credential Dumping objective=Gain Access patterndisposition=Detection. outcome=0"

val ttype: String = "DCE"

type parseReturn = (String,String,List[String],Int)

def cefParser(a: String,ttype: String): parseReturn = {
    val firstPart = a.split("\\|")
    var pD = new ListBuffer[String]()
    var listSize: Int = 0
    if (firstPart.size == 8 && firstPart(4) == ttype) {
      pD += firstPart(0)
      pD += firstPart(1)
      pD += firstPart(2)
      pD += firstPart(3)
      pD += firstPart(4)
      pD += firstPart(5)
      pD += firstPart(6)
      val secondPart = parseSecondPart(firstPart(7),ttype)
      pD ++= secondPart
      listSize = pD.toList.length
      (firstPart(2),ttype,pD.toList,listSize)
    } else {
      val temp: List[String] = List(a)
      (firstPart(2),"IRRELEVANT",temp,temp.length)
    }
  }

  

parseSecondPart 方法是:

def parseSecondPart(m:String,ttype:String): ListBuffer[String] = ttype match {
    case auditactivity.ttype=>parseAuditEvent(m)

一个函数调用只是替换日志中的一些文本

def parseAuditEvent(msg: String): ListBuffer[String] = {
    val updated_msg = msg.replace("cat=","Metadata_event_type=")
      .replace("destinationtranslatedaddress=","event_user_ip=")
      .replace("duser=","event_user_id=")
      .replace("deviceprocessname=","event_service_name=")
      .replace("cn3=","Metadata_offset=")
      .replace("outcome=","event_success=")
      .replace("devicecustomdate1=","event_utc_timestamp=")
      .replace("rt=","Metadata_event_creation_time=")

    parseEvent(updated_msg)
  }

获取值的最终函数

def parseEvent(msg: String): ListBuffer[String] = {
    val newMsg = msg.replace("\\=","$_equal_$")
    val pD = new ListBuffer[String]()
    val splitData = newMsg.split("=")
    val mSize = splitData.size
    for (i <- 1 until mSize) {
      if(i < mSize-1) {
        val a = splitData(i).split(" ")
        val b = a.size-1
        val c = a.slice(0,b).mkString(" ")
        pD += c.replace("$_equal_$","=")
      } else if(i == mSize-1) {
        val a = splitData(i).replace("$_equal_$","=")
        pD += a
      } else {
        logExceptions(newMsg)
      }
    }
    pD
  }

返回在第 3 个位置包含一个 ListBuffer[String],我使用它创建了一个 DataFrame,如下所示:

val df = ss.sqlContext
    .createDataFrame(tempRDD.filter(x => x._1 != "IRRELEVANT")
    .map(x => Row.fromSeq(x._3)),schema)

stackoverflow 的人们,我真的需要你的帮助来改进我的代码,无论是性能还是方法。 任何形式的帮助和/或建议将不胜感激。 提前致谢。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。