CsvHelper - 缓慢填充数据表ish

如何解决CsvHelper - 缓慢填充数据表ish

将下面的代码放在一起以读取一组特定的 CSV 文件。它有效,但仍在进行中。有一段代码(填充数据表行 - 请参阅下面的片段)与 sqlBulkcopy 操作的运行时间一样长。就如何提高绩效征求意见/建议。

在(下面)的代码中,以 50K 批次处理一个 ~15M 行文件的时间不到 11.5 分钟。分解部分。 sqlBulkcopy 耗时约 236 公里(4 分钟),阅读器只需要 105 公里(约 1.5 分钟),填充数据表的部分耗时约 200 公里(3.33 分钟)。

     csvTableTimer.Start();
     // Process row and populate datatable
     DaTarow dr = dt.NewRow();
 
          foreach (DataColumn dc in dt.Columns)
          {
               if (row.GetType().GetProperty(dc.ToString()).GetValue(row) != null)
               {
                    dr[dc.ToString()] = row.GetType().GetProperty(dc.ToString()).GetValue(row);
               }
           }
           dt.Rows.Add(dr);
 
      csvTableTimer.Stop();

CSV 文件非常大(10+GB)并且没有标题。我正在使用 Class 来构建数据表结构,并希望在填充数据表行时继续使用这种方法,因为我需要扩展它以处理多种 CSV 类型。

数据表反映了与 sql DB 表对齐的类中的列名。曾想使用 GetField(已转换,而不是原始)遍历数据表 row[column.ColumnName] = csv.GetField( column.DataType,column.ColumnName ); 中的每一列,但一直收到关于没有标题错误。发现了一个与 HasHeaderRecord = false 相关的未决问题,它与我试图做的事情相匹配,这增加了我向更擅长这方面的人寻求建议的愿望。感谢您的帮助!

