微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

数据转换后将kafka流数据帧保存到Databricks中的Redis

如何解决数据转换后将kafka流数据帧保存到Databricks中的Redis

在对数据执行聚合后,我正在使用 pyspark 将 kafka 流定向到 redis。最终输出一个流数据名。

我连接到 kafka 流的代码。 (你可能会发现我的代码是外行,请无视)

app_schema = StructType([
        StructField("applicationId",StringType(),True),StructField("applicationTimeStamp",True)
    ])

# group_id = "mygroup"
topic = "com.mobile-v1"
bootstrap_servers = "server-1:9093,server-2:9093,server-3:9093"

options = {
    "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="user@stream.com" password="xxxxx";',\
    "kafka.ssl.ca.location": "/tmp/cert.crt",\
    "kafka.sasl.mechanism": "PLAIN",\
    "kafka.security.protocol" : "SASL_SSL",\
    "kafka.bootstrap.servers": bootstrap_servers,\
    "failOnDataLoss": "false",\
    "subscribe": topic,\
    "startingOffsets": "latest",\
    "enable.auto.commit": "false",\
    "auto.offset.reset": "false",\
    "enable.partition.eof": "true",\
    "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",\
    "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
}
kafka_mobile_apps_df = spark.readStream.format("kafka").options(**options).options().load()

kafka_mobile_apps_df = kafka_mobile_apps_df\
    .select(from_json(col("value").cast("string"),app_schema).alias("mob_apps"))

因为订阅了经纪人,这给了我流数据帧。在此之后,我将数据聚合到 count_df 中,如图所示

count_df = kafka_mobile_apps_df.withColumn("diff_days",((col("TimeStamp_")) - (col("TimeStamp")))/(60.0*60.0*24))\
                            .withColumn("within_7d_ind",when(col("diff_days") < 7.0,1).otherwise(0))\
                            .groupBy("_applicationId")
                            .agg(sum(col("within_7d_ind")).alias(feature+"_7day_veLocity"))

现在我正在尝试将此 count_df 流写入 redis。重新访问后,我发现我可以使用“spark-redis_2.11”进行 spark-redis 连接。

我不知道scala,我找到了一个带有scala的spark-redis github exmaple。有人可以帮助在pyspark中写入以通过身份验证将此count_df写入redis的确切方法是什么

请找 spark-redis github here

我已经在集群上安装了所需的 jar“com.redislabs:spark-redis_2.12:2.5.0”。

谢谢。

刚刚发现他们还不支持python,请告诉我还有其他方法可以写吗?

解决方法

你应该做 pyspark,我已经在这里回答了这个问题 https://stackoverflow.com/a/68218806/2986344

更多有用的链接: https://github.com/RedisLabs/spark-redis/issues/307

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。