微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Kinesis Spark Streaming Integration-无法输出DStream内容

如何解决Kinesis Spark Streaming Integration-无法输出DStream内容

我想使用Python创建一个简单的体系结构,以打印在Kinesis中流传输的数据,然后将其发送到Spark Streaming DStream对象。我正在EC2实例中运行所有程序。

我的数据生产者是Kinesis Agent监视/var/documents/目录。 代理程序日志文件似乎正在解析记录并将它们发送到目的地,但是以某种方式在我打印DStream对象时,什么都没有显示

我的源代码

import boto3,random,time
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils,InitialPositionInStream

conf = SparkConf().setAppName("KinesissparkBigDataPipeline")

sc = SparkContext(conf = conf)
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc,2)

def createStream():
    """
    Function that creates a DStream Object coming from Kinesis Stream.

    Returns:
        sparkDStream => DStream object created from records in the Kinesis Stream.
    """
    kinesisAppName = ("KinesisstreamTests-%d" % abs(random.randint(0,10000000)))
    sparkDStream = KinesisUtils.createStream(
            ssc,kinesisAppName,"EntryPoints","https://kinesis.eu-central-1.amazonaws.com","eu-central-1",InitialPositionInStream.LATEST,2
    )
    return sparkDStream

if __name__ == "__main__":
    try:
        kinesisstream = createStream()
        kinesisstream.pprint()

        ssc.start()
        time.sleep(60)
        ssc.stop()
        # ssc.awaitTermination()
    except Exception as e:
        print(e)
当我运行命令:spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.4.4 poc_bigdata_pipeline.py时,

输出是:

-------------------------------------------
Time: 2020-11-03 11:09:52
-------------------------------------------

-------------------------------------------
Time: 2020-11-03 11:09:54
-------------------------------------------

...

我在那做错了什么吗?如果我忘记了任何重要信息,请原谅我对此很陌生。

感谢阅读。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。