object ProviderAPI { trait Receiver[T] { def receive(entry: T) def close() } def run(r: Receiver[Int]) { new Thread() { override def run() { (0 to 9).foreach { i => r.receive(i) Thread.sleep(100) } r.close() } }.start() } }
在此示例中,ProviderAPI.run接收Receiver,调用receive(i)10次然后关闭.通常,ProviderAPI.run会根据可能无限的集合调用receive(i).
此API旨在以命令式样式使用,如外部迭代器.如果我们的应用程序需要过滤,映射和打印此输入,我们需要实现一个混合所有这些操作的Receiver:
object Main extends App { class MyReceiver extends ProviderAPI.Receiver[Int] { def receive(entry: Int) { if (entry % 2 == 0) { println("Entry#" + entry) } } def close() {} } ProviderAPI.run(new MyReceiver()) }
现在,问题是如何在函数式,内部迭代器中使用ProviderAPI(不改变ProviderAPI的实现,这是给我们的).请注意,ProviderAPI也可以无限次地调用receive(i),因此不能选择收集列表中的所有内容(同样,我们应该逐个处理每个结果,而不是先收集所有输入,然后再处理它) .
我问如何实现这样的ReceiverToIterator,以便我们可以在功能样式中使用ProviderAPI:
object Main extends App { val iterator = new ReceiverToIterator[Int] // how to implement this? ProviderAPI.run(iterator) iterator .view .filter(_ % 2 == 0) .map("Entry#" + _) .foreach(println) }
更新
这有四个解决方案:
> IteratorWithSemaphorSolution:我提议的解决方案解决方案首先附在问题上
> QueueIteratorSolution:根据nadavwr的建议使用BlockingQueue [Option [T]].
它允许生产者在被消费者阻止之前继续产生最多queueCapacity.
> PublishSubjectSolution:非常简单的解决方案,使用Netflix RxJava-Scala API中的PublishSubject.
> SameThreadReceiverToTraversable:通过放宽问题的约束,非常简单的解决方案
解决方法
你在这里实现的主要是Java的BlockingQueue,队列大小为1.
主要特点:超级阻塞.缓慢的消费者会扼杀你的制作人的表现.
更新:@ gzm0提到BlockingQueue不包括EOF.你必须使用BlockingQueue [Option [T]].
更新:这是一个代码片段.它可以与您的接收器配合使用.
其中一些灵感来自Iterator.buffered.请注意,peek是一个误导性的名称,因为它可能会阻止 – 所以hasNext也是如此.
// fairness enabled -- you probably want to preserve order... // alternatively,disable fairness and increase buffer to be 'big enough' private val queue = new java.util.concurrent.ArrayBlockingQueue[Option[T]](1,true) // the following block provides you with a potentially blocking peek operation // it should `queue.take` when the prevIoUs peeked head has been invalidated // specifically,it will `queue.take` and block when the queue is empty private var head: Option[T] = _ private var headDefined: Boolean = false private def invalidateHead() { headDefined = false } private def peek: Option[T] = { if (!headDefined) { head = queue.take() headDefined = true } head } def iterator = new Iterator[T] { // potentially blocking; only false upon taking `None` def hasNext = peek.isDefined // peeks and invalidates head; throws NoSuchElementException as appropriate def next: T = { val opt = peek; invalidateHead() if (opt.isEmpty) throw new NoSuchElementException else opt.get } }
替代方案:Iteratees
基于迭代器的解决方案通常会涉及更多阻塞.从概念上讲,你可以在执行迭代的线程上使用continuation来避免阻塞线程,但是延续会破坏Scala的for-comprehensions,所以在这条道路上没有快乐.
或者,您可以考虑基于迭代的解决方案.迭代器与迭代器的不同之处在于,消费者不负责推进迭代 – 生产者是.使用迭代,消费者基本上折叠生产者推动的条目随着时间的推移.折叠每个下一个条目可以在线程池中进行折叠,因为在每次折叠完成后放弃该线程.
你不会为迭代的语法得到好处,学习曲线有点挑战,但如果你对使用foldLeft有信心,你最终会得到一个看起来合理的非阻塞解决方案.
要阅读有关iteratees的更多信息,我建议您查看PlayFramework 2.X’s iteratee reference.该文档描述了他们的独立iteratee库,它在Play的上下文之外100%可用. Scalaz 7还有一个全面的iteratee库.
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。