代码块上展开;

     var rconfig = new CsvHelper.Configuration.CsvConfiguration(CultureInfo.InvariantCulture)
     {
         BufferSize = 1024,Delimiter = ",",AllowComments = true,HasHeaderRecord = false,HeaderValidated = null,IgnoreBlankLines = true,MissingFieldFound = null,Comment = '#',Escape = '"',TrimOptions = TrimOptions.Trim,BadDataFound = x =>
         {
             isBadRecord = true;
             ErrRecords.Add(x.RawRecord);
             ++badCount;
         }
     };

     var loadFType = @"B";
     // Create datatable using class as deFinition.
     PropertyDescriptorCollection props1 = TypeDescriptor.GetProperties(loaderFileType);
     DataTable dt = new DataTable();
     dt = UtilExtensions.CreateDataTable(props1);

     using (var reader = new StreamReader(rFile))
     {
         reader.ReadLine();
          
         using (var csv = new CsvReader(reader,rconfig))
         {
             switch (loadFType)
             {
                 case "ALL":
                     csv.Context.RegisterClassMap<CSVLoader.AMap>();
                     var allRecords = new List<CSVLoader.A>();
                     break;
                 case "BAL":
                     csv.Context.RegisterClassMap<CSVLoader.BMap>();
                     var balRecords = new List<CSVLoader.B>();
                     break;

                 case "CIF":
                     csv.Context.RegisterClassMap<CSVLoader.CMap>();
                     var cifRecords = new List<CSVLoader.C>();
                     break;
             }

             dt.BeginLoadData();
             while (csv.Read())
             {
                 csvReadTimer.Start();
                 var row = csv.GetRecord(loaderFileType);
                 csvReadTimer.Stop();

                 runningCount++;

                 if (!isBadRecord)
                 {
                      csvTableTimer.Start();
                      // Process row and populate datatable
                      DaTarow dr = dt.NewRow();

                      foreach (DataColumn dc in dt.Columns)
                      {
                          if (row.GetType().GetProperty(dc.ToString()).GetValue(row) != null)
                          {
                              dr[dc.ToString()] = row.GetType().GetProperty(dc.ToString()).GetValue(row);
                          }
                      }
                     dt.Rows.Add(dr);

                     csvTableTimer.Stop();
                     ++goodCount;

                     if (batchCount >= dtbatchSize || runningCount >= fileRecCount)
                     {
                         try
                         {
                             // Write from the source to the destination.
                             bcpLoadTimer.Start();

                             bulkcopy.WritetoServer(dt);

                             bcpLoadTimer.Stop();
                             bcpLoadBatchCount++;

                         }
                         catch (Exception ex)
                         {
                         }
                         dt.Clear();
                         batchCount = 0;
                     }
                     batchCount++;
                 }
                 isBadRecord = false;
             }
             dt.EndLoadData();
             reader.Close();
             dt.Clear();
             transaction.Commit();
// B
public class B
{
    [Index(0)]
    public string A { get; set; }
    [Index(1)]
    public string BString { get; set; }
    [Index(2)]
    public int? C { get; set; }
    [Index(3)]
    public string D { get; set; }
    [Index(4)]
    public string E { get; set; }
    [Index(5)]
    public DateTime? F { get; set; }
    [Index(6)]
    public decimal? G { get; set; }
    [Index(7)]
    public decimal? H { get; set; }
    [Index(8)]
    public decimal? I { get; set; }
    [Index(9)]
    public decimal? J { get; set; }
    [Index(10)]
    public int? K { get; set; }
    [Index(11)]
    public string L { get; set; }
    [Index(12)]
    public DateTime? M { get; set; }
}

// B
public sealed class BMap : ClassMap<B>
{
    public BMap()
    {
        // AutoMap(CultureInfo.InvariantCulture);
        Map(m => m.A).Index(0);
        Map(m => m.BString).Index(1); 
        Map(m => m.C).Index(2);
        Map(m => m.D).Index(3);
        Map(m => m.E).Index(4);
        Map(m => m.F).Index(5).TypeConverterOption.Format("yyyyMMdd");
        Map(m => m.G).Index(6);
        Map(m => m.H).Index(7);
        Map(m => m.I).Index(8);
        Map(m => m.J).Index(9);
        Map(m => m.K).Index(10);
        Map(m => m.L).Index(11);
        Map(m => m.M).Index(12).TypeConverterOption.Format("yyyy-MM-dd-hh.mm.ss.ffffff");
    }
}

解决方法

您的问题并未真正包含 minimal reproducible example,因此我简化了您的代码以创建以下 FileLoader 类,该类计算从某些实例填充 DataTable 所需的时间使用 TClass 从 CSV 行读取的类 B(此处为 CsvReader):

public class FileLoader
{
    public System.Diagnostics.Stopwatch csvTableTimer { get; } = new();

    public long Load<TClass,TClassMap>(string rFile,int dtbatchSize) where TClassMap : ClassMap<TClass>,new()
    {
        bool isBadRecord = false;
        long badCount = 0;
        long runningCount = 0;
        long goodCount = 0;
        long batchCount = 0;

        var rconfig = CreateCsvConfiguration(
            x => 
            {
                isBadRecord = true;
                //ErrRecords.Add(x.RawRecord);
                ++badCount;
            });
        
        // Create datatable using class as definition.
        var dt = UtilExtensions.CreateDataTable(typeof(TClass));

        using (var reader = new StreamReader(rFile))
        {
            //reader.ReadLine();  FIXED - THIS SKIPPED THE FIRST LINE AND CAUSED A RECORD TO BE OMITTED.
            using (var csv = new CsvReader(reader,rconfig))
            {
                csv.Context.RegisterClassMap<TClassMap>();

                dt.BeginLoadData();
                while (csv.Read())
                {
                    isBadRecord = false;
                    //csvReadTimer.Start();
                    var record = csv.GetRecord<TClass>();
                    //csvReadTimer.Stop();

                    runningCount++;
                    if (!isBadRecord)
                    {
                        csvTableTimer.Start();
                        // Process row and populate datatable
                        DataRow dr = dt.NewRow();
                        foreach (DataColumn dc in dt.Columns)
                        {
                            if (record.GetType().GetProperty(dc.ToString()).GetValue(record) != null)
                            {
                                dr[dc.ToString()] = record.GetType().GetProperty(dc.ToString()).GetValue(record);
                            }
                        }
                        dt.Rows.Add(dr);
                        csvTableTimer.Stop();
                        goodCount++;
                        if (++batchCount >= dtbatchSize)
                        {
                            // Flush the data table
                            FlushTable(dt);
                            batchCount = 0;
                        }
                    }
                }
                dt.EndLoadData();
                FlushTable(dt);
                Commit();
            }
        }
        
        return goodCount;
    }

    protected virtual void FlushTable(DataTable dt) => dt.Clear();  // Replace with SqlBulkCopy 
    protected virtual void Commit() {} // Replace with transaction.Commit();
    
    public static CsvConfiguration CreateCsvConfiguration(BadDataFound badDataFound) => 
        new CsvHelper.Configuration.CsvConfiguration(CultureInfo.InvariantCulture)
        {
            BufferSize = 1024,Delimiter = ",",AllowComments = true,HasHeaderRecord = false,HeaderValidated = null,IgnoreBlankLines = true,MissingFieldFound = null,Comment = '#',Escape = '"',TrimOptions = TrimOptions.Trim,BadDataFound = badDataFound,};
}

public static partial class UtilExtensions
{
    static IEnumerable<PropertyInfo> GetSerializableProperties(this Type type) => 
        type.GetProperties().Where(p => p.GetIndexParameters().Length == 0 && p.CanRead && p.CanWrite && p.GetGetMethod() != null && p.GetSetMethod() != null);
    
    public static DataTable CreateDataTable(Type type)
    {
        var dt = new DataTable();
        foreach (var p in type.GetSerializableProperties())
            dt.Columns.Add(p.Name,Nullable.GetUnderlyingType(p.PropertyType) ?? p.PropertyType);
        return dt;
    }
}

然后,如果我使用文件加载器并调用 loader.Load<B,BMap>(rFile,1000) 来读取 5555 行的 CSV 文件 20 次,则在 dotnetfiddle 上大约需要 1049 毫秒。参见演示 #1 here

您遇到的一个问题是 c# 中的反射可能非常慢。您调用了 record.GetType().GetProperty(dc.ToString()).GetValue(record) 两次,如果我只是将调用次数减少 1,时间将减少到大约 706 毫秒:

                        foreach (DataColumn dc in dt.Columns)
                        {
                            var value = record.GetType().GetProperty(dc.ToString()).GetValue(record);
                            if (value != null)
                            {
                                dr[dc.ToString()] = value;
                            }
                        }

演示 #2 here

但是,我们可以通过在运行时制造委托来做得更好。首先,添加以下使用 System.Linq.Expressions 命名空间的实用方法:

public static partial class UtilExtensions
{
    public static Func<TSource,object> CreatePropertyGetter<TSource>(PropertyInfo propertyInfo)
    {
        var parameter = Expression.Parameter(typeof(TSource),"obj");
        var property = Expression.Property(parameter,propertyInfo);
        var convert = Expression.Convert(property,typeof(object));
        var lambda = Expression.Lambda(typeof(Func<TSource,object>),convert,parameter);

        return (Func<TSource,object>)lambda.Compile();
    }

    public static ReadOnlyDictionary<string,Func<TSource,object>> PropertyGetters<TSource>() => PropertyExpressionsCache<TSource>.PropertyGetters;

    static ReadOnlyDictionary<string,object>> CreatePropertyGetters<TSource>() =>
        typeof(TSource)
            .GetSerializableProperties()
            .ToDictionary(p => p.Name,p => CreatePropertyGetter<TSource>(p))
            .ToReadOnly();

    static class PropertyExpressionsCache<TSource>
    {
        public static ReadOnlyDictionary<string,object>> PropertyGetters { get; } = UtilExtensions.CreatePropertyGetters<TSource>();
    }
    
    public static ReadOnlyDictionary<TKey,TValue> ToReadOnly<TKey,TValue>(this IDictionary<TKey,TValue> dictionary) => 
        new ReadOnlyDictionary<TKey,TValue>(dictionary ?? throw new ArgumentNullException());
}

并修改Load<TClass,TClassMap>()如下:

public long Load<TClass,new()
{
    bool isBadRecord = false;
    long badCount = 0;
    long runningCount = 0;
    long goodCount = 0;
    long batchCount = 0;

    var rconfig = CreateCsvConfiguration(
        x => 
        {
            isBadRecord = true;
            //ErrRecords.Add(x.RawRecord);
            ++badCount;
        });
    
    var loaderFileType = typeof(TClass);

    // Create datatable using class as definition.
    var dt = UtilExtensions.CreateDataTable(loaderFileType);
    var properties = UtilExtensions.PropertyGetters<TClass>();

    using (var reader = new StreamReader(rFile))
    {
        //reader.ReadLine();  FIXED - THIS SKIPPED THE FIRST LINE AND CAUSED A RECORD TO BE OMITTED.
        using (var csv = new CsvReader(reader,rconfig))
        {
            csv.Context.RegisterClassMap<TClassMap>();

            dt.BeginLoadData();
            while (csv.Read())
            {
                isBadRecord = false;
                //csvReadTimer.Start();
                var record = csv.GetRecord<TClass>();
                //csvReadTimer.Stop();

                runningCount++;
                if (!isBadRecord)
                {
                    csvTableTimer.Start();
                    // Process row and populate datatable
                    DataRow dr = dt.NewRow();
                    foreach (var p in properties)
                    {
                        var value = p.Value(record);
                        if (value != null)
                            dr[p.Key] =  value;
                    }
                    dt.Rows.Add(dr);
                    csvTableTimer.Stop();
                    goodCount++;
                    if (++batchCount >= dtbatchSize)
                    {
                        // Flush the data table
                        FlushTable(dt);
                        batchCount = 0;
                    }
                }
            }
            dt.EndLoadData();
            FlushTable(dt);
        }
    }
    
    return goodCount;
}

时间将进一步缩短,大约为 404 毫秒。演示小提琴 #3 here

我也尝试使用 Delegate.CreateDelegate() 而不是 Expression

public static partial class UtilExtensions
{
    static Func<TSource,object> CreateTypedPropertyGetter<TSource,TValue>(PropertyInfo propertyInfo)
    {
        var typedFunc = (Func<TSource,TValue>)Delegate.CreateDelegate(typeof(Func<TSource,TValue>),propertyInfo.GetGetMethod());
        return i => (object)typedFunc(i);
    }

    public static Func<TSource,object> CreatePropertyGetter<TSource>(PropertyInfo propertyInfo)
    {
        var typedCreator = typeof(UtilExtensions).GetMethod(nameof(CreateTypedPropertyGetter),BindingFlags.Static | BindingFlags.Public | BindingFlags.NonPublic);
        var concreteTypedCreator = typedCreator = typedCreator.MakeGenericMethod(typeof(TSource),propertyInfo.PropertyType);
        return (Func<TSource,object>)concreteTypedCreator.Invoke(null,new object [] { propertyInfo });
    }

    public static ReadOnlyDictionary<string,TValue>(dictionary ?? throw new ArgumentNullException());
}

并且得到大致相同的时间,为 410 毫秒。演示小提琴 #4 here

注意事项:

  • 您问题中的代码通过调用 reader.ReadLine(); 跳过了 CSV 文件的第一行。

    在我的测试工具中,这导致读取的记录数量不正确,所以我删除了这一行。

  • 我没有使用在记录类型上有开关的非泛型方法,而是提取了一个将记录类型和类映射类型作为泛型参数的泛型方法。这使得委托创建更容易一些,因为不再需要对记录类型进行运行时转换。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?