微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

scala – 使用Async HTTP调用的Spark作业

我从URL列表构建一个RDD,然后尝试使用一些异步http调用获取数据.
在进行其他计算之前我需要所有结果.
理想情况下,我需要在不同节点上进行http调用以进行缩放考虑.

我做了这样的事情:

//init spark
val sparkContext = new SparkContext(conf)
val datas = Seq[String]("url1","url2")

//create rdd
val rdd = sparkContext.parallelize[String](datas)

//httpCall return Future[String]
val requests = rdd.map((url: String) => httpCall(url))

//await all results (Future.sequence may be better)
val responses = requests.map(r => Await.result(r,10.seconds))

//print responses
response.collect().foreach((s: String) => println(s))

//stop spark
sparkContext.stop()

这项工作,但Spark工作永远不会完成!

所以我想知道使用Spark(或Future [RDD])处理Future的最佳实践是什么.

我认为这个用例看起来很常见,但还没有找到任何答案.

最好的祝福

解决方法

this use case looks pretty common

不是真的,因为它根本无法正常工作(可能).由于每个任务都在标准的Scala迭代器上运行,因此这些操作将被压缩在一起.这意味着所有操作都将在实践中阻塞.假设您有三个URL [“x”,“y”,“z”],您的代码将按以下顺序执行:

Await.result(httpCall("x",10.seconds))
Await.result(httpCall("y",10.seconds))
Await.result(httpCall("z",10.seconds))

您可以轻松地在本地重现相同的行为.如果要异步执行代码,则应使用mapPartitions显式处理:

rdd.mapPartitions(iter => {
  ??? // Submit requests
  ??? // Wait until all requests completed and return Iterator of results
})

但这比较棘手.无法保证给定分区的所有数据都适合内存,因此您可能也需要一些批处理机制.

所有这一切都说我无法重现你所描述的问题可能是一些配置问题或httpCall本身的问题.

在旁注上允许单个超时终止整个任务看起来不是一个好主意.

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