微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

spline spark agent jar 在后处理过程中出现错误

如何解决spline spark agent jar 在后处理过程中出现错误

我一直在尝试使用新的样条 jsr 运行以下代码:za.co.absa.spline.agent.spark:spark-3.0-spline-agent-bundle_2.12:0.6.0 但一直出现错误特定于 UserExtraMetadataProvider,在较新版本中已弃用。我还尝试使用第一个代码块下方显示代码将 UserExtraMetadataProvider 替换为 UserExtraAppendingPostProcessingFilter,但仍然出现错误。您能否验证并分享如何使用新的样条包正确编写后处理过滤器代码

%scala
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
import za.co.absa.spline.harvester.HarvestingContext
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.producer.model._
import scala.util.parsing.json.JSON

val splineConf: Configuration = StandardSplineConfigurationStack(spark)

spark.enableLineageTracking(new DefaultSplineConfigurer(splineConf) {
//override protected def userExtraMetadataProvider = new UserExtraMetaDataProvider {
//val test = dbutils.notebook.getContext.notebookPath
val notebookinformationjson = dbutils.notebook.getContext.toJson
val outerMap = JSON.parseFull(notebookinformationjson).getorElse(0).asInstanceOf[Map[String,String]]
val tagMap = outerMap("tags").asInstanceOf[Map[String,String]]

val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String,String]]
val notebookPath = extraContextMap("notebook_path").split("/")

val notebookURL = tagMap("browserHostName")+"/?o="+tagMap("orgId")+tagMap("browserHash")
val user = tagMap("user")
val name = notebookPath(notebookPath.size-1)

val notebookInfo = Map("notebookURL" -> notebookURL,"user" -> user,"name" -> name,"mounts" -> dbutils.fs.ls("/mnt").map(_.path),"timestamp" -> System.currentTimeMillis)
val notebookInfoJson = scala.util.parsing.json.JSONObject(notebookInfo)

override protected def userExtraMetadataProvider: UserExtraMetadataProvider = new UserExtraMetadataProvider {
override def forExecEvent(event: ExecutionEvent,ctx: HarvestingContext): Map[String,Any] = Map("foo" -> "bar1")
override def forExecPlan(plan: ExecutionPlan,Any] = Map("notebookInfo" -> notebookInfoJson) // tilføj mount info til searchAndReplace denne funktion indeholder infoen
override def forOperation(op: ReadOperation,Any] = Map("foo" -> "bar3")
override def forOperation(op: WriteOperation,Any] = Map("foo" -> "bar4")
override def forOperation(op: DataOperation,Any] = Map("foo" -> "bar5")
}
})

这是仍然有错误的更新代码

%scala
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
import za.co.absa.spline.harvester.HarvestingContext
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.producer.model._
import play.api.libs.json._

val splineConf: Configuration = StandardSplineConfigurationStack(spark)

spark.enableLineageTracking(new DefaultSplineConfigurer(splineConf) {
val notebookinformationjson = Json.toJson(dbutils.notebook.getContext)
val outerMap = Json.toJson(notebookinformationjson).getorElse(0).asInstanceOf[Map[String,String]]
val notebookPath = extraContextMap("notebook_path").split("/")

val notebookURL = tagMap("browserHostName")+"/?o="+tagMap("orgId")+tagMap("browserHash")
val user = tagMap("user")
val name = notebookPath(notebookPath.size-1)

val notebookInfo = Map("notebookURL" -> Json.toJson(notebookURL),"user" -> Json.toJson(user),"name" -> Json.toJson(name),"mounts" -> Json.toJson(dbutils.fs.ls("/mnt").map(_.path)),"timestamp" -> Json.toJson(System.currentTimeMillis))

val notebookInfoJson = Json.toJson(notebookInfo)

def userExtraMetadataProvider: UserExtraAppendingPostProcessingFilter
= new UserExtraAppendingPostProcessingFilter

{
def processExecutionEvent(event: ExecutionEvent,Any] = Map("foo" -> "bar1")
def processExecutionPlan (plan: ExecutionPlan,Any] = Map("notebookInfo" -> notebookInfoJson)
def processReadOperation(op: ReadOperation,Any] = Map("foo" -> "bar3")
def processWriteOperation(op: WriteOperation,Any] = Map("foo" -> "bar4")
def processDataOperation(op: DataOperation,Any] = Map("foo" -> "bar5")
}
})

这里是错误

    command-2044409137370707:12: error: not enough arguments for constructor DefaultSplineConfigurer: (sparkSession: org.apache.spark.sql.SparkSession,userConfiguration: org.apache.commons.configuration.Configuration)za.co.absa.spline.harvester.conf.DefaultSplineConfigurer.
Unspecified value parameter userConfiguration.
spark.enableLineageTracking(new DefaultSplineConfigurer(splineConf) {
                                ^
command-2044409137370707:32: error: not found: type UserExtraAppendingPostProcessingFilter
 def userExtraMetadataProvider: UserExtraAppendingPostProcessingFilter
                                ^
command-2044409137370707:33: error: not found: type UserExtraAppendingPostProcessingFilter
 = new UserExtraAppendingPostProcessingFilter
       ^
command-2044409137370707:37: error: not found: type ExecutionEvent
    def processExecutionEvent(event: ExecutionEvent,Any] = Map("foo" -> "bar1")
                                     ^
command-2044409137370707:38: error: not found: type ExecutionPlan
    def processExecutionPlan (plan: ExecutionPlan,Any] = Map("notebookInfo" -> notebookInfoJson)
                                    ^
command-2044409137370707:39: error: not found: type ReadOperation
    def processReadOperation(op: ReadOperation,Any] = Map("foo" -> "bar3")
                                 ^
command-2044409137370707:40: error: not found: type WriteOperation
    def processWriteOperation(op: WriteOperation,Any] = Map("foo" -> "bar4")
                                  ^
command-2044409137370707:41: error: not found: type DataOperation
    def processDataOperation(op: DataOperation,Any] = Map("foo" -> "bar5")
                                 ^
command-2044409137370707:36: warning: a pure expression does nothing in statement position; multiline expressions may require enclosing parentheses
  {
  ^

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。