MemoryStream - 数据被截断

如何解决MemoryStream - 数据被截断

原始问题 - CSV 文件太大 (700k) 记录 - 因此希望从该大 CSV 文件创建更小的 CSV 文件

得到以下代码来剖析文件并创建更小的文件

 private async Task SplitFile(List<CsvRow> rows,string name)
    {
        var numRows = 30000;

        var remainder = rows.Count() % numRows;
        var chunks = rows.Count() / numRows;

        if (remainder > 0)
        {
            chunks++;
        }

        // Iterate rows in chunks
        for (var row = 0; row < chunks; row++)
        {
            // Extract chunks using LINQ
            var fileRows = rows
                .Skip(row * numRows)
                .Take(numRows)
                .ToList();

            var outputPath = Path.Combine(@"c:\",$"file{row}.txt");
            var encoding = new UTF8Encoding(true);
            
            await using var mem = new MemoryStream();
            await using var fileWriter = new StreamWriter(outputPath,false,encoding);
            await using var writer = new StreamWriter(mem,encoding);
            await using var csvBlob = new CsvWriter(writer,CultureInfo.InvariantCulture);
            await using var csvFile = new CsvWriter(fileWriter,CultureInfo.InvariantCulture);

            await csvFile.WriteRecordsAsync(fileRows);
            await csvBlob.WriteRecordsAsync(fileRows);

            FileStream file = new FileStream(@$"c:\memfile{row}.txt",FileMode.Create,FileAccess.Write);
            mem.Writeto(file);
            file.Close();
        }
    }

阻止程序 - 我正在从 Azure Blob 容器下载原始大文件,在创建小块后,我要将它们上传回 Blob 容器。为此,我需要在 MemoryStreams 中获取数据。

我创建物理文件只是为了解决我有内存流的问题。更易于调试。

当我运行上面的代码时 - 创建了小块文件。您会注意到我正在创建两组文件(块)

首先,直接将数据写入File Stream,其次,使用我创建的MemoryStream。

我在通过直接写入 File Stream 创建的文件中得到 30000 条记录,但在第二个文件中我只得到 29889 条记录。

我尝试了所有方法,但无法在使用 MemoryStream 后立即获取所有 30000 条记录。

我刷新了流,摆弄了编码,但没有任何帮助。我阅读了有关带有 BOM 的 UTF8 的信息。这看起来很有希望,但又无法解决

我使用的是 Dot Net Core 3.1

MemoryStream 是否存在已知问题。为什么它丢失了最后几条记录?其余文件相同。

有什么想法吗?

谢谢

解决方法

正如我上面评论的那样,修复方法是在复制 Flush 之前在 CsvWriter 上调用 MemoryStream。问题在于 CsvWriter 内部缓冲区中仍有待处理的数据,在您 MemoryStream 之前不会将其复制到 Flush。这应该会让事情对你有用。

但是,我对您的方案有更深入的反馈。在处理批次之前,您似乎正在将整个 700K 文件读入 List<CsvRow>。更好的方法是从 Azure 流式传输 CSV 数据,然后在您阅读时将较小的批次发送回 Azure。

在本例中,我将使用我自己的库 (Sylvan.Data.Csv),但我确信 CsvHelper 提供了类似的功能。

using Sylvan.Data.Csv;

...

string name = "MyMassiveCsv";
TextReader reader = File.OpenText(name + ".csv");

// replace above with however you access your Azure blob streams.

CsvDataReader csv = await CsvDataReader.CreateAsync(reader);

RangeDataReader r;
int i = 0;
do
{
    r = new RangeDataReader(csv,30000);
    i++;

    using var writer = File.CreateText(name + i + ".csv");
    // using var writer = new StreamWriter(CreateAzureBlob("batch" + i));
    using var w = CsvDataWriter.Create(writer);

    await w.WriteAsync(r);
} while (!r.AtEndOfData);

通过这种方式,您一次只需在内存中保存少量 CSV 文件,并且您将立即开始发回批次,而不必先下载整个 CSV。

RangeDataReader 是一个 DbDataReader 实现,它包装了一个 DbDataReader 并限制它从底层读取器读取的行数。实现如下:

using System;
using System.Collections;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;

class RangeDataReader : DbDataReader
{
    readonly DbDataReader reader;

    int row = -1;
    int count;

    public RangeDataReader(DbDataReader dataReader,int count)
    {
        this.reader = dataReader;
        this.count = count;
    }


    public bool AtEndOfData { get; private set; }

    public override async Task<bool> ReadAsync(CancellationToken cancellationToken)
    {
        if (row < count)
        {
            row++;
            var r = await reader.ReadAsync(cancellationToken);
            if (!r)
            {
                AtEndOfData = r;
            }
            return r;
        }
        return false;
    }

    public override bool Read()
    {
        if (row < count)
        {
            row++;
            var r = reader.Read();
            if (!r)
            {
                AtEndOfData = r;
            }
            return r;
        }
        return false;
    }

    public override object this[int ordinal] => this.GetValue(ordinal);

    public override object this[string name] => this.GetValue(GetOrdinal(name));

    public override int Depth => 0;

    public override int FieldCount => reader.FieldCount;

    public override bool HasRows => reader.HasRows;

    public override bool IsClosed => reader.IsClosed;

    public override int RecordsAffected => reader.RecordsAffected;


    public override bool GetBoolean(int ordinal)
    {
        return reader.GetBoolean(ordinal);
    }

    public override byte GetByte(int ordinal)
    {
        return reader.GetByte(ordinal);
    }

    public override long GetBytes(int ordinal,long dataOffset,byte[]? buffer,int bufferOffset,int length)
    {
        return reader.GetBytes(ordinal,dataOffset,buffer,bufferOffset,length);
    }

    public override char GetChar(int ordinal)
    {
        return reader.GetChar(ordinal);
    }

    public override long GetChars(int ordinal,char[]? buffer,int length)
    {
        return reader.GetChars(ordinal,length);
    }

    public override string GetDataTypeName(int ordinal)
    {
        return reader.GetDataTypeName(ordinal);
    }

    public override DateTime GetDateTime(int ordinal)
    {
        return reader.GetDateTime(ordinal);
    }

    public override decimal GetDecimal(int ordinal)
    {
        return reader.GetDecimal(ordinal);
    }

    public override double GetDouble(int ordinal)
    {
        return reader.GetDouble(ordinal);
    }

    public override IEnumerator GetEnumerator()
    {
        return new DbEnumerator(this);
    }

    public override Type GetFieldType(int ordinal)
    {
        return reader.GetFieldType(ordinal);
    }

    public override float GetFloat(int ordinal)
    {
        return reader.GetFloat(ordinal);
    }

    public override Guid GetGuid(int ordinal)
    {
        return reader.GetGuid(ordinal);
    }

    public override short GetInt16(int ordinal)
    {
        return reader.GetInt16(ordinal);
    }

    public override int GetInt32(int ordinal)
    {
        return reader.GetInt32(ordinal);
    }

    public override long GetInt64(int ordinal)
    {
        return reader.GetInt64(ordinal);
    }

    public override string GetName(int ordinal)
    {
        return reader.GetName(ordinal);
    }

    public override int GetOrdinal(string name)
    {
        return reader.GetOrdinal(name);
    }

    public override string GetString(int ordinal)
    {
        return reader.GetString(ordinal);
    }

    public override object GetValue(int ordinal)
    {
        return reader.GetValue(ordinal);
    }

    public override int GetValues(object[] values)
    {
        return reader.GetValues(values);
    }

    public override bool IsDBNull(int ordinal)
    {
        return reader.IsDBNull(ordinal);
    }

    public override bool NextResult()
    {
        throw new NotSupportedException();
    }
}


几乎所有内容都委托给内部数据读取器。唯一有趣的位是 Read/ReadAsync,它限制了它将读取的行数。我还没有彻底测试过这段代码,现在看看它,我可能会读取它的行数。

最后,既然我已经说明了如何对 CSV 数据进行流式处理,也许拆分的需要就消失了,您可以简单地流式处理文件而不是拆分它?很难知道为什么你觉得你需要拆分它。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?