微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

scala,将回调模式转换为函数式内部迭代器

假设给出了这个API,我们无法更改它:

object ProviderAPI {

  trait Receiver[T] {
    def receive(entry: T)
    def close()
  }

  def run(r: Receiver[Int]) {
    new Thread() {
      override def run() {
        (0 to 9).foreach { i =>
          r.receive(i)
          Thread.sleep(100)
        }
        r.close()
      }
    }.start()
  }
}

在此示例中,ProviderAPI.run接收Receiver,调用receive(i)10次然后关闭.通常,ProviderAPI.run会根据可能无限的集合调用receive(i).

此API旨在以命令式样式使用,如外部迭代器.如果我们的应用程序需要过滤,映射和打印此输入,我们需要实现一个混合所有这些操作的Receiver:

object Main extends App {
  class MyReceiver extends ProviderAPI.Receiver[Int] {
    def receive(entry: Int) {
      if (entry % 2 == 0) {
        println("Entry#" + entry)
      }
    }
    def close() {}
  }

  ProviderAPI.run(new MyReceiver())
}

现在,问题是如何在函数式,内部迭代器中使用ProviderAPI(不改变ProviderAPI的实现,这是给我们的).请注意,ProviderAPI也可以无限次地调用receive(i),因此不能选择收集列表中的所有内容(同样,我们应该逐个处理每个结果,而不是先收集所有输入,然后再处理它) .

我问如何实现这样的ReceiverToIterator,以便我们可以在功能样式中使用ProviderAPI:

object Main extends App {
  val iterator = new ReceiverToIterator[Int]  // how to implement this?
  ProviderAPI.run(iterator)
  iterator
    .view
    .filter(_ % 2 == 0)
    .map("Entry#" + _)
    .foreach(println)
}

更新

这有四个解决方案:

> IteratorWithSemaphorSolution:我提议的解决方解决方案首先附在问题上
> QueueIteratorSolution:根据nadavwr的建议使用BlockingQueue [Option [T]].
它允许生产者在被消费者阻止之前继续产生最多queueCapacity.
> PublishSubjectSolution:非常简单的解决方案,使用Netflix RxJava-Scala API中的PublishSubject.
> SameThreadReceiverToTraversable:通过放宽问题的约束,非常简单的解决方

解决方法

更新:BlockingQueue的1个条目

在这里实现的主要是Java的BlockingQueue,队列大小为1.

主要特点:超级阻塞.缓慢的消费者会扼杀你的制作人的表现.

更新:@ gzm0提到BlockingQueue不包括EOF.你必须使用BlockingQueue [Option [T]].

更新:这是一个代码片段.它可以与您的接收器配合使用.
其中一些灵感来自Iterator.buffered.请注意,peek是一个误导性的名称,因为它可能会阻止 – 所以hasNext也是如此.

// fairness enabled -- you probably want to preserve order...
// alternatively,disable fairness and increase buffer to be 'big enough'
private val queue = new java.util.concurrent.ArrayBlockingQueue[Option[T]](1,true)

// the following block provides you with a potentially blocking peek operation
// it should `queue.take` when the prevIoUs peeked head has been invalidated
// specifically,it will `queue.take` and block when the queue is empty
private var head: Option[T] = _
private var headDefined: Boolean = false
private def invalidateHead() { headDefined = false }
private def peek: Option[T] = {
  if (!headDefined) {
    head = queue.take()
    headDefined = true
  }
  head
}

def iterator = new Iterator[T] {

  // potentially blocking; only false upon taking `None`
  def hasNext = peek.isDefined

  // peeks and invalidates head; throws NoSuchElementException as appropriate
  def next: T = {
    val opt = peek; invalidateHead()
    if (opt.isEmpty) throw new NoSuchElementException
    else opt.get
  }
}

替代方案:Iteratees

基于迭代器的解决方案通常会涉及更多阻塞.从概念上讲,你可以在执行迭代的线程上使用continuation来避免阻塞线程,但是延续会破坏Scala的for-comprehensions,所以在这条道路上没有快乐.

或者,您可以考虑基于迭代的解决方案.迭代器与迭代器的不同之处在于,消费者不负责推进迭代 – 生产者是.使用迭代,消费者基本上折叠生产者推动的条目随着时间的推移.折叠每个下一个条目可以在线程池中进行折叠,因为在每次折叠完成后放弃该线程.

你不会为迭代的语法得到好处,学习曲线有点挑战,但如果你对使用foldLeft有信心,你最终会得到一个看起来合理的非阻塞解决方案.

要阅读有关iteratees的更多信息,我建议您查看PlayFramework 2.X’s iteratee reference.该文档描述了他们的独立iteratee库,它在Play的上下文之外100%可用. Scalaz 7还有一个全面的iteratee库.

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