How to fix a DataTable "array dimensions exceeded supported range" error when calling dataTable.NewRow()
For some crazy reason, I am getting an OutOfMemoryException while transferring data to SQL in reasonable chunks, and while barely using any memory:
System.OutOfMemoryException: Exception of type 'System.OutOfMemoryException' was thrown.
at System.Data.DataTable.NewRowArray(Int32 size)
at System.Data.RecordManager.GrowRecordCapacity()
at System.Data.RecordManager.NewRecordBase()
at System.Data.DataTable.NewRecord(Int32 sourceRecord)
at Axis.PortfolioAccumulation.Data.PortfolioAccumulationDbContext.d__22`1.MoveNext() in D:\Azure_DevOps_Reins_Prod_Agent_A\_work\7\s\Axis.PortfolioAccumulation.DataLayer\Axis.PortfolioAccumulation.Data\BulkInsert\StreamedsqlBulkcopy.cs:line 46
The error occurs on the call to dataTable.NewRow() in the while loop below, once I pass roughly the 30 millionth row:
/// <summary>Helper to stream a large number of records into SQL without
/// ever having to materialize the entire enumerable into memory at once.</summary>
/// <param name="destinationTableName">The name of the table in the database to copy data to.</param>
/// <param name="dataTable">A new instance of the DataTable class that matches the schema of the table to insert to.
/// This should match exactly (same column names) what is in SQL, for automatic column mapping to work.</param>
/// <param name="sourceData">The enumerable of data that will be used to generate DataRows.</param>
/// <param name="populateRow">A delegate function that populates and returns a new data row for a given record.</param>
/// <param name="memoryBatchSize">The number of DataRows to generate in memory before passing them to SqlBulkCopy.</param>
/// <param name="insertBatchSize">The batch size of inserts performed by the SqlBulkCopy utility.</param>
public async Task StreamedSqlBulkCopy<T>(
    string destinationTableName, DataTable dataTable, IEnumerable<T> sourceData,
    Func<T, DataRow, DataRow> populateRow, int memoryBatchSize = 1000000, int insertBatchSize = 5000)
{
    using (SqlConnection connection = new SqlConnection(Database.Connection.ConnectionString))
    {
        connection.Open();
        using (SqlBulkCopy bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, null))
        using (IEnumerator<T> enumerator = sourceData.GetEnumerator())
        {
            // Configure the single SqlBulkCopy instance that will be used to copy all "batches"
            bulkCopy.DestinationTableName = destinationTableName;
            bulkCopy.BatchSize = insertBatchSize;
            bulkCopy.BulkCopyTimeout = _bulkInsertTimeOut;
            foreach (DataColumn column in dataTable.Columns)
                bulkCopy.ColumnMappings.Add(column.ColumnName, column.ColumnName);

            // Begin enumerating over all records, preparing batches no larger than "memoryBatchSize"
            bool hasNext = true;
            while (hasNext)
            {
                DataRow[] batch = new DataRow[memoryBatchSize];
                int filled = 0;
                // Check the capacity before advancing the enumerator, so a full
                // batch does not silently consume (and drop) one extra element.
                while (filled < memoryBatchSize && (hasNext = enumerator.MoveNext()))
                    batch[filled++] = populateRow(enumerator.Current, dataTable.NewRow());

                // When we reach the end of the enumerable, we need to shrink the final buffer array
                if (filled < memoryBatchSize)
                    Array.Resize(ref batch, filled);

                await bulkCopy.WriteToServerAsync(batch);
            }
        }
    }
}
As is hopefully clear, the purpose of the above helper is to stream a (very large) IEnumerable<T> of data into a SQL table via SqlBulkCopy, given a delegate that populates a row for each element. Example usage:
public async Task SaveExchangeRates(List<FxRate> fxRates)
{
    var createDate = DateTimeOffset.UtcNow;
    await StreamedSqlBulkCopy("RefData.ExchangeRate", GetExchangeRateDataTable(), fxRates, (fx, newRow) =>
    {
        newRow["BaseCurrency"] = "USD";
        newRow["TargetCurrency"] = fx.CurrencyCode;
        newRow["ExchangeDate"] = fx.ExchangeRateDate;
        newRow["DollarValue"] = fx.ValueInUsd;
        return newRow;
    });
}
private DataTable GetExchangeRateDataTable()
{
    var dataTable = new DataTable();
    dataTable.Columns.Add("ExchangeDate", typeof(DateTime));
    dataTable.Columns.Add("BaseCurrency", typeof(string));
    dataTable.Columns.Add("TargetCurrency", typeof(string));
    dataTable.Columns.Add("DollarValue", typeof(double));
    return dataTable;
}
Workaround
It turns out that even if you only use the DataTable instance as an empty shell for schema purposes, and even if you never call dataTable.Rows.Add() to actually add anything to the table, each call to NewRow still increments an internal counter, and apparently even grows a placeholder array, expecting that you will eventually insert all of those rows. In any case, the workaround is to periodically "reset" the template by overwriting it with a clone of itself:
dataTable = dataTable.Clone();
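In context, the reset slots into the question's batching loop like this (a sketch; resetting once per batch is an arbitrary choice — any interval comfortably below the row-count limit works):

```csharp
bool hasNext = true;
while (hasNext)
{
    // Clone() copies only the schema, so the internal record array and the
    // row counter that grow with every NewRow() call are discarded each batch.
    dataTable = dataTable.Clone();

    DataRow[] batch = new DataRow[memoryBatchSize];
    int filled = 0;
    while (filled < memoryBatchSize && (hasNext = enumerator.MoveNext()))
        batch[filled++] = populateRow(enumerator.Current, dataTable.NewRow());

    if (filled < memoryBatchSize)
        Array.Resize(ref batch, filled);

    await bulkCopy.WriteToServerAsync(batch);
}
```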
Certainly not elegant, but easier than trying to implement my own IDataReader, which is the only other way to feed SqlBulkCopy. (That said, for anyone else trying to stream into SqlBulkCopy who is not constrained to avoid third-party libraries like I was, check out Marc Gravell's ObjectReader in the FastMember package, and this answer: https://stackoverflow.com/a/47208127/529618)
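For reference, the FastMember route mentioned above can be sketched roughly as follows (assuming the FastMember NuGet package and the ExchangeRate schema from the example; the member names and mappings are illustrative, and the constant "USD" BaseCurrency column would need a projection over the source objects, since ObjectReader only reads their members):

```csharp
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Threading.Tasks;
using FastMember; // NuGet: FastMember

public static async Task BulkCopyWithObjectReader(IEnumerable<FxRate> fxRates, string connectionString)
{
    using (var connection = new SqlConnection(connectionString))
    {
        connection.Open();
        using (var bulkCopy = new SqlBulkCopy(connection))
        // ObjectReader presents the enumerable as a forward-only IDataReader,
        // so rows stream straight into SqlBulkCopy with no DataTable at all.
        using (var reader = ObjectReader.Create(fxRates, "ExchangeRateDate", "CurrencyCode", "ValueInUsd"))
        {
            bulkCopy.DestinationTableName = "RefData.ExchangeRate";
            // Map the source member names to the differing SQL column names.
            bulkCopy.ColumnMappings.Add("ExchangeRateDate", "ExchangeDate");
            bulkCopy.ColumnMappings.Add("CurrencyCode", "TargetCurrency");
            bulkCopy.ColumnMappings.Add("ValueInUsd", "DollarValue");
            await bulkCopy.WriteToServerAsync(reader);
        }
    }
}
```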
An alternative that simplifies things (at the cost of some extra overhead) is to accept our fate and use a DataTable instead of an array of DataRow — but to Clone() the original table periodically, to stay clear of the apparent hard limit of 16,777,216 rows. I do not understand why DataTable maintains an array for every row you create with it, even if those rows are never added — but since it does, we may as well use that array rather than allocate our own. Some of the DataTable overhead can be offset by setting its initial capacity to ensure it does not grow (memory allocations) and by disabling as many events as possible. The relevant change is below:
bool hasNext = true;
while (hasNext)
{
    using (DataTable tableChunk = dataTable.Clone())
    {
        tableChunk.MinimumCapacity = memoryBatchSize + 1; // Avoid triggering resizing
        tableChunk.BeginLoadData(); // Speeds up inserting a large volume of rows a little
        int filled = 0;
        // Check the capacity before advancing the enumerator, so a full
        // chunk does not silently consume (and drop) one extra element.
        while (filled++ < memoryBatchSize && (hasNext = enumerator.MoveNext()))
            tableChunk.Rows.Add(populateRow(enumerator.Current, tableChunk.NewRow()));
        await bulkCopy.WriteToServerAsync(tableChunk);
    }
}