如何解决CsvHelper - 缓慢填充数据表ish
将下面的代码放在一起以读取一组特定的 CSV 文件。它有效,但仍在进行中。有一段代码(填充数据表行 - 请参阅下面的片段)与 sqlBulkcopy 操作的运行时间一样长。就如何提高绩效征求意见/建议。
在(下面)的代码中,以 50K 批次处理一个 ~15M 行文件的时间不到 11.5 分钟。分解部分。 sqlBulkcopy 耗时约 236 公里(4 分钟),阅读器只需要 105 公里(约 1.5 分钟),填充数据表的部分耗时约 200 公里(3.33 分钟)。
csvTableTimer.Start();
// Process row and populate datatable
DaTarow dr = dt.NewRow();
foreach (DataColumn dc in dt.Columns)
{
if (row.GetType().GetProperty(dc.ToString()).GetValue(row) != null)
{
dr[dc.ToString()] = row.GetType().GetProperty(dc.ToString()).GetValue(row);
}
}
dt.Rows.Add(dr);
csvTableTimer.Stop();
CSV 文件非常大(10+GB)并且没有标题。我正在使用 Class 来构建数据表结构,并希望在填充数据表行时继续使用这种方法,因为我需要扩展它以处理多种 CSV 类型。
数据表反映了与 sql DB 表对齐的类中的列名。曾想使用 GetField(已转换,而不是原始)遍历数据表 row[column.ColumnName] = csv.GetField( column.DataType,column.ColumnName );
中的每一列,但一直收到关于没有标题的错误。发现了一个与 HasHeaderRecord = false 相关的未决问题,它与我试图做的事情相匹配,这增加了我向更擅长这方面的人寻求建议的愿望。感谢您的帮助!
在代码块上展开;
var rconfig = new CsvHelper.Configuration.CsvConfiguration(CultureInfo.InvariantCulture)
{
BufferSize = 1024,Delimiter = ",",AllowComments = true,HasHeaderRecord = false,HeaderValidated = null,IgnoreBlankLines = true,MissingFieldFound = null,Comment = '#',Escape = '"',TrimOptions = TrimOptions.Trim,BadDataFound = x =>
{
isBadRecord = true;
ErrRecords.Add(x.RawRecord);
++badCount;
}
};
var loadFType = @"B";
// Create datatable using class as deFinition.
PropertyDescriptorCollection props1 = TypeDescriptor.GetProperties(loaderFileType);
DataTable dt = new DataTable();
dt = UtilExtensions.CreateDataTable(props1);
using (var reader = new StreamReader(rFile))
{
reader.ReadLine();
using (var csv = new CsvReader(reader,rconfig))
{
switch (loadFType)
{
case "ALL":
csv.Context.RegisterClassMap<CSVLoader.AMap>();
var allRecords = new List<CSVLoader.A>();
break;
case "BAL":
csv.Context.RegisterClassMap<CSVLoader.BMap>();
var balRecords = new List<CSVLoader.B>();
break;
case "CIF":
csv.Context.RegisterClassMap<CSVLoader.CMap>();
var cifRecords = new List<CSVLoader.C>();
break;
}
dt.BeginLoadData();
while (csv.Read())
{
csvReadTimer.Start();
var row = csv.GetRecord(loaderFileType);
csvReadTimer.Stop();
runningCount++;
if (!isBadRecord)
{
csvTableTimer.Start();
// Process row and populate datatable
DaTarow dr = dt.NewRow();
foreach (DataColumn dc in dt.Columns)
{
if (row.GetType().GetProperty(dc.ToString()).GetValue(row) != null)
{
dr[dc.ToString()] = row.GetType().GetProperty(dc.ToString()).GetValue(row);
}
}
dt.Rows.Add(dr);
csvTableTimer.Stop();
++goodCount;
if (batchCount >= dtbatchSize || runningCount >= fileRecCount)
{
try
{
// Write from the source to the destination.
bcpLoadTimer.Start();
bulkcopy.WritetoServer(dt);
bcpLoadTimer.Stop();
bcpLoadBatchCount++;
}
catch (Exception ex)
{
}
dt.Clear();
batchCount = 0;
}
batchCount++;
}
isBadRecord = false;
}
dt.EndLoadData();
reader.Close();
dt.Clear();
transaction.Commit();
// B
public class B
{
[Index(0)]
public string A { get; set; }
[Index(1)]
public string BString { get; set; }
[Index(2)]
public int? C { get; set; }
[Index(3)]
public string D { get; set; }
[Index(4)]
public string E { get; set; }
[Index(5)]
public DateTime? F { get; set; }
[Index(6)]
public decimal? G { get; set; }
[Index(7)]
public decimal? H { get; set; }
[Index(8)]
public decimal? I { get; set; }
[Index(9)]
public decimal? J { get; set; }
[Index(10)]
public int? K { get; set; }
[Index(11)]
public string L { get; set; }
[Index(12)]
public DateTime? M { get; set; }
}
// B
public sealed class BMap : ClassMap<B>
{
public BMap()
{
// AutoMap(CultureInfo.InvariantCulture);
Map(m => m.A).Index(0);
Map(m => m.BString).Index(1);
Map(m => m.C).Index(2);
Map(m => m.D).Index(3);
Map(m => m.E).Index(4);
Map(m => m.F).Index(5).TypeConverterOption.Format("yyyyMMdd");
Map(m => m.G).Index(6);
Map(m => m.H).Index(7);
Map(m => m.I).Index(8);
Map(m => m.J).Index(9);
Map(m => m.K).Index(10);
Map(m => m.L).Index(11);
Map(m => m.M).Index(12).TypeConverterOption.Format("yyyy-MM-dd-hh.mm.ss.ffffff");
}
}
解决方法
您的问题并未真正包含 minimal reproducible example,因此我简化了您的代码以创建以下 FileLoader
类,该类计算从某些实例填充 DataTable
所需的时间使用 TClass
从 CSV 行读取的类 B
(此处为 CsvReader
):
public class FileLoader
{
public System.Diagnostics.Stopwatch csvTableTimer { get; } = new();
public long Load<TClass,TClassMap>(string rFile,int dtbatchSize) where TClassMap : ClassMap<TClass>,new()
{
bool isBadRecord = false;
long badCount = 0;
long runningCount = 0;
long goodCount = 0;
long batchCount = 0;
var rconfig = CreateCsvConfiguration(
x =>
{
isBadRecord = true;
//ErrRecords.Add(x.RawRecord);
++badCount;
});
// Create datatable using class as definition.
var dt = UtilExtensions.CreateDataTable(typeof(TClass));
using (var reader = new StreamReader(rFile))
{
//reader.ReadLine(); FIXED - THIS SKIPPED THE FIRST LINE AND CAUSED A RECORD TO BE OMITTED.
using (var csv = new CsvReader(reader,rconfig))
{
csv.Context.RegisterClassMap<TClassMap>();
dt.BeginLoadData();
while (csv.Read())
{
isBadRecord = false;
//csvReadTimer.Start();
var record = csv.GetRecord<TClass>();
//csvReadTimer.Stop();
runningCount++;
if (!isBadRecord)
{
csvTableTimer.Start();
// Process row and populate datatable
DataRow dr = dt.NewRow();
foreach (DataColumn dc in dt.Columns)
{
if (record.GetType().GetProperty(dc.ToString()).GetValue(record) != null)
{
dr[dc.ToString()] = record.GetType().GetProperty(dc.ToString()).GetValue(record);
}
}
dt.Rows.Add(dr);
csvTableTimer.Stop();
goodCount++;
if (++batchCount >= dtbatchSize)
{
// Flush the data table
FlushTable(dt);
batchCount = 0;
}
}
}
dt.EndLoadData();
FlushTable(dt);
Commit();
}
}
return goodCount;
}
protected virtual void FlushTable(DataTable dt) => dt.Clear(); // Replace with SqlBulkCopy
protected virtual void Commit() {} // Replace with transaction.Commit();
public static CsvConfiguration CreateCsvConfiguration(BadDataFound badDataFound) =>
new CsvHelper.Configuration.CsvConfiguration(CultureInfo.InvariantCulture)
{
BufferSize = 1024,Delimiter = ",",AllowComments = true,HasHeaderRecord = false,HeaderValidated = null,IgnoreBlankLines = true,MissingFieldFound = null,Comment = '#',Escape = '"',TrimOptions = TrimOptions.Trim,BadDataFound = badDataFound,};
}
public static partial class UtilExtensions
{
static IEnumerable<PropertyInfo> GetSerializableProperties(this Type type) =>
type.GetProperties().Where(p => p.GetIndexParameters().Length == 0 && p.CanRead && p.CanWrite && p.GetGetMethod() != null && p.GetSetMethod() != null);
public static DataTable CreateDataTable(Type type)
{
var dt = new DataTable();
foreach (var p in type.GetSerializableProperties())
dt.Columns.Add(p.Name,Nullable.GetUnderlyingType(p.PropertyType) ?? p.PropertyType);
return dt;
}
}
然后,如果我使用文件加载器并调用 loader.Load<B,BMap>(rFile,1000)
来读取 5555 行的 CSV 文件 20 次,则在 dotnetfiddle 上大约需要 1049 毫秒。参见演示 #1 here。
您遇到的一个问题是 c# 中的反射可能非常慢。您调用了 record.GetType().GetProperty(dc.ToString()).GetValue(record)
两次,如果我只是将调用次数减少 1,时间将减少到大约 706 毫秒:
foreach (DataColumn dc in dt.Columns)
{
var value = record.GetType().GetProperty(dc.ToString()).GetValue(record);
if (value != null)
{
dr[dc.ToString()] = value;
}
}
演示 #2 here。
但是,我们可以通过在运行时制造委托来做得更好。首先,添加以下使用 System.Linq.Expressions
命名空间的实用方法:
public static partial class UtilExtensions
{
public static Func<TSource,object> CreatePropertyGetter<TSource>(PropertyInfo propertyInfo)
{
var parameter = Expression.Parameter(typeof(TSource),"obj");
var property = Expression.Property(parameter,propertyInfo);
var convert = Expression.Convert(property,typeof(object));
var lambda = Expression.Lambda(typeof(Func<TSource,object>),convert,parameter);
return (Func<TSource,object>)lambda.Compile();
}
public static ReadOnlyDictionary<string,Func<TSource,object>> PropertyGetters<TSource>() => PropertyExpressionsCache<TSource>.PropertyGetters;
static ReadOnlyDictionary<string,object>> CreatePropertyGetters<TSource>() =>
typeof(TSource)
.GetSerializableProperties()
.ToDictionary(p => p.Name,p => CreatePropertyGetter<TSource>(p))
.ToReadOnly();
static class PropertyExpressionsCache<TSource>
{
public static ReadOnlyDictionary<string,object>> PropertyGetters { get; } = UtilExtensions.CreatePropertyGetters<TSource>();
}
public static ReadOnlyDictionary<TKey,TValue> ToReadOnly<TKey,TValue>(this IDictionary<TKey,TValue> dictionary) =>
new ReadOnlyDictionary<TKey,TValue>(dictionary ?? throw new ArgumentNullException());
}
并修改Load<TClass,TClassMap>()
如下:
public long Load<TClass,new()
{
bool isBadRecord = false;
long badCount = 0;
long runningCount = 0;
long goodCount = 0;
long batchCount = 0;
var rconfig = CreateCsvConfiguration(
x =>
{
isBadRecord = true;
//ErrRecords.Add(x.RawRecord);
++badCount;
});
var loaderFileType = typeof(TClass);
// Create datatable using class as definition.
var dt = UtilExtensions.CreateDataTable(loaderFileType);
var properties = UtilExtensions.PropertyGetters<TClass>();
using (var reader = new StreamReader(rFile))
{
//reader.ReadLine(); FIXED - THIS SKIPPED THE FIRST LINE AND CAUSED A RECORD TO BE OMITTED.
using (var csv = new CsvReader(reader,rconfig))
{
csv.Context.RegisterClassMap<TClassMap>();
dt.BeginLoadData();
while (csv.Read())
{
isBadRecord = false;
//csvReadTimer.Start();
var record = csv.GetRecord<TClass>();
//csvReadTimer.Stop();
runningCount++;
if (!isBadRecord)
{
csvTableTimer.Start();
// Process row and populate datatable
DataRow dr = dt.NewRow();
foreach (var p in properties)
{
var value = p.Value(record);
if (value != null)
dr[p.Key] = value;
}
dt.Rows.Add(dr);
csvTableTimer.Stop();
goodCount++;
if (++batchCount >= dtbatchSize)
{
// Flush the data table
FlushTable(dt);
batchCount = 0;
}
}
}
dt.EndLoadData();
FlushTable(dt);
}
}
return goodCount;
}
时间将进一步缩短,大约为 404 毫秒。演示小提琴 #3 here。
我也尝试使用 Delegate.CreateDelegate()
而不是 Expression
:
public static partial class UtilExtensions
{
static Func<TSource,object> CreateTypedPropertyGetter<TSource,TValue>(PropertyInfo propertyInfo)
{
var typedFunc = (Func<TSource,TValue>)Delegate.CreateDelegate(typeof(Func<TSource,TValue>),propertyInfo.GetGetMethod());
return i => (object)typedFunc(i);
}
public static Func<TSource,object> CreatePropertyGetter<TSource>(PropertyInfo propertyInfo)
{
var typedCreator = typeof(UtilExtensions).GetMethod(nameof(CreateTypedPropertyGetter),BindingFlags.Static | BindingFlags.Public | BindingFlags.NonPublic);
var concreteTypedCreator = typedCreator = typedCreator.MakeGenericMethod(typeof(TSource),propertyInfo.PropertyType);
return (Func<TSource,object>)concreteTypedCreator.Invoke(null,new object [] { propertyInfo });
}
public static ReadOnlyDictionary<string,TValue>(dictionary ?? throw new ArgumentNullException());
}
并且得到大致相同的时间,为 410 毫秒。演示小提琴 #4 here。
注意事项:
-
您问题中的代码通过调用
reader.ReadLine();
跳过了 CSV 文件的第一行。在我的测试工具中,这导致读取的记录数量不正确,所以我删除了这一行。
-
我没有使用在记录类型上有开关的非泛型方法,而是提取了一个将记录类型和类映射类型作为泛型参数的泛型方法。这使得委托创建更容易一些,因为不再需要对记录类型进行运行时转换。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。