如何在 Powershell 中对 3000 万条 csv 记录进行排序

如何解决如何在 Powershell 中对 3000 万条 csv 记录进行排序

我正在使用 oledbconnection 对 csv 文件的第一列进行排序。 Oledb 连接在 6 分钟内成功执行了多达 900 万条记录。但是当我执行 1000 万条记录时,收到以下警报消息。

使用“0”个参数调用“ExecuteReader”的异常:“查询无法完成。查询结果的大小大于数据库的最大大小(2 GB),或者 磁盘上没有足够的临时存储空间来存储查询结果。"

是否有其他解决方案可以使用 Powershell 对 3000 万进行排序?

这是我的脚本

$OutputFile = "D:\Performance_test_data\output1.csv"
$stream = [System.IO.StreamWriter]::new( $OutputFile )

$sb = [System.Text.StringBuilder]::new()
$sw = [Diagnostics.Stopwatch]::StartNew()

$conn = New-Object System.Data.OleDb.OleDbConnection("Provider=Microsoft.ACE.OLEDB.12.0;Data Source='D:\Performance_test_data\';Extended Properties='Text;HDR=Yes;CharacterSet=65001;FMT=Delimited';")
$cmd=$conn.CreateCommand()
$cmd.CommandText="Select * from 1crores.csv order by col6"

$conn.open()

$data = $cmd.ExecuteReader()

echo "Query has been completed!"
$stream.WriteLine( "col1,col2,col3,col4,col5,col6")

while ($data.read()) 
{ 
  $stream.WriteLine( $data.GetValue(0) +',' + $data.GetValue(1)+',' + $data.GetValue(2)+',' + $data.GetValue(3)+',' + $data.GetValue(4)+',' + $data.GetValue(5))

}
echo "data written successfully!!!"

$stream.close()
$sw.Stop()
$sw.Elapsed

$cmd.dispose()
$conn.dispose()

解决方法

你可以试试 SQLite:

$OutputFile = "D:\Performance_test_data\output1.csv"

$sw = [Diagnostics.Stopwatch]::StartNew()

sqlite3 output1.db '.mode csv' '.import 1crores.csv 1crores' '.headers on' ".output $OutputFile" 'Select * from 1crores order by 最終アクセス日時'

echo "data written successfully!!!"

$sw.Stop()
$sw.Elapsed
,

你可以尝试使用这个:

$CSVPath = 'C:\test\CSVTest.csv'
$Delimiter = ';'

# list we use to hold the results
$ResultList = [System.Collections.Generic.List[Object]]::new()

# Create a stream (I use OpenText because it returns a streamreader)
$File = [System.IO.File]::OpenText($CSVPath)

# Read and parse the header
$HeaderString = $File.ReadLine()

# Get the properties from the string,replace quotes
$Properties = $HeaderString.Split($Delimiter).Replace('"',$null)
$PropertyCount = $Properties.Count

# now read the rest of the data,parse it,build an object and add it to a list
while ($File.EndOfStream -ne $true)
{
    # Read the line
    $Line = $File.ReadLine()
    # split the fields and replace the quotes
    $LineData = $Line.Split($Delimiter).Replace('"',$null)
    # Create a hashtable with the properties (we convert this to a PSCustomObject later on). I use an ordered hashtable to keep the order
    $PropHash = [System.Collections.Specialized.OrderedDictionary]@{}
    # if loop to add the properties and values
    for ($i = 0; $i -lt $PropertyCount; $i++)
    { 
        $PropHash.Add($Properties[$i],$LineData[$i])
    }
    # Now convert the data to a PSCustomObject and add it to the list
    $ResultList.Add($([PSCustomObject]$PropHash))
}

# Now you can sort this list using Linq:
Add-Type -AssemblyName System.Linq
# Sort using propertyname (my sample data had a prop called "Name")
$Sorted = [Linq.Enumerable]::OrderBy($ResultList,[Func[object,string]] { $args[0].Name })

我没有使用 import-csv,而是编写了一个快速解析器,它使用流读取器并动态解析 CSV 数据并将其放入 PSCustomObject。 然后将其添加到列表中。

编辑:修复了 linq 示例

,

把性能放在一边,至少找到一个有效的解决方案(意味着一个不会因为内存不足而挂起的解决方案)我会依赖 PowerShell 管道。问题是,为了对对象进行排序,您需要停止管道,因为最后一个对象可能会成为第一个对象。
为了解决这部分问题,我将首先对关注属性的第一个字符进行粗划分。完成后,对每个粗划分进行细排序并附加结果:

Function Sort-BigObject {
    [CmdletBinding()] param(
        [Parameter(ValueFromPipeLine = $True)]$InputObject,[Parameter(Position = 0)][String]$Property,[ValidateRange(1,9)]$Coarse = 1,[System.Text.Encoding]$Encoding = [System.Text.Encoding]::Default
    )
    Begin {
        $TemporaryFiles = [System.Collections.SortedList]::new()
    }
    Process {
        if ($InputObject.$Property) {
            $Grain = $InputObject.$Property.SubString(0,$Coarse)
            if (!$TemporaryFiles.Contains($Grain)) { $TemporaryFiles[$Grain] = New-TemporaryFile }
            $InputObject | Export-Csv $TemporaryFiles[$Grain] -Encoding $Encoding -Append
        } else { $InputObject.$Property }
    }
    End {
        Foreach ($TemporaryFile in $TemporaryFiles.Values) {
            Import-Csv $TemporaryFile -Encoding $Encoding | Sort-Object $Property
            Remove-Item -LiteralPath $TemporaryFile
        }
    }
}

