创建pyspark的spark上下文py4j java网关对象

如何解决创建pyspark的spark上下文py4j java网关对象

我正在尝试将 java 数据帧转换为 pyspark 数据帧。为此,我在 java 进程中创建一个数据帧(或行的数据集)并在 java 端启动一个 py4j.GatewayServer 服务器进程。然后在 python 端,我创建了一个 py4j.java_gateway.JavaGateway() 客户端对象并将其传递给 pyspark 的 SparkContext 构造函数以将其链接到已经启动的 jvm 进程。但我收到此错误:-

File: "path_to_virtual_environment/lib/site-packages/pyspark/conf.py",line 120,in __init__
    self._jconf = _jvm.SparkConf(loadDefaults)
TypeError: 'JavaPackage' object is not callable

有人可以帮忙吗? 以下是我正在使用的代码:-

Java 代码:-

import py4j.GatewayServer
public class TestJavaToPythonTransfer{
    Dataset<Row> df1;
    public TestJavaToPythonTransfer(){
        SparkSession spark = 
              SparkSession.builder().appName("test1").config("spark.master","local").getorCreate();
        df1 = spark.read().json("path/to/local/json_file");
    }
    public Dataset<Row> getDf(){
        return df1;  
    }
    public static void main(String args[]){
       GatewayServer gatewayServer = new GatewayServer(new TestJavaToPythonTransfer());
       gatewayServer.start();
       System.out.println("Gateway server started");
    }
}

Python 代码:-

from pyspark.sql import sqlContext,DataFrame
from pyspark import SparkContext,SparkConf
from py4j.java_gateway import JavaGateway
gateway = JavaGateway()
conf = SparkConf().set('spark.io.encryption.enabled','true')
py_sc = SparkContext(gateway=gateway,conf=conf)
j_df = gateway.getDf()
py_df = DataFrame(j_df,sqlContext(py_sc))
print('print dataframe content')
print(dpy_df.collect())

运行python代码的命令:-

python path_to_python_file.py

我也试过这样做:-

$SPARK_HOME/bin/spark-submit --master local path_to_python_file.py

但是这里虽然代码没有抛出任何错误,但它没有向终端打印任何内容。我需要为此设置一些 spark conf 吗?

P.S - 如果代码中有拼写错误错误,请提前道歉,因为我无法直接从我公司的 IDE 中复制代码错误堆栈。

解决方法

在调用 getDf() 之前缺少对 entry_point 的调用

所以,试试这个:

app = gateway.entry_point
j_df = app.getDf()

此外,我在下面使用 Python 和 Scala 创建了工作副本(希望您不要介意),它显示了如何在 Scala 端 py4j 网关使用 Spark 会话和示例 DataFrame 启动,在 Python 端我访问了该 DataFrame 并转换为Python List[Tuple] 在为 Python 端的 Spark 会话转换回 DataFrame 之前:

Python:

from py4j.java_gateway import JavaGateway
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,IntegerType,StructField

if __name__ == '__main__':
    gateway = JavaGateway()

    spark_app = gateway.entry_point
    df = spark_app.df()

    # Note "apply" method here comes from Scala's companion object to access elements of an array
    df_to_list_tuple = [(int(i.apply(0)),int(i.apply(1))) for i in df]

    spark = (SparkSession
             .builder
             .appName("My PySpark App")
             .getOrCreate())

    schema = StructType([
        StructField("a",IntegerType(),True),StructField("b",True)])

    df = spark.createDataFrame(df_to_list_tuple,schema)

    df.show()

斯卡拉:

import java.nio.file.{Path,Paths}

import org.apache.spark.sql.SparkSession
import py4j.GatewayServer

object SparkApp {
  val myFile: Path = Paths.get(System.getProperty("user.home") + "/dev/sample_data/games.csv")
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("My app")
    .getOrCreate()

  val df = spark
      .read
      .option("header","True")
      .csv(myFile.toString)
      .collect()

}

object Py4JServerApp extends App {


  val server = new GatewayServer(SparkApp)
  server.start()

  print("Started and running...")
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?