如何解决错误 MicroBatchExecution:查询因错误 java.lang.NoClassDefFoundError 终止:org/apache/spark/internal/Logging$class
无法将 DataFrame "dfTransformed" 加载到数据库表 (MS sql)
import java.time.LocalDateTime
import java.util.TimeZone
import java.io.File
// FIX: use the public java.nio.charset.StandardCharsets; sun.nio.cs.* is a
// JDK-internal package and is not the charset-constants class.
import java.nio.charset.StandardCharsets
import com.typesafe.config._
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{BinaryType,DoubleType,LongType,MapType,StringType,StructField,StructType}
import org.apache.spark.sql.functions.{col,concat_ws,from_json,posexplode_outer,schema_of_json}
import org.apache.commons.lang.StringUtils.{left,substring}
import org.xml.sax.SAXParseException
import org.apache.spark.sql.{SparkSession,_}
// Identifiers used to build the checkpoint directory name and the target table name.
val loadType = "Load.Azure.Synapse"
val loadLocation = "UbiStaging.CEC"
// FIX: the Spark API method is getOrCreate (capital O); getorCreate does not exist.
val sc = SparkContext.getOrCreate()
val spark = SparkSession.builder().getOrCreate()
// All timestamps in this job are interpreted as UTC.
TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
val environment = "Development"
var avroPath = ""
val containerPath = "..."
// FIX: "HH" is the 24-hour field; the original "hh" is 12-hour and the pattern
// carries no am/pm marker, so afternoon times would be ambiguous.
val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
// FIX: LocalDateTime.now() is lower-case; Now() does not compile.
var batchStartDate = LocalDateTime.now()
// Loads ./src/main/Resources/application.conf once at object initialisation and
// exposes the section for the active `environment` (defined above) via Config(),
// e.g. Config().getString("avrobasepath").
// NOTE(review): the relative path only resolves when spark-shell is started from
// the project root — confirm the working directory, or use the absolute-path
// variant below.
object Config {
val parsedConfig = ConfigFactory.parseFile(new File("./src/main/Resources/application.conf"))
//val parsedConfig = ConfigFactory.parseFile(new File("E:/Dev/sparkstream/src/main/Resources/application.conf"))
val conf = ConfigFactory.load(parsedConfig)
def apply() = conf.getConfig(environment)
}
// Hour-partitioned wildcard path under the Avro capture root,
// e.g. "<avrobasepath>2021/05/07/13/*/*" (month/day/hour zero-padded to two digits).
avroPath = Config().getString("avrobasepath") +
  f"${batchStartDate.getYear}%d/${batchStartDate.getMonthValue}%02d/${batchStartDate.getDayOfMonth}%02d/${batchStartDate.getHour}%02d/*/*"
// Derive a filesystem-safe checkpoint directory that is unique per input path and target table.
val checkpointLocation = {
  val pathPart = avroPath.replace("*", "x").replace("/", "_").toLowerCase()
  val tablePart = loadLocation.replace(".", "_").toLowerCase()
  "C:\\SparkStreams\\CheckPoints\\" + pathPart + "_" + tablePart
}
// Wire up the Azure blob (wasbs) filesystem and its account credentials from config.
sc.hadoopConfiguration.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set(Config().getString("fsazureaccountname"), Config().getString("fsazureaccountkey"))
// Spark schema for Azure Event Hubs Capture Avro files.
// FIX: the original .add(name, nullable) calls were missing their DataType argument
// (e.g. .add("SequenceNumber", false) does not compile) and the SystemProperties
// expression had unbalanced parentheses. Reconstructed from the Capture format:
// the SystemProperties/Properties map values are the Avro union {long, double,
// string, bytes}, which spark-avro surfaces as a struct of member0..member3 —
// this also matches the LongType/DoubleType imports at the top of the file.
val capturePropertyValue = StructType(Array(
  StructField("member0", LongType, true),
  StructField("member1", DoubleType, true),
  StructField("member2", StringType, true),
  StructField("member3", BinaryType, true)))
val avroSchema = new StructType().
  add("SequenceNumber", LongType, false).
  add("Offset", StringType, false).
  add("EnqueuedTimeUtc", StringType, false).
  add("SystemProperties", MapType(StringType, capturePropertyValue, true), false).
  add("Properties", MapType(StringType, capturePropertyValue, true), false).
  add("Body", BinaryType, true)
// Streaming read of the captured Avro files; downstream stages consume everything
// as strings, so the four columns of interest are cast up front.
val dfRawStream = spark.readStream.
  schema(avroSchema).
  option("path", {Config().getString("containerpath") + avroPath}).
  format("avro").
  load().
  select(
    col("Offset").cast("string"),
    col("SequenceNumber").cast("string"),
    col("EnqueuedTimeUtc").cast("string"),
    col("Body").cast("string"))