使用
(不要将流分配给变量,也不要使用括号。)

Import-Csv .\1crores.csv | Sort-BigObject <PropertyName> | Export-Csv .\output.csv
  • 如果临时文件仍然太大而无法处理,您可能需要增加 -Coarse 参数

注意事项(改进注意事项)

  • 排序属性为空的对象将立即输出
  • 排序列假定为(单个)字符串
  • 我认为性能很差(我没有对 3000 万条记录进行完整测试,但 10.000 条记录需要大约 8 秒,这意味着大约 8 小时)。考虑用 .Net 流方法替换本机 PowerShell cmdlet。缓冲/缓存文件输入和输出,并行处理?
,

我添加了一个新答案,因为这是解决此问题的完全不同的方法。
您可以考虑创建一个有序索引列表,而不是创建临时文件(这可能会导致大量文件打开和关闭),而不是多次遍历输入文件 (-FilePath)每次,处理选定数量的行(-BufferSize = 1Gb,您可能需要调整此“内存使用与性能”参数):

Function Sort-Csv {
    [CmdletBinding()] param(
        [string]$InputFile,[String]$Property,[string]$OutputFile,[Char]$Delimiter = ',',[System.Text.Encoding]$Encoding = [System.Text.Encoding]::Default,[Int]$BufferSize = 1Gb
    )
    Begin {
        if ($InputFile.StartsWith('.\')) { $InputFile = Join-Path (Get-Location) $InputFile }
        $Index = 0
        $Dictionary = [System.Collections.Generic.SortedDictionary[string,[Collections.Generic.List[Int]]]]::new()
        Import-Csv $InputFile -Delimiter $Delimiter -Encoding $Encoding | Foreach-Object { 
            if (!$Dictionary.ContainsKey($_.$Property)) { $Dictionary[$_.$Property] = [Collections.Generic.List[Int]]::new() }
            $Dictionary[$_.$Property].Add($Index++)
        }
        $Indices = [int[]]($Dictionary.Values | ForEach-Object { $_ })
        $Dictionary = $Null                                     # we only need the sorted index list
    }
    Process {
        $Start = 0
        $ChunkSize = [int]($BufferSize / (Get-Item $InputFile).Length * $Indices.Count / 2.2)
        While ($Start -lt $Indices.Count) {
            [System.GC]::Collect()
            $End = $Start + $ChunkSize - 1
            if ($End -ge $Indices.Count) { $End = $Indices.Count - 1 }
            $Chunk = @{}
            For ($i = $Start; $i -le $End; $i++) { $Chunk[$Indices[$i]] = $i }
            $Reader = [System.IO.StreamReader]::new($InputFile,$Encoding)
            $Header = $Reader.ReadLine()
            $i = $Start
            $Count = 0
            For ($i = 0; ($Line = $Reader.ReadLine()) -and $Count -lt $ChunkSize; $i++) {
                if ($Chunk.Contains($i)) { $Chunk[$i] = $Line }
            }
            $Reader.Dispose()
            if ($OutputFile) {
                if ($OutputFile.StartsWith('.\')) { $OutputFile = Join-Path (Get-Location) $OutputFile }
                $Writer = [System.IO.StreamWriter]::new($OutputFile,($Start -ne 0),$Encoding)
                if ($Start -eq 0) { $Writer.WriteLine($Header) }
                For ($i = $Start; $i -le $End; $i++) { $Writer.WriteLine($Chunk[$Indices[$i]]) }
                $Writer.Dispose()
            } else {
                $Start..$End | ForEach-Object { $Header } { $Chunk[$Indices[$_]] } | ConvertFrom-Csv -Delimiter $Delimiter
            }
            $Chunk = $Null
            $Start = $End + 1
        }
    }
}

基本用法

Sort-Csv .\Input.csv <PropertyName> -Output .\Output.csv
Sort-Csv .\Input.csv <PropertyName> | ... | Export-Csv .\Output.csv

请注意,对于 1Crones.csv,它可能只会一次性导出完整文件,除非您将 -BufferSize 设置为较低的数量,例如500Kb

,

我从这里下载了 gnu sort.exe:http://gnuwin32.sourceforge.net/packages/coreutils.htm 它还需要依赖 zip 中的 libiconv2.dll 和 libintl3.dll。我基本上是在 cmd.exe 中执行此操作的,它使用的内存不到一演出,大约需要 5 分钟。这是一个大约 3000 万个随机数的 500 兆文件。此命令还可以使用 --merge 合并已排序的文件。您还可以指定开始和结束键位置进行排序 --key。它会自动使用临时文件。

.\sort.exe < file1.csv > file2.csv

实际上,它的工作方式与 cmd 提示符下的 Windows 排序类似。 Windows 排序还有一个 /+n 选项,用于指定开始排序的字符列。

sort.exe < file1.csv > file2.csv

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?