微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

继承自 Stephen Cleary 的 Nito.Disposables NuGet 包

如何解决继承自 Stephen Cleary 的 Nito.Disposables NuGet 包

我看到了 Stephen Cleary 的 Nito.Disposables NuGet 包的实现,它似乎非常适合我的情况,但我找不到关于如何继承它的示例。

我的想法是将 `UnsubscribeAsync().GetAwaiter().GetResult();` 改为 `await UnsubscribeAsync();`,这意味着它应该被包装到一个 IAsyncDisposable 中。对于一个密封(sealed)类,我该如何实现这一点?

/// <summary>
/// Trade manager for live trading. It subscribes to account updates and to
/// one-minute candle updates through an <see cref="IBotClient"/>, forwards
/// non-final candle updates into one bounded channel per symbol, and consumes
/// those channels until cancellation or disposal.
/// </summary>
/// <remarks>
/// Supports both synchronous (<see cref="Dispose"/>) and asynchronous
/// (<see cref="DisposeAsync"/>) disposal. Prefer <c>await using</c> /
/// <see cref="DisposeAsync"/>: the synchronous path has to block on the
/// asynchronous unsubscribe calls.
/// </remarks>
public sealed class LiveTradeManager : ITradeManager, IDisposable, IAsyncDisposable
{
    // Interval between two keep-alive calls for the listen key.
    private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(30);

    private readonly ILogger<LiveTradeManager> _logger;
    private readonly TradeOptions _tradeOptions;
    private readonly IBotClient _client;

    private bool _disposed;
    private string _listenKey;
    private UpdateSubscription _candleSubscription, _accountUpdateSubscription;
    private IDictionary<string, Channel<IBinanceStreamKlineData>> _channels;

    /// <summary>
    /// Creates the manager and resolves the bot client for the configured exchange.
    /// </summary>
    /// <param name="logger">Logger used for candle and error messages.</param>
    /// <param name="TradeOptions">Trade options; <c>Symbols</c> selects the candle streams.</param>
    /// <param name="exchangeOptions">Exchange options; <c>BotClientType</c> selects the client.</param>
    /// <param name="clientFactory">Factory that produces the <see cref="IBotClient"/>.</param>
    public LiveTradeManager(ILogger<LiveTradeManager> logger, IOptions<TradeOptions> TradeOptions, IOptions<ExchangeOptions> exchangeOptions, IBotClientFactory clientFactory)
    {
        _logger = logger;
        _tradeOptions = TradeOptions.Value;
        _client = clientFactory.GetBotClient(exchangeOptions.Value.BotClientType);
    }

    /// <summary>
    /// When <c>true</c>, incoming candle updates are ignored.
    /// </summary>
    public bool IsPaused { get; set; }

