我从URL列表构建一个RDD,然后尝试使用一些异步http调用获取数据.
在进行其他计算之前我需要所有结果.
理想情况下,我需要在不同节点上进行http调用以进行缩放考虑.
在进行其他计算之前我需要所有结果.
理想情况下,我需要在不同节点上进行http调用以进行缩放考虑.
我做了这样的事情:
//init spark val sparkContext = new SparkContext(conf) val datas = Seq[String]("url1","url2") //create rdd val rdd = sparkContext.parallelize[String](datas) //httpCall return Future[String] val requests = rdd.map((url: String) => httpCall(url)) //await all results (Future.sequence may be better) val responses = requests.map(r => Await.result(r,10.seconds)) //print responses response.collect().foreach((s: String) => println(s)) //stop spark sparkContext.stop()
这项工作,但Spark工作永远不会完成!
所以我想知道使用Spark(或Future [RDD])处理Future的最佳实践是什么.
我认为这个用例看起来很常见,但还没有找到任何答案.
最好的祝福
解决方法
this use case looks pretty common
不是真的,因为它根本无法正常工作(可能).由于每个任务都在标准的Scala迭代器上运行,因此这些操作将被压缩在一起.这意味着所有操作都将在实践中阻塞.假设您有三个URL [“x”,“y”,“z”],您的代码将按以下顺序执行:
Await.result(httpCall("x",10.seconds)) Await.result(httpCall("y",10.seconds)) Await.result(httpCall("z",10.seconds))
您可以轻松地在本地重现相同的行为.如果要异步执行代码,则应使用mapPartitions显式处理:
rdd.mapPartitions(iter => { ??? // Submit requests ??? // Wait until all requests completed and return Iterator of results })
但这比较棘手.无法保证给定分区的所有数据都适合内存,因此您可能也需要一些批处理机制.
所有这一切都说我无法重现你所描述的问题可能是一些配置问题或httpCall本身的问题.
在旁注上允许单个超时终止整个任务看起来不是一个好主意.
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。