How to resolve Py4JJavaError: Job aborted due to stage failure: Task 0 in stage 460.0 failed 4 times
I am hitting this strange error in Spark Streaming code written in PySpark. I have tried debugging the code but could not find any cause.
Error log: https://www.example.com
import os
import json
import shutil
import time as t
from uuid import uuid1

from pyspark import SparkContext, SparkConf
from pyspark.sql import Row, SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

pkey = "userid"


def getsqlContextInstance(sparkContext):
    # Lazily create one SQLContext and reuse it across batches.
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']


def processRDDs(time, rdd):
    print("========= %s =========" % str(time))
    if rdd.isEmpty():  # RDD is empty
        print("*****************************EMPTY RDD ********************************")
    else:
        sqlContext = getsqlContextInstance(rdd.context)
        # Serialize each record back to a JSON string so read.json() can parse it.
        myRDD = rdd.map(lambda y: json.dumps(y))
        # collect() pulls the whole batch to the driver before it is
        # redistributed over 40 partitions.
        newRDD = sc.parallelize(myRDD.collect(), 40)
        df0 = sqlContext.read.json(newRDD)
        df0.createOrReplaceTempView("mytable")
        newDF = spark.sql('''
            select userid, userdevicetype,
                   date_format(cast(timestamp as timestamp), 'yyyy-MM-dd HH:mm:ss.SSS') timestamp,
                   reminderemails, referredname, paymentmade, joined, inviteid, emailaddress
            from mytable
        ''')
        # Cast every column to string before writing.
        newDF = newDF.select([col(c).cast("string") for c in newDF.columns])
        newDF.coalesce(3).write.mode("append").parquet("s3://bucket/file")


if __name__ == "__main__":
    sc = SparkContext(appName="app_test")
    ssc = StreamingContext(sc, 120)  # 120-second batch interval
    print("spark context set")
    spark = SparkSession(sc)
    sqlContext = SQLContext(sc)
    zkQuorum, topic = 'ip_Add', 'topic'
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "grp1", {topic: 1}, {"auto.offset.reset": "smallest"})
    print("********************************************connection set****************************************************************")
    # Each Kafka record is a (key, value) pair; parse the value as JSON.
    dstream = kvs.map(lambda x: json.loads(x[1]))
    dstream.foreachRDD(processRDDs)
    ssc.start()
    ssc.awaitTermination()
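
Since the failure is intermittent, it may help to record which batch failed and what the JVM-side exception was. The following is a minimal sketch, not part of the original code: `log_batch_errors` and the logger name are hypothetical, and it assumes the exception raised inside the handler is a `Py4JJavaError`, which exposes the underlying Java exception as `java_exception`. For any other exception it falls back to the Python error itself.

```python
import functools
import logging

logger = logging.getLogger("streaming_batches")  # hypothetical logger name


def log_batch_errors(handler):
    """Wrap a (time, rdd) batch handler so each failure is logged per batch."""
    @functools.wraps(handler)
    def wrapper(batch_time, rdd):
        try:
            return handler(batch_time, rdd)
        except Exception as exc:
            # Py4JJavaError carries the JVM-side exception in java_exception;
            # other exceptions are logged as they are.
            cause = getattr(exc, "java_exception", exc)
            logger.exception("batch %s failed: %s", batch_time, cause)
            raise  # re-raise so the failure is not silently swallowed
    return wrapper
```

With the code above this would be registered as `dstream.foreachRDD(log_batch_errors(processRDDs))`. The wrapper keeps the two-argument `(time, rdd)` signature, so the handler still receives the batch time.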
I cannot find the root cause. Whenever I rerun this code it runs fine, but the error still comes up frequently.
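
For reading the headline itself: "failed 4 times" matches the default value of `spark.task.maxFailures` (4), so the task was retried up to that limit before the job was aborted, and "stage 460.0" is the stage ID followed by the stage attempt number. The actual cause normally appears in the "most recent failure" part of the log, which is not quoted here. As a small sketch for tallying which stages fail across runs, the headline can be parsed as below; `parse_stage_failure` is a hypothetical helper and it assumes the message format shown in the title.

```python
import re

# Headline Spark prints when a task has exhausted its retries.
_STAGE_FAILURE = re.compile(
    r"Task (?P<task>\d+) in stage (?P<stage>\d+)\.(?P<attempt>\d+) "
    r"failed (?P<failures>\d+) times"
)


def parse_stage_failure(message):
    """Return task, stage, stage attempt and failure count, or None if absent."""
    match = _STAGE_FAILURE.search(message)
    if match is None:
        return None
    return {name: int(value) for name, value in match.groupdict().items()}


headline = ("Job aborted due to stage failure: "
            "Task 0 in stage 460.0 failed 4 times")
print(parse_stage_failure(headline))
# → {'task': 0, 'stage': 460, 'attempt': 0, 'failures': 4}
```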