微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Scala以懒惰集合处理大型scala数据的功能方式

我试图找出使用 scala中的字符串处理大量数据的内存高效和功能方法.我已经阅读了许多关于延迟集合的内容,并且已经看到了相当多的代码示例.但是,我一次又一次地遇到“GC开销超出”或“ Java堆空间”问题.

通常问题是我尝试构建一个惰性集合,但是当我将它附加到不断增长的集合时评估每个新元素(我现在没有任何其他方法可以逐步增加).当然,我可以先尝试初始化一个初始的延迟集合,然后通过将地图左右应用资源关键计算来产生保持所需值的集合,但通常我只是不知道最终集合的确切大小初始化懒惰集合的先验.

也许你可以通过给我提示或解释如何改进以下代码作为例子来帮助我,根据奇数序列对属于一个文件的规则将FASTA(下面的定义)格式化文件分成两个单独的文件甚至是到另一个(“股线分离”).这样做的“最”直接的方式是通过循环线路并通过打开的文件流打印到相应的文件中以一种必要的方式(这当然非常好).但是,我只是不喜欢重新分配包含头部和序列的变量的样式,因此下面的示例代码使用(尾部)递归,我希望找到一种方法来维护类似的设计而不会遇到资源问题!

该示例适用于小文件,但是对于大约500mb左右的文件,代码将在标准JVM设置中失败.我想要处理“arbitray”大小的文件,比如10-20g左右.

val fileName = args(0)
val in = io.source.fromFile(fileName) getLines

type itType = Iterator[String]
type sType = Stream[(String,String)]

def getFullSeqs(ite: itType) = {
    //val MetaChar = ">"
    val HeadPatt = "(^>)(.+)" r
    val SeqPatt  = "([\\w\\W]+)" r
    @annotation.tailrec
    def rec(it: itType,out: sType = Stream[(String,String)]()): sType = 
        if (it hasNext) it next match  {
            case HeadPatt(_,header) =>
                // introduce new header-sequence pair
                rec(it,(header,"") #:: out)
            case SeqPatt(seq) =>
                val oldVal = out head
                // concat subsequences
                val newStream = (oldVal._1,oldVal._2 + seq) #:: out.tail    
                rec(it,newStream)
            case _ =>
                println("something went wrong my friend,oh oh oh!"); Stream[(String,String)]()                
        } else out
    rec(ite)    
}

def printStrands(seqs: sType) {
   import java.io.PrintWriter
   import java.io.File
   def printStrand(seqse: sType,strand: Int) {
        // only use sequences of one strand 
        val indices =  List.tabulate(seqs.size/2)(_*2 + strand - 1).view
        val p = new PrintWriter(new File(fileName + "." + strand))
        indices foreach { i =>
              p.print(">" + seqse(i)._1 + "\n" + seqse(i)._2 + "\n")
        }; p.close
       println("Done bro!")
   }
   List(1,2).par foreach (s => printStrand(seqs,s))
}

printStrands(getFullSeqs(in))

我遇到三个问题:

A)假设需要维护一个大型数据结构,通过处理从getLines获得的初始迭代器,就像在我的getFullSeqs方法中一样(注意inF的不同大小和getFullSeqs的输出),因为整个(!)数据的转换需要反复进行,因为人们不知道在任何步骤中需要哪一部分数据.我的例子可能不是最好的,但如何做到这一点?有可能吗?

B)当所需的数据结构本身不是懒惰的时候,比如想要将(header – >序列)对存储到Map()中?你会把它包装在一个懒惰的集合中吗?

C)我构造流的实现可能会颠倒输入行的顺序.当调用reverse时,将评估所有元素(在我的代码中,它们已经是,所以这是实际问题).有没有办法以懒惰的方式“从后面”后期处理?我知道reverseIterator,但这已经是解决方案,或者这实际上不会首先评估所有元素(因为我需要在列表中调用它)?可以使用newVal#:: rec(…)构造流,但是我会丢失尾递归,不是吗?

所以我基本上需要的是向集合中添加元素,这些元素不会被添加过程评估.懒惰的val elem =“测试”; elem :: lazyCollection不是我想要的.

编辑:我也尝试在rec中使用stream参数的by-name参数.

非常感谢您的关注和时间,我非常感谢任何帮助(再次:)).

////////////////////////////////////////////////// ////////////////////////////////////////////////// ////////////////////////////////////////////////// ///////////////

FASTA被定义为由单个标题行界定的顺序序列集.标题定义为以“>”开头的行.标题下面的每一行都称为与标题关联的序列的一部分.当存在新标头时,序列结束.每个标题都是唯一的.例:

>头1
ABCDEFG
> HEADER2
hijklmn
opqrstu
> HEADER3
VWXYZ
> HEADER4
zyxwv

因此,序列2是seq 1的两倍.我的程序会将该文件拆分为包含的文件A.

>头1
ABCDEFG
> HEADER3
VWXYZ

和第二个文件B包含

> HEADER2
hijklmn
opqrstu
> HEADER4
zyxwv

假设输入文件由偶数个头 – 序列对组成.

解决方法

使用非常大的数据结构的关键是只在内存中保存对执行所需操作至关重要的内容.那么,在你的情况下,就是这样

>您的输入文件
>你的两个输出文件
>当前的文字

就是这样.在某些情况下,您可能需要存储信息,例如序列的长度;在这样的事件中,您在第一遍中构建数据结构并在第二遍中使用它们.例如,假设您决定要编写三个文件一个用于偶数记录,一个用于奇数,一个用于总长度小于300个核苷酸的条目.你会做这样的事情(警告 – 它编译,但我从来没有运行它,所以它可能实际上不起作用):

final def findSizes(
  data: Iterator[String],sz: Map[String,Long] = Map(),currentName: String = "",currentSize: Long = 0
): Map[String,Long] = {
  def currentMap = if (currentName != "") sz + (currentName->currentSize) else sz
  if (!data.hasNext) currentMap
  else {
    val s = data.next
    if (s(0) == '>') findSizes(data,currentMap,s,0)
    else findSizes(data,sz,currentName,currentSize + s.length)
  }
}

然后,为了处理,您使用该地图并再次通过:

import java.io._
final def writeFiles(
  source: Iterator[String],targets: Array[PrintWriter],sizes: Map[String,Long],count: Int = -1,which: Int = 0
) {
  if (!source.hasNext) targets.foreach(_.close)
  else {
    val s = source.next
    if (s(0) == '>') {
      val w = if (sizes.get(s).exists(_ < 300)) 2 else (count+1)%2
      targets(w).println(s)
      writeFiles(source,targets,sizes,count+1,w)
    }
    else {
      targets(which).println(s)
      writeFiles(source,count,which)
    }
  }
}

然后,您使用Source.fromFile(f).getLines()两次来创建迭代器,并且您已经完成了设置.编辑:从某种意义上讲,这是关键步骤,因为这是你的“懒惰”系列.但是,它并不重要,因为它不会立即读取所有内存(“懒惰”),而是因为它不存储任何以前的字符串!

更一般地说,Scala无法帮助您仔细考虑内存中需要哪些信息以及您可以根据需要从磁盘中获取内容.懒惰的评估有时会有所帮助,但是没有神奇的公式,因为您可以轻松地表达要求以懒惰的方式将所有数据存储在内存中. Scala无法解释您的命令来访问内存,因为秘密地指示从磁盘中取出内容. (好吧,除非你编写一个库来缓存来自磁盘的结果,否则不会这样做.)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