//****Transform ***
import com.google.common.net.HostAndPort
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col,date_format,udf,posexplode,unbase64,lit}
import org.apache.spark.sql.types.{ArrayType,StructType}
// Schema of the JSON record carried in the Body column.
// FIX: every .add(name, nullable) call was missing its DataType argument
// (.add("AppName", true) does not compile), and the OperationName line was missing
// a closing parenthesis and its nullable flag. All fields are reconstructed as
// strings (OperationName as an array of strings); from_json tolerates this even if
// the source JSON holds numbers/booleans. TODO(review): confirm against a sample
// payload in case some fields were originally typed as long/boolean/timestamp.
val bodySchema = new StructType().
add("AppName",StringType,true).
add("ClientIP",StringType,true).
add("CommandInput",StringType,true).
add("CommandName",StringType,true).
add("CommandOutput",StringType,true).
add("Country",StringType,true).
add("EnableIDTransformation",StringType,true).
add("EnableSearchFiltering",StringType,true).
add("FromCachedData",StringType,true).
add("IsACSupported",StringType,true).
add("IsInternalOnly",StringType,true).
add("Misc",StringType,true).
add("OemId",StringType,true).
add("OperationName",ArrayType(StringType,true),true).
add("ProcessEndTime",StringType,true).
add("ProcessstartTime",StringType,true).
add("ProductCode",StringType,true).
add("Region",StringType,true).
add("ResultCode",StringType,true).
add("TransformlocalIDOnly",StringType,true).
add("UserName",StringType,true).
add("Version",StringType,true)
// Nested JSON schemas for the base64-encoded CommandInput payload.
// FIX: the DataType argument was missing from every .add call. Devices and CEC must
// be array types because they are posexplode()'d below; the remaining fields are
// read as strings (from_json re-parses Signatures and each CEC entry later).
// TODO(review): confirm field types against a sample payload.
val commandInputSchema = new StructType().add("inputJson",StringType,true)
val inputJsonSchema = new StructType().
  add("FormatVersion",StringType,true).
  add("PredictiveOptions",StringType,true).
  add("Devices",ArrayType(StringType,true),true)
val devicesSchema = new StructType().
  add("FormatVersion",StringType,true).
  add("deviceid",StringType,true).
  add("Signatures",StringType,true).
  add("DefiniteAttributes",StringType,true).
  add("PotentialAttributes",StringType,true)
val signaturesSchema = new StructType().add("CEC",ArrayType(StringType,true),true)
val cecSchema = new StructType().
  add("vendorid",StringType,true).
  add("osd",StringType,true).
  add("logicaladdress",StringType,true)
// Split the "host:port" ClientIP string with Guava's HostAndPort.
// The port defaults to 0 when the address carries none.
val fnGetPort: (String => String) = (ipAddress: String) =>
  HostAndPort.fromString(ipAddress).getPortOrDefault(0).toString
val fnGetHost: (String => String) = (ipAddress: String) =>
  HostAndPort.fromString(ipAddress).getHostText()
// Spark UDF wrappers used by the dfTransformed stage.
val sfnGetPort = udf(fnGetPort)
val sfnGetHost = udf(fnGetHost)
// Stage 1: parse the Body JSON and keep only Predictivediscovery commands.
// FIX: SequenceNumber was dropped by the second select but is referenced again
// downstream, which fails query analysis; it is now carried through.
val dfBodyStream = dfRawStream.
  select("Offset","EnqueuedTimeUtc","SequenceNumber","Body").
  withColumn("a",from_json(col("Body"),bodySchema)).
  select("a.CommandName","a.CommandInput","a.CommandOutput","Offset","a.ClientIP","a.ProcessstartTime","a.ProcessEndTime","EnqueuedTimeUtc","SequenceNumber").
  filter(col("CommandName") === "Predictivediscovery")
// Stage 2: base64-decode CommandInput.inputJson and explode one row per device.
// FIX: ClientIP / ProcessstartTime / ProcessEndTime / SequenceNumber / EnqueuedTimeUtc
// are required by dfTransformed below, so they must survive every intermediate select
// (the originals dropped them, producing unresolved-column errors at load time).
val dfDevicesstream = dfBodyStream.
  withColumn("a",from_json(col("CommandInput"),commandInputSchema)).
  withColumn("inputJsonDecoded",unbase64(col("a.inputJson"))).
  withColumn("inputJsonDecodedString",col("inputJsonDecoded").cast(StringType)).
  withColumn("b",from_json(col("inputJsonDecodedString"),inputJsonSchema)).
  select(col("Offset"),col("ClientIP"),col("ProcessstartTime"),col("ProcessEndTime"),col("SequenceNumber"),col("EnqueuedTimeUtc"),posexplode(col("b.Devices"))).
  withColumn("c",from_json(col("col"),devicesSchema)).
  select("Offset","ClientIP","ProcessstartTime","ProcessEndTime","SequenceNumber","EnqueuedTimeUtc","c.Signatures")
