微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

F#Akkling无法通过分片代理发送消息

如何解决F#Akkling无法通过分片代理发送消息

当我尝试使用以下代码向akka.net区域代理发送消息时,

open Akkling.Cluster.Sharding
open Akka.Actor
open Akka.Cluster
open Akka.Cluster.Sharding
open System
open Akkling

let configWithPort (port:int) =
    let config = Configuration.parse ("""
        akka {
          actor {
            provider = cluster
          }
          remote {
            dot-netty.tcp {
              public-hostname = "localhost"
              hostname = "localhost"
              port = """ + port.ToString() + """
            }
          }
          cluster {
            roles = ["Worker"]
            sharding {
                journal-plugin-id = "akka.persistence.journal.inmem"
                snapshot-plugin-id = "akka.persistence.snapshot-store.inmem"
            }
            seed-nodes = [ "akka.tcp://cluster-system@localhost:5000" ]
          }
        }
        """)
    config
      .WithFallback(Akka.Cluster.Tools.Singleton.ClusterSingletonManager.DefaultConfig())
      .WithFallback(ClusterSharding.DefaultConfig())

let system1 = ActorSystem.Create("cluster-system",configWithPort 5000)
let system2 = ActorSystem.Create("cluster-system",configWithPort 5001)

/// Domain
type FileCommand = {
    ProgramId : string
    Duration : TimeSpan
    FilePath : string
}

/// Actors
let aggregateRootActor (mailBox:Actor<_>) (msg:FileCommand) =
    let nodeAddress = Cluster.Get(mailBox.System).SelfUniqueAddress
    logInfof mailBox "Program: [%s] with path [%s] on [%A]" msg.ProgramId msg.FilePath nodeAddress
    ignored ()

let extractorFunction (message:FileCommand) =
    let entityId = message.ProgramId
    let hash = entityId.GetHashCode()
    let numberOfShards = 5
    let shardId = sprintf "shard_%d" ((abs hash) % numberOfShards)
    shardId,entityId,message

let region1 = spawnSharded extractorFunction system1 "fileRouter" (props (actorOf2 aggregateRootActor))
let region2 = spawnSharded extractorFunction system2 "fileRouter" (props (actorOf2 aggregateRootActor))

let shardRegionProxy = 
    spawnShardedProxy extractorFunction system1 "fileRouterProxy" None

向代理发送消息总是失败。

shardRegionProxy <! { ProgramId = "a"; Duration = TimeSpan.FromMinutes 10.; FilePath = "\\a_1.mp4" } //this Failed

错误消息是

> [INFO][8/26/2020 5:13:15 PM][Thread 0027][akka://cluster-system/system/sharding/fileRouterProxyCoordinator/singleton/coordinator] Message [RegisterProxy] from akka://cluster-system/system/sharding/fileRouterProxyProxy to akka://cluster-system/system/sharding/fileRouterProxyCoordinator/singleton/coordinator was not delivered. [6] dead letters encountered. If this is not an expected behavior then akka://cluster-system/system/sharding/fileRouterProxyCoordinator/singleton/coordinator may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

但是这些发送成功了。

region1 <! { ProgramId = "d"; Duration = TimeSpan.FromMinutes 8.; FilePath = "\\a_2.mp4" }
region2 <! { ProgramId = "a"; Duration = TimeSpan.FromMinutes 10.; FilePath = "\\a_1.mp4" }

不好意思, 如何正确创建分片协调器?

或者如果不正确,使用像这样的shardingcoordinator有什么问题?

解决方法

名称错误,按如下所示更改代码,一切都很好!

let shardRegionProxy = spawnShardedProxy extractorFunction system1 "fileRouter" (Some "Worker")

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。