微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何将Akka.Net流与AspNet核心或长颈鹿集成

如何解决如何将Akka.Net流与AspNet核心或长颈鹿集成

通常,当我使用Giraffe或ASP.Net Core时,我可以创建一个actor系统,将其添加为服务,然后得到它以为请求处理程序选择任何actor并询问/告知消息。

无论是使用Cluster.Sharding还是普通的user/actor,我都知道它将是整个系统中处理多个消息的actor的单个实例。

如何与Streams进行相同的通信?它们似乎不是路由器中的引用,也不是作为角色路径的角色系统:角色引用,路径和地址。

这应该做些不同的事情吗?

从IO部分进行复制,我可以具体化一个图来处理每个请求,但是总的来说,我与“单例”通信,例如域驱动设计聚合根来处理域逻辑(这就是分片模块的原因)不确定如何处理可在请求处理程序中新实现的图中使用的Singleton Sink,因为所有请求必须只有一个接收器。

解决方法

有很多方法integrate akka streams用于外部系统。使接收者更容易接受的对象是Source.queue(有点类似于System.Threading.Channels并在其之前)。您可以在初始化点实现流,然后在Giraffe DI中注册队列端点-这样,您不必为每个请求支付相同的流初始化费用:

open Akka.Streams
open Akkling
open Akkling.Streams
open FSharp.Control.Tasks.Builders

let run () = task {
    use sys = System.create "sys" <| Configuration.defaultConfig()
    use mat = sys.Materializer()
    
    // construct a stream with async queue on both ends with buffer for 10 elements
    let sender,receiver =
        Source.queue OverflowStrategy.Backpressure 10
        |> Source.map (fun x -> x * x)
        |> Source.toMat (Sink.queue) Keep.both
        |> Graph.run mat
        
    // send data to a queue - quite often result could be just ignored
    match! sender.OfferAsync 2 with
    | :? QueueOfferResult.Enqueued -> () // successfull
    | :? QueueOfferResult.Dropped -> () // doesn't happen in OverflowStrategy.Backpressure 
    | :? QueueOfferResult.QueueClosed -> () // queue has been already closed
    | :? QueueOfferResult.Failure as f -> eprintfn "Unexpected failure: %O" f.Cause
    
    // try to receive data from the queue
    match! receiver.AsyncPull() with
    | Some data -> printfn "Received: %i" data
    | None -> printfn "Stream has been prematurelly closed"
        
    // asynchronously close the queue
    sender.Complete()
    do! sender.WatchCompletionAsync()
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。