
ERROR MicroBatchExecution: Query terminated with error java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging$class

How do I resolve the error "MicroBatchExecution: Query terminated with error java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging$class"?

The DataFrame "dfTransformed" cannot be loaded into the database table (MS SQL).

import java.time.LocalDateTime
import java.util.TimeZone
import java.io.File
import com.typesafe.config._
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{BinaryType,DoubleType,LongType,MapType,StringType,StructField,StructType}
import org.apache.spark.sql.functions.{col,concat_ws,from_json,posexplode_outer,schema_of_json}
import java.nio.charset.StandardCharsets
import org.apache.commons.lang.StringUtils.{left,substring}
import org.xml.sax.SAXParseException
import org.apache.spark.sql.{SparkSession,_}



val loadType = "Load.Azure.Synapse"
val loadLocation = "UbiStaging.CEC"
val sc = SparkContext.getOrCreate()
val spark = SparkSession.builder().getOrCreate()
TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
val environment = "Development"
var avroPath = ""


val containerPath = "..."
val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
var batchStartDate = LocalDateTime.now()


object Config {
  val parsedConfig = ConfigFactory.parseFile(new File("./src/main/Resources/application.conf"))
  //val parsedConfig = ConfigFactory.parseFile(new File("E:/Dev/sparkstream/src/main/Resources/application.conf"))
  val conf = ConfigFactory.load(parsedConfig)
  def apply() = conf.getConfig(environment)
}

avroPath = Config().getString("avrobasepath") + batchStartDate.getYear + "/" + f"${batchStartDate.getMonthValue}%02d" + "/" + f"${batchStartDate.getDayOfMonth}%02d" + "/" + f"${(batchStartDate.getHour)}%02d" + "/*/*"
val checkpointLocation = "C:\\SparkStreams\\CheckPoints\\" + avroPath.replace("*","x").replace("/","_").toLowerCase() + "_" + loadLocation.replace(".","_").toLowerCase()

sc.hadoopConfiguration.set("fs.wasbs.impl","org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set(Config().getString("fsazureaccountname"),Config().getString("fsazureaccountkey"))
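
For reference, application.conf has to expose a Development block with the keys this script reads (avrobasepath, containerpath, fsazureaccountname, fsazureaccountkey, databaseurl, databasename, databaseusername, databasepassword). A minimal sketch, with every value a placeholder and expressed via ConfigFactory.parseString so it can be tried in the shell:

import com.typesafe.config.ConfigFactory

// Sketch only: every value below is a placeholder, not the real configuration.
// Note that fsazureaccountname is itself a Hadoop property name, because the script
// passes it as the key to sc.hadoopConfiguration.set above.
val sampleConf = ConfigFactory.parseString("""
  Development {
    avrobasepath       = "wasbs://capture@myaccount.blob.core.windows.net/eventhub/"
    containerpath      = "wasbs://capture@myaccount.blob.core.windows.net/"
    fsazureaccountname = "fs.azure.account.key.myaccount.blob.core.windows.net"
    fsazureaccountkey  = "<storage-account-key>"
    databaseurl        = "jdbc:sqlserver://myserver.database.windows.net:1433"
    databasename       = "<database-name>"
    databaseusername   = "<user>"
    databasepassword   = "<password>"
  }
""")
println(sampleConf.getConfig("Development").getString("avrobasepath"))  // quick lookup check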

// Event Hubs capture Avro layout; the member0-member3 union types (long/double/string/bytes) are assumed from the imports above
val avroSchema = new StructType().
  add("SequenceNumber",LongType,false).
  add("Offset",StringType,false).
  add("EnqueuedTimeUtc",StringType,false).
  add("SystemProperties",MapType(StringType,StructType(Array(StructField("member0",LongType,true),StructField("member1",DoubleType,true),StructField("member2",StringType,true),StructField("member3",BinaryType,true))),false),false).
  add("Properties",MapType(StringType,StructType(Array(StructField("member0",LongType,true),StructField("member1",DoubleType,true),StructField("member2",StringType,true),StructField("member3",BinaryType,true))),false),false).
  add("Body",BinaryType,true)

val dfRawStream = spark.readStream.schema(avroSchema).option("path",{Config().getString("containerpath") + avroPath}).format("avro").load().select(col("Offset").cast("string"),col("SequenceNumber").cast("string"),col("EnqueuedTimeUtc").cast("string"),col("Body").cast("string"))

//****Transform ***

import com.google.common.net.HostAndPort
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col,date_format,udf,posexplode,unbase64,lit}
import org.apache.spark.sql.types.{ArrayType,StructType}

    // Body fields are treated as plain strings here; nested JSON payloads are parsed further below
    val bodySchema = new StructType().
    add("AppName",StringType,true).
    add("ClientIP",StringType,true).
    add("CommandInput",StringType,true).
    add("CommandName",StringType,true).
    add("CommandOutput",StringType,true).
    add("Country",StringType,true).
    add("EnableIDTransformation",StringType,true).
    add("EnableSearchFiltering",StringType,true).
    add("FromCachedData",StringType,true).
    add("IsACSupported",StringType,true).
    add("IsInternalOnly",StringType,true).
    add("Misc",StringType,true).
    add("OemId",StringType,true).
    add("OperationName",ArrayType(StringType,true),true).
    add("ProcessEndTime",StringType,true).
    add("ProcessstartTime",StringType,true).
    add("ProductCode",StringType,true).
    add("Region",StringType,true).
    add("ResultCode",StringType,true).
    add("TransformlocalIDOnly",StringType,true).
    add("UserName",StringType,true).
    add("Version",StringType,true)

    // Devices and CEC are arrays of JSON strings (they are fed to posexplode below); everything else is kept as a string
    val commandInputSchema = new StructType().add("inputJson",StringType,true)
    val inputJsonSchema = new StructType().add("FormatVersion",StringType,true).add("PredictiveOptions",StringType,true).add("Devices",ArrayType(StringType,true),true)
    val devicesSchema = new StructType().add("FormatVersion",StringType,true).add("deviceid",StringType,true).add("Signatures",StringType,true).add("DefiniteAttributes",StringType,true).add("PotentialAttributes",StringType,true)
    val signaturesSchema = new StructType().add("CEC",ArrayType(StringType,true),true)
    val cecSchema = new StructType().add("vendorid",StringType,true).add("osd",StringType,true).add("logicaladdress",StringType,true)
    val fnGetPort: (String => String) = (ipAddress: String) => {HostAndPort.fromString(ipAddress).getPortOrDefault(0).toString()}
    val fnGetHost: (String => String) = (ipAddress: String) => {HostAndPort.fromString(ipAddress).getHostText()}
    val sfnGetPort = udf(fnGetPort)
    val sfnGetHost = udf(fnGetHost)
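
    // Illustrative sanity check for the two helpers above (not part of the original listing);
    // it only relies on Guava's HostAndPort parsing behaviour.
    println(fnGetHost("10.1.2.3:8080"))  // 10.1.2.3
    println(fnGetPort("10.1.2.3:8080"))  // 8080
    println(fnGetPort("10.1.2.3"))       // 0 (getPortOrDefault(0) when no port is present)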

    val dfBodyStream = dfRawStream.
    select("Offset","EnqueuedTimeUtc","SequenceNumber","Body").withColumn("a",from_json(col("Body"),bodySchema)).
    select("a.CommandName","a.CommandInput","a.CommandOutput","Offset","a.ClientIP","a.ProcessstartTime","a.ProcessEndTime","EnqueuedTimeUtc").
    filter(col("CommandName") === "Predictivediscovery")

    val dfDevicesstream = dfBodyStream.
    withColumn("a",from_json(col("CommandInput"),commandInputSchema)).
    withColumn("inputJsonDecoded",unbase64(col("a.inputJson"))).
    withColumn("inputJsonDecodedString",col("inputJsonDecoded").cast(StringType)).
    withColumn("b",from_json(col("inputJsonDecodedString"),inputJsonSchema)).
    select(col("Offset"),col("ClientIP"),col("ProcessstartTime"),col("ProcessEndTime"),col("SequenceNumber"),col("EnqueuedTimeUtc"),posexplode(col("b.Devices"))).
    withColumn("c",from_json(col("col"),devicesSchema)).
    select("Offset","ClientIP","ProcessstartTime","ProcessEndTime","c.Signatures")

    val dfSignaturesstream = dfDevicesstream.
    withColumn("a",from_json(col("Signatures"),signaturesSchema)).
    select(col("Offset"),posexplode(col("a.CEC"))).filter(col("col").isNotNull).
    withColumn("CEC",col("col")).withColumn("SeqNbr",col("pos") + 1).
    withColumn("CECDecoded",unbase64(col("col"))).
    withColumn("CECDecodedString",col("CECDecoded").cast(StringType)).
    withColumn("b",from_json(col("CECDecodedString"),cecSchema)).
    withColumn("CECId",lit("hash_goes_here")).
    withColumn("vendorIdParsed",lit("vendor_parsed_goes_here")).
    withColumn("OSDParsed",lit("osd_parsed_goes_here")).
    withColumn("HEX",lit("hex_goes_here")).
    withColumn("OSD",col("b.osd")).
    withColumn("vendorId",col("b.vendorid")).
    withColumn("LogicalAddress",col("b.logicaladdress")).
    select("Offset","CECId","CEC","vendorId","vendorIdParsed","OSD","OSDParsed","HEX","LogicalAddress","SeqNbr")

    val dfTransformed = dfSignaturesstream.
    withColumn("IP",sfnGetHost(col("ClientIP"))).
    withColumn("Port",sfnGetPort(col("ClientIP"))).
    withColumn("ProcessstartDateUt",date_format(col("ProcessstartTime"),"yyyyMMdd").cast("integer")).
    withColumn("ProcessstartTimeUt","HH:mm:ss.S")).
    withColumn("ProcessEndDateUt",date_format(col("ProcessEndTime"),"yyyyMMdd").cast("integer")).
    withColumn("ProcessEndTimeUt","HH:mm:ss.S")).
    withColumn("EnqueuedTimeUt",col("EnqueuedTimeUtc")).as("a").
    select("Offset","a.ProcessstartDateUt","a.ProcessstartTimeUt","a.ProcessEndDateUt","a.ProcessEndTimeUt","a.IP","a.Port","SeqNbr","EnqueuedTimeUt").na.fill("UnkNown").na.fill(-1)
    //.filter(col("Offset") === "4900106716867360")



import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{StreamingQuery,Trigger}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent,QueryStartedEvent,QueryTerminatedEvent}

  val query = dfTransformed.coalesce(1).writeStream.
    queryName(checkpointLocation).
    option("checkpointLocation",checkpointLocation).
    outputMode("append").
    trigger(Trigger.ProcessingTime("2 minutes")).
    foreachBatch {
      (batchDF: DataFrame,batchId: Long) => batchDF.write.format("com.microsoft.sqlserver.jdbc.spark").mode("append")
      .option("batchsize",1000000)
      .option("mssqlIsolationLevel","READ_UNCOMMITTED")
      .option("url",Config().getString("databaseurl") + ";databaseName=" + Config().getString("databasename") + ";applicationName=" + checkpointLocation)
      .option("dbtable","[" + loadLocation.replace(".","].[") + "]").option("user",Config().getString("databaseusername"))
      .option("password",Config().getString("databasepassword")).save()
    }.start()
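
The StreamingQueryListener imports above are not used anywhere in the snippet. A minimal sketch of a listener that prints progress and the termination exception (which is where the NoClassDefFoundError shows up) can be registered like this:

// Sketch: surface query progress and termination errors on the driver console
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Query started: ${event.id}")
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"Batch ${event.progress.batchId}: ${event.progress.numInputRows} input rows")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Query ${event.id} terminated, exception: ${event.exception.getOrElse("none")}")
})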

Load command: :load ./src/test/scala/file_name.scala

Start the Spark shell: spark-shell --packages org.apache.spark:spark-avro_2.12:3.0.1,com.databricks:spark-xml_2.12:0.10.0 --conf spark.dynamicAllocation.enabled=true --conf spark.shuffle.service.enabled=true --conf spark.executor.memory=2g --conf spark.dynamicAllocation.maxExecutors=1 --jars C:\BigDataLocalSetup\spark3.0\jars\hadoop-azure-3.3.0.jar,C:\BigDataLocalSetup\spark3.0\jars\azure-storage-8.6.6.jar,C:\BigDataLocalSetup\spark3.0\jars\jetty-util-ajax-9.4.20.v20190813.jar,C:\BigDataLocalSetup\spark3.0\jars\jetty-util-9.4.20.v20190813.jar,C:\BigDataLocalSetup\spark3.0\jars\config-1.4.1.jar

IDE: VS Code
