如何解决Service Fabric 中的事件中心使用者
我正在尝试使用一种服务结构来始终如一地从 azure 事件中心提取消息。我似乎已连接好一切,但请注意,我的消费者只是停止拉动事件。
我有一个集线器,其中包含已推送到它的数千个事件。为集线器配置了 1 个分区,并且我的服务结构服务也只有 1 个分区以简化调试。
Service 启动,创建 EventHubClient,然后使用它创建 PartitionReceiver。接收器被传递到一个“EventLoop”,它在调用receiver.ReceiveAsync时进入一个“无限”。 EventLoop 的代码如下。
我观察到的是第一次通过循环时我几乎总是收到 1 条消息。第二次我收到了 103 到 200 条左右的消息。在那之后,我没有收到任何消息。似乎如果我重新启动服务,我会再次收到相同的消息 - 但那是因为当我重新启动服务时,我让它在流的开头重新启动。
希望它一直运行,直到我的 2000 条消息被消耗掉,然后它会等待我(偶尔轮询)。
我需要对 Azure.Messaging.EventHubs 5.3.0 包做一些特定的事情以使其继续拉取事件吗?
//Here is how I am creating the EventHubClient:
var connectionString = "something secret";
var connectionStringBuilder = new EventHubsConnectionStringBuilder(connectionString)
{
EntityPath = "NameOfMyEventHub"
};
try
{
m_eventHubClient = EventHubClient.Create(connectionStringBuilder);
}
//Here is how I am getting the partition receiver
var receiver = m_eventHubClient.CreateReceiver("$Default",m_partitionId,EventPosition.FromStart());
//The event loop which the receiver is passed to
private async Task EventLoop(PartitionReceiver receiver)
{
m_started = true;
while (m_keepRunning)
{
var events = await receiver.ReceiveAsync(m_options.BatchSize,TimeSpan.FromSeconds(5));
if (events != null) //First 2/3 times events aren't null. After that,always null and I kNow there are more in the partition/
{
var eventsArray = events as EventData[] ?? events.ToArray();
m_state.NumProcessedSinceLastSave += eventsArray.Count();
foreach (var evt in eventsArray)
{
//Process the event
await m_options.Processor.ProcessMessageAsync(evt,null);
string lastOffset = evt.SystemProperties.Offset;
if (m_state.NumProcessedSinceLastSave >= m_options.BatchSize)
{
m_state.Offset = lastOffset;
m_state.NumProcessedSinceLastSave = 0;
await m_state.SaveAsync();
}
}
}
}
m_started = false;
}
**EDIT,问了一个关于分区数量的问题。事件中心有一个分区,SF 服务也有一个分区。
打算使用服务结构状态来跟踪我在集线器中的偏移量,但这不是现在的问题。
为每个分区创建分区侦听器。我得到这样的分区:
public async Task StartAsync()
{
// slice the pie according to distribution
// this partition can get one or more assigned Event Hub Partition ids
string[] eventHubPartitionIds = (await m_eventHubClient.GetRuntimeinformationAsync()).PartitionIds;
string[] resolvedEventHubPartitionIds = m_options.ResolveAssignedEventHubPartitions(eventHubPartitionIds);
foreach (var resolvedPartition in resolvedEventHubPartitionIds)
{
var partitionReceiver = new EventHubListenerPartitionReceiver(m_eventHubClient,resolvedPartition,m_options);
await partitionReceiver.StartAsync();
m_partitionReceivers.Add(partitionReceiver);
}
}
调用partitionListener.StartAsync的时候,其实是创建了PartitionListener,像这样(其实比这个多一点,但是走的分支是这个:
m_eventHubClient.CreateReceiver(m_options.EventHubConsumerGroupName,EventPosition.FromStart());
感谢您的任何提示。 将
解决方法
你有多少个分区?我在您的代码中看不到您如何确保读取默认使用者组中的所有分区。
您使用 PartitionReceiver
而不是 EventProcessorHost 的任何具体原因?
对我来说,SF 似乎非常适合使用事件处理器主机。我看到已经有一个 SF integrated solution 使用有状态服务进行检查点。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。