    /// <summary>
    /// Subscribes to all streams and processes candle updates until
    /// <paramref name="cancellationToken"/> is cancelled or the channels are completed.
    /// </summary>
    /// <param name="cancellationToken">Token that stops the subscriptions' processing loops.</param>
    public async Task RunAsync(CancellationToken cancellationToken)
    {
        try
        {
            await SubscribeAsync(cancellationToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the normal shutdown path. OperationCanceledException is the
            // base of TaskCanceledException, so this also covers the channel readers,
            // which throw the base type when the token is cancelled.
        }
        // NOTE(review): Handle is declared outside this class; presumably it runs the
        // logging action and returns true so the exception is swallowed - confirm.
        catch (Exception ex) when (Handle(() => _logger.LogError(ex,"Unexpected error.")))
        {
        }
    }

    /// <summary>
    /// Opens the account and candle subscriptions, starts the listen-key keep-alive
    /// loop and then drains every per-symbol channel. Completes only when all
    /// channel readers have finished.
    /// </summary>
    private async Task SubscribeAsync(CancellationToken cancellationToken)
    {
        // Subscribe to account updates
        _listenKey = await _client.GetListenKeyAsync(cancellationToken).ConfigureAwait(false);

        void OnOrderUpdate(BinanceStreamOrderUpdate order)
        {
            // order update logic
        }

        _accountUpdateSubscription = await _client.SubscribetoUserDataUpdatesAsync(_listenKey, OnOrderUpdate).ConfigureAwait(false);

        // Deliberately not awaited: the loop runs in the background for the lifetime of
        // the token and handles/logs its own failures.
        _ = KeepListenKeyAliveAsync(_listenKey, cancellationToken);

        // Subscribe to candle updates
        var symbols = _tradeOptions.Symbols.Select(x => x.ToString()).ToList();

        // Capacity 1 + DropOldest: only the most recent unprocessed update per symbol is kept.
        // The dictionary is never mutated after creation, so the stream callback below can
        // read it without synchronization.
        var channels = symbols.ToDictionary(x => x, _ =>
            Channel.CreateBounded<IBinanceStreamKlineData>(new BoundedChannelOptions(1)
                {FullMode = BoundedChannelFullMode.DropOldest}));
        _channels = channels;

        // Synchronous on purpose: an "async void" callback would crash the process on any
        // unhandled exception (for example a write to an already completed channel).
        void OnCandleReceived(IBinanceStreamKlineData data)
        {
            if (IsPaused)
            {
                return;
            }

            var ohlcv = data.Data.ToCandle();

            if (data.Data.Final)
            {
                _logger.LogInformation(
                    $"[{data.Symbol}] Finalized candle | Open time: {ohlcv.Timestamp.ToDateTimeFormat()} | Price: {ohlcv.Close}");

                _ = Task.Run(async () =>
                {
                    await Task.Delay(10000, cancellationToken).ConfigureAwait(false);
                }, cancellationToken);
            }
            else
            {
                _logger.LogInformation(
                    $"[{data.Symbol}] Candle update | Open time: {ohlcv.Timestamp.ToDateTimeFormat()} | Price: {ohlcv.Close}");

                // With DropOldest a write never has to wait, so TryWrite is sufficient.
                // It returns false instead of throwing once the writer has been completed.
                if (!cancellationToken.IsCancellationRequested
                    && channels.TryGetValue(data.Symbol, out var channel))
                {
                    channel.Writer.TryWrite(data);
                }
            }
        }

        _candleSubscription = await _client
            .SubscribetoCandleUpdatesAsync(symbols, KlineInterval.OneMinute, OnCandleReceived)
            .ConfigureAwait(false);

        var tasks = channels.Values.Select(async channel =>
        {
            await foreach (var data in channel.Reader.ReadAllAsync(cancellationToken))
            {
                // long-running logic...

                await Task.Delay(10000, cancellationToken).ConfigureAwait(false);
            }
        });

        // NOTE: this would block further logic
        await Task.WhenAll(tasks).ConfigureAwait(false);
    }

    /// <summary>
    /// Periodically refreshes <paramref name="listenKey"/> until
    /// <paramref name="cancellationToken"/> is cancelled. A failed refresh is logged and
    /// retried at the next interval instead of silently ending the loop.
    /// </summary>
    private async Task KeepListenKeyAliveAsync(string listenKey, CancellationToken cancellationToken)
    {
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    await _client.KeepAliveListenKeyAsync(listenKey, cancellationToken).ConfigureAwait(false);
                }
                catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
                {
                    _logger.LogError(ex, "Failed to keep the listen key alive.");
                }

                await Task.Delay(KeepAliveInterval, cancellationToken).ConfigureAwait(false);
            }
        }
        catch (Exception) when (cancellationToken.IsCancellationRequested)
        {
            // Shutdown was requested; nothing left to keep alive.
        }
    }

    /// <summary>
    /// Stops the listen key, closes both subscriptions and completes all channels so
    /// that the reader loops started by <see cref="SubscribeAsync"/> can finish.
    /// </summary>
    private async Task UnsubscribeAsync()
    {
        // Unsubscribe account updates
        if (!string.IsNullOrEmpty(_listenKey))
        {
            await _client.StopListenKeyAsync(_listenKey).ConfigureAwait(false);
        }

        if (_accountUpdateSubscription != null)
        {
            await _client.UnsubscribeAsync(_accountUpdateSubscription).ConfigureAwait(false);
        }

        // Unsubscribe candle updates
        if (_candleSubscription != null)
        {
            await _client.UnsubscribeAsync(_candleSubscription).ConfigureAwait(false);
        }

        // Channels
        var channels = _channels;
        if (channels != null)
        {
            foreach (var channel in channels.Values)
            {
                // TryComplete does not throw when the writer is already completed.
                channel.Writer.TryComplete();
            }

            // Release the dictionary instead of clearing it: a candle callback that is
            // still in flight may be reading it concurrently.
            _channels = null;
        }
    }

    /// <summary>
    /// Asynchronously releases the subscriptions and channels. Safe to call more than once;
    /// only the first call (across <see cref="Dispose"/> and this method) does any work.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (_disposed)
        {
            return;
        }

        // Set first so a second call made while the unsubscribe is pending is a no-op.
        _disposed = true;
        await UnsubscribeAsync().ConfigureAwait(false);
    }

    /// <summary>
    /// Synchronously releases the subscriptions and channels by blocking on the
    /// asynchronous unsubscribe. Prefer <see cref="DisposeAsync"/> where possible.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        UnsubscribeAsync().GetAwaiter().GetResult();
    }
}

