气流火花提交操作符

如何解决气流火花提交操作符

我将 spark2 的命令提交为:

    value = Varibale.get('value')
    cmd = """
                    spark2-submit --master yarn --deploy-mode cluster 
                    --driver-memory=10G 
                    --conf spark.dynamicAllocation.minexecutors=5
                    --conf spark.dynamicAllocation.maxExecutors=10 
                    --queue test 
                    --executor-memory=10G
                    --executor-cores=2
                    --conf spark.yarn.driver.memoryOverhead=5120
                    --conf spark.driver.maxResultSize=2G 
                    --conf spark.yarn.executor.memoryOverhead=5120 
                    --conf spark.kryoserializer.buffer.max=1000m 
                    --conf spark.executor.extrajavaoptions=-XX:+UseG1GC 
                    --conf spark.network.timeout=15000s
                    --conf spark.executor.heartbeatInterval=1500s 
                    --conf spark.task.maxDirectResultSize=8G 
                    --principal test-host@test
                    --keytab /home/test-host.keytab 
                    --conf spark.ui.view.acls="*" 
                    /home/test/test.py {0}
                    """.format(value)

    test = SSHOperator(task_id='TEST',ssh_conn_id='test-conn',command=cmd
                   )

我希望将其转换为 SparkSubmitOperator。另外,我需要 spark2 提交。

如何将上面的转换为 SparkSubmitOperator? 到目前为止,我已经尝试过:

          
                                      
SparkSubmitOperator(task_id='TEST',conn_id='test-conn',application=f'/home/test/test.py {0}'.format(value),executor_cores=2,executor_memory='10g',)

解决方法

Airflow 中的 SparkSubmitOperator 需要的选项可以在字典中发送。请记住,字典中的键应该与函数的参数名称相同。

创建以下两个字典:

base_config = {
    "task_id":"TEST","conn_id":"test-conn","application": "/home/test/test.py"
    "executor-memory":"10G","driver-memory":"10G","executor-cores":2,"principal":"test-host@test","keytab":"/home/test-host.keytab","env_vars":{"SPARK_MAJOR_VERSION":2}
    }

spark_config = {
    "spark.master": "yarn","spark.submit.deployMode": "client","spark.yarn.queue":"test","spark.dynamicAllocation.minExecutors":5,"spark.dynamicAllocation.maxExecutors":10,"spark.yarn.driver.memoryOverhead":5120,"spark.driver.maxResultSize":"2G","spark.yarn.executor.memoryOverhead":5120,"spark.kryoserializer.buffer.max":"1000m","spark.executor.extraJavaOptions":"-XX:+UseG1GC","spark.network.timeout":"15000s","spark.executor.heartbeatInterval":"1500s","spark.task.maxDirectResultSize":"8G","spark.ui.view.acls":"*"
}

SparkSubmitOperator(**base_config,conf=spark_config)

这应该会驱动您的流程配置。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?