如何解决Kinesis Spark Streaming Integration-无法输出DStream内容
我想使用Python创建一个简单的体系结构,以打印在Kinesis中流传输的数据,然后将其发送到Spark Streaming DStream对象。我正在EC2实例中运行所有程序。
我的数据生产者是Kinesis Agent监视/var/documents/
目录。
代理程序日志文件似乎正在解析记录并将它们发送到目的地,但是以某种方式在我打印DStream对象时,什么都没有显示。
我的源代码:
import boto3,random,time
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils,InitialPositionInStream
conf = SparkConf().setAppName("KinesissparkBigDataPipeline")
sc = SparkContext(conf = conf)
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc,2)
def createStream():
"""
Function that creates a DStream Object coming from Kinesis Stream.
Returns:
sparkDStream => DStream object created from records in the Kinesis Stream.
"""
kinesisAppName = ("KinesisstreamTests-%d" % abs(random.randint(0,10000000)))
sparkDStream = KinesisUtils.createStream(
ssc,kinesisAppName,"EntryPoints","https://kinesis.eu-central-1.amazonaws.com","eu-central-1",InitialPositionInStream.LATEST,2
)
return sparkDStream
if __name__ == "__main__":
try:
kinesisstream = createStream()
kinesisstream.pprint()
ssc.start()
time.sleep(60)
ssc.stop()
# ssc.awaitTermination()
except Exception as e:
print(e)
当我运行命令:spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.4.4 poc_bigdata_pipeline.py
时,输出是:
-------------------------------------------
Time: 2020-11-03 11:09:52
-------------------------------------------
-------------------------------------------
Time: 2020-11-03 11:09:54
-------------------------------------------
...
我在那做错了什么吗?如果我忘记了任何重要信息,请原谅我对此很陌生。
感谢阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。