// Stage 3: explode one row per CEC signature, decode it, and derive the CEC columns.
// SeqNbr is the 1-based position of the signature within the device's CEC array.
val dfSignaturesstream = dfDevicesstream.
  withColumn("a",from_json(col("Signatures"),signaturesSchema)).
  select(col("Offset"),col("ClientIP"),col("ProcessstartTime"),col("ProcessEndTime"),col("SequenceNumber"),col("EnqueuedTimeUtc"),posexplode(col("a.CEC"))).
  filter(col("col").isNotNull).
  withColumn("CEC",col("col")).withColumn("SeqNbr",col("pos") + 1).
  withColumn("CECDecoded",unbase64(col("col"))).
  withColumn("CECDecodedString",col("CECDecoded").cast(StringType)).
  withColumn("b",from_json(col("CECDecodedString"),cecSchema)).
  withColumn("CECId",lit("hash_goes_here")).
  withColumn("vendorIdParsed",lit("vendor_parsed_goes_here")).
  withColumn("OSDParsed",lit("osd_parsed_goes_here")).
  withColumn("HEX",lit("hex_goes_here")).
  withColumn("OSD",col("b.osd")).
  withColumn("vendorId",col("b.vendorid")).
  withColumn("LogicalAddress",col("b.logicaladdress")).
  select("Offset","ClientIP","ProcessstartTime","ProcessEndTime","EnqueuedTimeUtc","CECId","CEC","vendorId","vendorIdParsed","OSD","OSDParsed","HEX","LogicalAddress","SeqNbr")
// Stage 4: final shape loaded into the SQL table (split each timestamp into an
// integer yyyyMMdd date column and an HH:mm:ss.S time column; split ClientIP into
// IP and Port; fill remaining nulls with sentinel values).
// FIX: the two *TimeUt lines were garbled — withColumn("...","HH:mm:ss.S")) had
// lost its date_format(col(...)) call and carried an unbalanced parenthesis.
val dfTransformed = dfSignaturesstream.
  withColumn("IP",sfnGetHost(col("ClientIP"))).
  withColumn("Port",sfnGetPort(col("ClientIP"))).
  withColumn("ProcessstartDateUt",date_format(col("ProcessstartTime"),"yyyyMMdd").cast("integer")).
  withColumn("ProcessstartTimeUt",date_format(col("ProcessstartTime"),"HH:mm:ss.S")).
  withColumn("ProcessEndDateUt",date_format(col("ProcessEndTime"),"yyyyMMdd").cast("integer")).
  withColumn("ProcessEndTimeUt",date_format(col("ProcessEndTime"),"HH:mm:ss.S")).
  withColumn("EnqueuedTimeUt",col("EnqueuedTimeUtc")).as("a").
  select("Offset","a.ProcessstartDateUt","a.ProcessstartTimeUt","a.ProcessEndDateUt","a.ProcessEndTimeUt","a.IP","a.Port","SeqNbr","EnqueuedTimeUt").
  na.fill("UnkNown").na.fill(-1)
//.filter(col("Offset") === "4900106716867360")
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{StreamingQuery,Trigger}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent,QueryStartedEvent,QueryTerminatedEvent}
// Micro-batch sink: every 2 minutes, write the batch (coalesced to one partition)
// to the MS SQL table via the com.microsoft.sqlserver.jdbc.spark connector, using
// checkpointLocation both as the checkpoint directory and as the query/app name.
// NOTE(review): the reported "java.lang.NoClassDefFoundError:
// org/apache/spark/internal/Logging$class" at query start is characteristic of a
// classpath jar (e.g. this SQL connector or spark-avro) compiled against Spark 2.x /
// a different Scala version than the Spark 3 runtime — verify the connector artifact
// matches the cluster's Spark and Scala versions before debugging this code.
val query = dfTransformed.coalesce(1).writeStream.queryName(checkpointLocation).option("checkpointLocation",checkpointLocation).outputMode("append").trigger(Trigger.ProcessingTime("2 minutes")).foreachBatch {
(batchDF: DataFrame,batchId: Long) => batchDF.write.format("com.microsoft.sqlserver.jdbc.spark").mode("append")
.option("batchsize",1000000)
.option("mssqlIsolationLevel","READ_UNCOMMITTED")
.option("url",Config().getString("databaseurl") + ";databaseName=" + Config().getString("databasename") + ";applicationName=" + checkpointLocation)
.option("dbtable","[" + loadLocation.replace(".","].[") + "]").option("user",Config().getString("databaseusername"))
.option("password",Config().getString("databasepassword")).save()
}.start()
加载命令（在 spark-shell 内执行）：:load ./src/test/scala/file_name.scala
进入 Spark Shell: spark-shell --packages org.apache.spark:spark-avro_2.12:3.0.1 --packages com.databricks:spark-xml_2.12:0.10.0 --conf spark.dynamicAllocation.enabled=true --conf spark.shuffle.service.enabled=true --conf spark.executor.memory=2g --conf spark.dynamicAllocation.maxExecutors=1 --jars C:\BigDataLocalSetup\spark3.0\jars\hadoop-azure-3.3.0.jar,C:\BigDataLocalSetup\spark3.0\jars\azure-storage-8.6.6.jar,C:\BigDataLocalSetup\spark3.0\jars\jetty-util-ajax-9.4.20.v20190813.jar,C:\BigDataLocalSetup\spark3.0\jars\jetty-util-9.4.20.v20190813.jar,C:\BigDataLocalSetup\spark3.0\jars\config-1.4.1.jar
IDE: VS Code（Visual Studio Code）
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。