如何解决TPL .Net与Azure EventHub Producer并发问题
我正在使用Azure Event Hub生产者客户端,并从kafka流中读取消息,然后将其传递给反序列化/映射,然后传递给Event Hub。我有一个消耗循环,为每个消耗创建一个任务,然后使用两种方法进行处理(从卡夫卡滞后的角度来看,这似乎大大提高了速度。但是,事件中心使您可以创建一个事件批处理,而我却没有一定要使用。我现在只想一次一次发送数据。为了创建一个新的批处理,我必须调用dispose()。我遇到了一个问题,即另外一个调用该函数的函数调用dispose()的时间,我收到一条错误消息,指出事件中心正在使用该对象。
我还尝试将重载用于eventHubProducerClient.SendAsync,该重载允许您传递IEnumerable,但与此同时我也遇到了同样的问题。
所以我认为这是一个同步问题,或者也许我需要在某个地方进行锁定?
任何帮助将不胜感激。
public void Execute()
{
using (_consumer)
{
try
{
_consumer.Subscribe(_streamConsumerSettings.Topic);
while (true)
{
var result = _consumer.Consume(1000);
if (result == null)
{
continue;
}
var process = Task.Factory.StartNew(() => ProcessMessage(result?.Message?.Value));
var send = process.ContinueWith(t => SendMessage(process.Result));
}
}
catch (ConsumeException e)
{
_logger.LogError(e,e.StackTrace ?? e.Message);
_cancelConsume = true;
_consumer.Close();
RestartConsumer();
}
}
}
public static EquipmentJson ProcessMessage(byte[] result)
{
var json = _messageProcessor.DeserializeAndMap(result);
return json;
}
public static void SendMessage(EquipmentJson message)
{
try
{
_eventHubClient.AddToBatch(message);
}
catch (Exception e)
{
_logger.LogError(e,e.StackTrace ?? e.Message);
}
}
public async Task AddToBatch(EquipmentJson message)
{
if
(!string.IsNullOrEmpty(message.EquipmentLocation))
{
try
{
var batch = await _equipmentLocclient.CreateBatchAsync();
batch.TryAdd(new EventData(Encoding.UTF8.GetBytes(message.EquipmentLocation)));
await _eventHubProducerClient.SendAsync(batch);
batch.dispose();
_logger.Loginformation($"Data sent {DateTimeOffset.UtcNow}");
}
catch (Exception e)
{
_logger.LogError(e,e.StackTrace ?? e.Message);
}
}
}
public class EventHubClient : IEventHubClient
{
private readonly ILoggerAdapter<EventHubClient> _logger;
private readonly EventHubClientSettings _eventHubClientSettings;
private IMapper _mapper;
private static EventHubProducerClient _equipmentLocclient;
public EventHubClient(ILoggerAdapter<EventHubClient> logger,EventHubClientSettings eventHubClientSettings,IMapper mapper)
{
_logger = logger;
_eventHubClientSettings = eventHubClientSettings;
_mapper = mapper;
_equipmentLocclient = new EventHubProducerClient(_eventHubClientSettings.ConnectionString,_eventHubClientSettings.EquipmentLocation);
}
}
}
解决方法
基于我对评论的猜测,我很好奇是否重构为使用async/await
而不是主循环中的显式连续可能会有所帮助。也许与以下LinqPad代码段相似:
async Task Main()
{
while (true)
{
var message = await Task.Factory.StartNew(() => GetText());
var events = new[] { new EventData(Encoding.UTF8.GetBytes(message)) };
await Send(events).ConfigureAwait(false);
}
}
public EventHubProducerClient client = new EventHubProducerClient("<< CONNECTION STRING >>");
public async Task Send(EventData[] events)
{
try
{
await client.SendAsync(events).ConfigureAwait(false);
"Sent".Dump();
}
catch (Exception ex)
{
ex.Dump();
}
}
public string GetText()
{
Thread.Sleep(250);
return "Test";
}
如果您打算保留延续性,我想知道在延续中进行轻微的结构重构是否有帮助,既可以推动事件的创建,也可以兑现await
语句。也许与以下LinqPad代码段相似:
async Task Main()
{
while(true)
{
var t = Task.Factory.StartNew(() => GetText());
var _ = t.ContinueWith(async q =>
{
var events = new[] { new EventData(Encoding.UTF8.GetBytes(t.Result)) };
await Send(events).ConfigureAwait(false);
});
await Task.Yield();
}
}
public EventHubProducerClient client = new EventHubProducerClient("<< CONNECTION STRING >>");
public async Task Send(EventData[] events)
{
try
{
await client.SendAsync(events).ConfigureAwait(false);
"Sent".Dump();
}
catch (Exception ex)
{
ex.Dump();
}
}
public string GetText()
{
Thread.Sleep(250);
return "Test";
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。