如何解决如何将Akka.Net流与AspNet核心或长颈鹿集成
通常,当我使用Giraffe或ASP.Net Core时,我可以创建一个actor系统,将其添加为服务,然后得到它以为请求处理程序选择任何actor并询问/告知消息。
无论是使用Cluster.Sharding还是普通的user/actor
,我都知道它将是整个系统中处理多个消息的actor的单个实例。
如何与Streams进行相同的通信?它们似乎不是路由器中的引用,也不是作为角色路径的角色系统:角色引用,路径和地址。
这应该做些不同的事情吗?
从IO部分进行复制,我可以具体化一个图来处理每个请求,但是总的来说,我与“单例”通信,例如域驱动设计聚合根来处理域逻辑(这就是分片模块的原因)不确定如何处理可在请求处理程序中新实现的图中使用的Singleton Sink,因为所有请求必须只有一个接收器。
解决方法
有很多方法integrate akka streams用于外部系统。使接收者更容易接受的对象是Source.queue
(有点类似于System.Threading.Channels并在其之前)。您可以在初始化点实现流,然后在Giraffe DI中注册队列端点-这样,您不必为每个请求支付相同的流初始化费用:
open Akka.Streams
open Akkling
open Akkling.Streams
open FSharp.Control.Tasks.Builders
let run () = task {
use sys = System.create "sys" <| Configuration.defaultConfig()
use mat = sys.Materializer()
// construct a stream with async queue on both ends with buffer for 10 elements
let sender,receiver =
Source.queue OverflowStrategy.Backpressure 10
|> Source.map (fun x -> x * x)
|> Source.toMat (Sink.queue) Keep.both
|> Graph.run mat
// send data to a queue - quite often result could be just ignored
match! sender.OfferAsync 2 with
| :? QueueOfferResult.Enqueued -> () // successfull
| :? QueueOfferResult.Dropped -> () // doesn't happen in OverflowStrategy.Backpressure
| :? QueueOfferResult.QueueClosed -> () // queue has been already closed
| :? QueueOfferResult.Failure as f -> eprintfn "Unexpected failure: %O" f.Cause
// try to receive data from the queue
match! receiver.AsyncPull() with
| Some data -> printfn "Received: %i" data
| None -> printfn "Stream has been prematurelly closed"
// asynchronously close the queue
sender.Complete()
do! sender.WatchCompletionAsync()
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。