如何解决创建pyspark的spark上下文py4j java网关对象
我正在尝试将 java 数据帧转换为 pyspark 数据帧。为此,我在 java 进程中创建一个数据帧(或行的数据集)并在 java 端启动一个 py4j.GatewayServer 服务器进程。然后在 python 端,我创建了一个 py4j.java_gateway.JavaGateway() 客户端对象并将其传递给 pyspark 的 SparkContext 构造函数以将其链接到已经启动的 jvm 进程。但我收到此错误:-
File: "path_to_virtual_environment/lib/site-packages/pyspark/conf.py",line 120,in __init__
self._jconf = _jvm.SparkConf(loadDefaults)
TypeError: 'JavaPackage' object is not callable
有人可以帮忙吗? 以下是我正在使用的代码:-
Java 代码:-
import py4j.GatewayServer
public class TestJavaToPythonTransfer{
Dataset<Row> df1;
public TestJavaToPythonTransfer(){
SparkSession spark =
SparkSession.builder().appName("test1").config("spark.master","local").getorCreate();
df1 = spark.read().json("path/to/local/json_file");
}
public Dataset<Row> getDf(){
return df1;
}
public static void main(String args[]){
GatewayServer gatewayServer = new GatewayServer(new TestJavaToPythonTransfer());
gatewayServer.start();
System.out.println("Gateway server started");
}
}
Python 代码:-
from pyspark.sql import sqlContext,DataFrame
from pyspark import SparkContext,SparkConf
from py4j.java_gateway import JavaGateway
gateway = JavaGateway()
conf = SparkConf().set('spark.io.encryption.enabled','true')
py_sc = SparkContext(gateway=gateway,conf=conf)
j_df = gateway.getDf()
py_df = DataFrame(j_df,sqlContext(py_sc))
print('print dataframe content')
print(dpy_df.collect())
运行python代码的命令:-
python path_to_python_file.py
我也试过这样做:-
$SPARK_HOME/bin/spark-submit --master local path_to_python_file.py
但是这里虽然代码没有抛出任何错误,但它没有向终端打印任何内容。我需要为此设置一些 spark conf 吗?
P.S - 如果代码中有拼写错误或错误,请提前道歉,因为我无法直接从我公司的 IDE 中复制代码和错误堆栈。
解决方法
在调用 getDf() 之前缺少对 entry_point 的调用
所以,试试这个:
app = gateway.entry_point
j_df = app.getDf()
此外,我在下面使用 Python 和 Scala 创建了工作副本(希望您不要介意),它显示了如何在 Scala 端 py4j 网关使用 Spark 会话和示例 DataFrame 启动,在 Python 端我访问了该 DataFrame 并转换为Python List[Tuple] 在为 Python 端的 Spark 会话转换回 DataFrame 之前:
Python:
from py4j.java_gateway import JavaGateway
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,IntegerType,StructField
if __name__ == '__main__':
gateway = JavaGateway()
spark_app = gateway.entry_point
df = spark_app.df()
# Note "apply" method here comes from Scala's companion object to access elements of an array
df_to_list_tuple = [(int(i.apply(0)),int(i.apply(1))) for i in df]
spark = (SparkSession
.builder
.appName("My PySpark App")
.getOrCreate())
schema = StructType([
StructField("a",IntegerType(),True),StructField("b",True)])
df = spark.createDataFrame(df_to_list_tuple,schema)
df.show()
斯卡拉:
import java.nio.file.{Path,Paths}
import org.apache.spark.sql.SparkSession
import py4j.GatewayServer
object SparkApp {
val myFile: Path = Paths.get(System.getProperty("user.home") + "/dev/sample_data/games.csv")
val spark = SparkSession.builder()
.master("local[*]")
.appName("My app")
.getOrCreate()
val df = spark
.read
.option("header","True")
.csv(myFile.toString)
.collect()
}
object Py4JServerApp extends App {
val server = new GatewayServer(SparkApp)
server.start()
print("Started and running...")
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。