微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

独立运行的 UDF 引发错误

如何解决独立运行的 UDF 引发错误

我正在运行我的 spark 程序,它在本地工作但不是远程工作。 我的程序有这些组件(容器):

  • 我的应用程序基于 spring(用于 REST 调用),它启动一个驱动程序(带有 getorCreate 的 Spark 会话)并具有我构建的所有转换器。
  • 基于 bitnami 图像的 Spark Master。
  • Spark Worker 基于 bitnami 映像,但也具有我的应用程序的所有依赖项(即 /dependencies 目录下的所有 jar)。

本地一切正常,但远程运行带有 UDF 的转换器时出现该错误(其余转换器(即没有 UDF)工作正常):

Caused by: java.lang.classCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of在 org.apache.spark.rdd.MapPartitionsRDD 的 java.io.ObjectStreamClass$FieldReflector.setobjFieldValues(ObjectStreamClass.java:2301) 的 java.io.ObjectStreamClass.setobjFieldValues(ObjectStreamClass.java:1431) 的实例中键入 scala.collection.Seq在 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350) 在 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268) 在 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)在 java.io.ObjectInputStream.readobject0(ObjectInputStream.java:1625) 在 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344) 在 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)在 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126) 在 java.io.ObjectInputStream.readobject0(ObjectInputStream.java:1625) 在 java.io.ObjectInputStream.readobject(ObjectInputStream.java:465)在 java.io.ObjectInputStream.readobject(ObjectInputStream.java:423) at scala.collection.immutable.List$SerializationProxy.readobject(List.scala:490) at sun.reflect.GeneratedMethodAccessor3.invoke(UnkNown Source)在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ...

这是我的 spark 会话代码

val sparkConf = new SparkConf()
.setMaster("spark://spark-master:7077")
.setAppName("My-App")
.set("spark.executor.extraClasspath","/dependencies/*")

val spark = SparkSession.builder().config(sparkConf).getorCreate()

因此,具有外部依赖项的作业工作正常,但 UDF 会产生上述错误。 我还尝试将我的应用程序 jar(其中包含驱动程序和 spring 代码以及工作程序中已经存在的所有其他依赖项)添加到工作程序中的依赖项文件夹,但仍然产生错误。还尝试将其放置在与驱动程序相同的位置的工作器中,并使用“spark.jars”将其位置添加到 sparkConf,但没有成功。 有什么建议吗?

解决方法

经过大量的谷歌搜索后,我发现了如何集成 Spring-Boot 和 Spark 的解决方案。 我需要更改我的 pom 以使用 shade 插件制作一个 uber-jar。所以我换了这个:

            <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <configuration>
                <fork>true</fork>
                <executable>true</executable>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>repackage</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

与:

            <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring-boot.version}</version>
                </dependency>
            </dependencies>
            <configuration>
                <keepDependenciesWithProvidedScope>false</keepDependenciesWithProvidedScope>
                <createDependencyReducedPom>false</createDependencyReducedPom>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>module-info.class</exclude>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <transformers>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.handlers</resource>
                    </transformer>
                    <transformer
                            implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                        <resource>META-INF/spring.factories</resource>
                    </transformer>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.schemas</resource>
                    </transformer>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>${start-class}</mainClass>
                    </transformer>
                </transformers>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

然后,我将项目 jar 添加到每个工人,并将这些配置添加到 Spark 会话:

    "spark.executor.extraClassPath","/path/app.jar","spark.driver.extraClassPath","spark.jars",

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?