如果系统中的某些实体可以充当数据或事件的生成者,而其他实体可以充当消费者,那么将这些“正交关注点”外化到生产者和消费者类型类中是否有意义?
我可以看到Haskell管道库使用这种方法,并且欣赏这个问题对于来自Haskell背景的人来说可能看起来非常基本,但是会对Scala视角和示例感兴趣,因为我看不到很多.
解决方法
你应该看看Matt Might的
this article.
它为您提供了Producer,Consumer,Transducer(您提到的haskell库中的Pipe)的简单实现,以及如何使用它们来创建Web服务器的示例.
基本上每个Producer都扩展了Runnable,并有一个私有缓冲区来输出元素.缓冲区是一个java ArrayBlockingQueue,它是线程安全的.
每个Consumer也是一个Runnable,并且具有使用类似架构的输入缓冲区.
将Consumer连接到Producer时,您将创建另一个Runnable.
在启动时,它将启动Producer和Consumer(它们是Runnable)并将在它们之间传输数据.
将传感器链接到Producer时,它会创建一个新的Producer.
因此,如果您遵循他的实现,您应该能够以haskell的方式编写:
listen ==> connect ==> process ==> reply
import java.util.concurrent.ArrayBlockingQueue trait Coroutine extends Runnable { def start() { val myThread = new Thread(this) myThread.start() } } trait Producer[O] extends Coroutine { private val outputs = new ArrayBlockingQueue[O](1024) protected def put(output: O): Unit = outputs.put(output) def next(): O = outputs.take() def ==>[I >: O](consumer: Consumer[I]): Coroutine = { val that = this new Coroutine { def run() { while (true) { val o = that.next(); consumer.accept(o) } } override def start() { that.start() consumer.start() super.start() } } } } trait Consumer[I] extends Coroutine { private val inputs = new ArrayBlockingQueue[I] (1024) def accept(input : I): Unit = inputs.put(input) protected def get(): I = inputs.take() }
以下是如何使用它:
case class IntProducer(zero: Int) extends Producer[Int]{ def run(): Unit = { var i = zero while(true) { put(i); i += 1 } } } object Printer extends Consumer[Any]{ def run(): Unit = { while(true) { println(get()) } } } val pip = IntProducer(0) ==> Printer pip.start()
要查看更多示例以及如何处理`Transducer,请查看my Gist.
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。