/// <summary>
/// Hosted background service that runs the configured trade manager together with
/// the Discord client and stops the application once both have finished (or one failed).
/// </summary>
public class BotManagerService : BackgroundService
{
    private readonly IHostApplicationLifetime _lifetime;
    // NOTE(review): the casing of "IdiscordClient" looks mangled - confirm the real type name.
    private readonly IdiscordClient _discord;
    private readonly ITradeManager _tradeManager;

    /// <summary>
    /// Stores the collaborators and resolves the trade manager selected by
    /// <c>options.Value.TradeManagerType</c>.
    /// </summary>
    public BotManagerService(
        IHostApplicationLifetime hostApplicationLifetime,
        IOptions<ExchangeOptions> options,
        IdiscordClient discordClient,
        ITradeManagerFactory TradeManagerFactory)
    {
        _lifetime = hostApplicationLifetime;
        _discord = discordClient;
        _tradeManager = TradeManagerFactory.GetTradeManager(options.Value.TradeManagerType);
    }

    /// <summary>
    /// Starts the trade manager and the Discord client, waits for both, and always
    /// requests application shutdown afterwards - including when either task throws.
    /// </summary>
    /// <param name="stoppingToken">Host token, passed on to the trade manager.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            // Arguments are evaluated left to right, so the trade manager is started first.
            await Task.WhenAll(
                    _tradeManager.RunAsync(stoppingToken),
                    _discord.StartAsync())
                .ConfigureAwait(false);
        }
        finally
        {
            _lifetime.StopApplication();
        }
    }
}

解决方法

> 我找不到关于如何继承它的示例。

Nito.Disposables 与我编写的绝大多数代码一样,是为组合而非继承(composition rather than inheritance)而设计的。

因此,如果您有一个需要实现 IAsyncDisposable 的类型,它应该包含一个 IAsyncDisposable 实现并将其接口方法转发给该包含的对象:

// Illustrative sketch (the "..." lines stand for omitted members/arguments):
// the class stays sealed and gains IAsyncDisposable by composition, not inheritance.
public sealed class LiveTradeManager : ITradeManager,IAsyncDisposable
{
  // Contained Nito.Disposables object that the class's DisposeAsync forwards to.
  private readonly AsyncDisposable _disposable;
  ...
    
  public LiveTradeManager(...)
  {
    ...
    // Wrap the existing UnsubscribeAsync cleanup as the contained object's dispose action.
    _disposable = new(async () => await UnsubscribeAsync());
  }

  // Forward the interface method to the contained object.
  public ValueTask DisposeAsync() => _disposable.DisposeAsync();    
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。