微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

为什么我会使用工作单元模式用尽SQL连接?

如何解决为什么我会使用工作单元模式用尽SQL连接?

当我们在总线上放置许多消息,并且它们在业务逻辑中调用流程时,就会发生此错误

在从服务器获取连接之前经过的超时时间 池。这可能是因为所有池化连接都在 使用和达到最大池大小。

当有15条消息调用我们的流程时,不会发生这种情况。但是,当调用80或130个进程时,确实会发生这种情况。

我们正在使用工作单元模式,并且连接在使用后被关闭。因此,我很难理解为什么下一个过程无法在池中使用它。

这是在我们的应用中使用工作单元的方式:

using (var uow = _uowFactory.Create(true))
{
    await uow.AccrualRepo.AddAccrualHistoriesAsync(histories);
    await uow.CommitAsync();
}

这是工厂返回欠款的方式:

public class UnitOfWorkFactory : IUnitOfWorkFactory
{
    private readonly IConfiguration _configuration;
    private readonly IMediator _mediator;
    private readonly IStateAccessor _stateAccessor;
    private readonly ITimeProvider _timeProvider;
    private readonly IDbConnection _connection;
    private readonly IAccrualMapper _accrualMapper;
    private readonly ILogger<RepoBase> _logger;

    public UnitOfWorkFactory(IConfiguration configuration,IDbConnection sqlConnection,IMediator mediator,IStateAccessor stateAccessor,ITimeProvider timeProvider,IAccrualMapper accrualMapper,ILogger<RepoBase> logger)
    {
        _configuration = configuration;
        _mediator = mediator;
        _stateAccessor = stateAccessor;
        _timeProvider = timeProvider;
        _connection = sqlConnection;
        _accrualMapper = accrualMapper;
        _logger = logger;
    }

    public IUnitOfWork Create(bool useTransaction)
    {
        return new UnitOfWork(_configuration,_connection,_mediator,_stateAccessor,_timeProvider,_accrualMapper,_logger,useTransaction);
    }

我们的Startup.cs文件通过以下方式设置了依赖项注入:

services.AddTransient<IUnitOfWorkFactory,UnitOfWorkFactory>();
services.AddTransient<IDbConnection,sqlConnection>();

我现在有很多代码,但是我们的样子如下。请注意,在调用CommitAsync()之后和进行处置时,连接已关闭

public class UnitOfWork : IUnitOfWork,Idisposable
{
    private readonly IConfiguration _configuration;
    private readonly IMediator _mediator;
    private readonly IStateAccessor _stateAccessor;
    private readonly ITimeProvider _timeProvider;
    private readonly IAccrualMapper _accrualMapper;
    private readonly ILogger<RepoBase> _logger;
    private IDbConnection _connection;
    private IDbTransaction _transaction;
    private IAccrualRepo _accrualRepo;
    private bool _disposed;
    private bool _commitOccurred;
    private bool _useTransaction;

    public UnitOfWork(IConfiguration configuration,ILogger<RepoBase> logger,bool useTransaction = true)
    {
        _configuration = configuration;
        _mediator = mediator;
        _stateAccessor = stateAccessor;
        _timeProvider = timeProvider;
        _useTransaction = useTransaction;
        _accrualMapper = accrualMapper;
        _logger = logger;
        _connection = sqlConnection;
        _connection.ConnectionString = _configuration["ConnectionString"];
        _connection.open();

        if (useTransaction)
        {
            _transaction = _connection.BeginTransaction();
        }
    }

    public IAccrualRepo AccrualRepo
    {
        get => _accrualRepo ?? (_accrualRepo = new AccrualRepo(_configuration,_transaction,_logger));
        set => _accrualRepo = value;
    }

    public async Task CommitAsync()
    {
        if (!_useTransaction)
        {
            throw new InvalidOperationException("Attempting to call commit on a unit of work that isn't using a transaction");
        }

        try
        {
            _transaction.Commit();
            _commitOccurred = true;
            await InvokePostCommitOnReposAsync();
        }
        catch
        {
            _transaction.Rollback();
            throw;
        }
        finally
        {
            _connection.Close();
            _transaction.dispose();
            ResetRepositories();
        }
    }

    private async Task InvokePostCommitOnReposAsync()
    {
        var repos = new List<RepoBase>();
        if (_accrualRepo != null) { repos.Add((RepoBase)_accrualRepo); }

        try
        {
            foreach (var repo in repos)
            {
                await repo.PostCommitAsync();
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,"Exception occurred while invoking post commit on a repo.");
        }
    }

    private void ResetRepositories()
    {
        _accrualRepo = null; // Note: there are more repos here,but removed for clarity.
    }

    public void dispose()
    {
        dispose(true);
        GC.SuppressFinalize(this); // Already disposed; no need for the GC to finalize.
    }

    protected virtual void dispose(bool calledFromdisposeAndNotFromFinalizer)
    {
        if (_disposed) { return; }

        if (calledFromdisposeAndNotFromFinalizer)
        {
            // If the user never called commit,but we are using a transaction,then roll back.
            if (!_commitOccurred && _useTransaction && _transaction != null) { _transaction.Rollback(); }

            if (_transaction != null) { _transaction.dispose(); _transaction = null; }
            if (_connection != null) { _connection.dispose(); _connection = null; }
        }

        _disposed = true;
    }
}

那么为什么会有连接池问题呢?这里做错了什么吗?也许我们需要增加连接池的大小?

解决方法

连接池大小是允许的同时连接数。例如,对于SQL Server,默认值为100。如果一次连接的数量超过该数量,则必须等待先前的连接关闭。

如果您有很多消息可以一次发送,建议您增加连接池的大小。

https://docs.microsoft.com/en-us/dotnet/framework/data/adonet/sql-server-connection-pooling

但是,如果您发现一定时间的运行时间后收到此消息。可能是因为代码有问题,并且某些连接没有关闭。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。