微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

scala – 如何在阅读spark中的cassandra分区时获得良好的性能?

我正在使用cassandra-connector将数据从cassandra分区读取到spark.我尝试了以下解决方案来读取partitions.I尝试通过尽可能多地创建rdds来并行化任务,但解决方案ONE和解决方案TWO都具有相同的性能.

解决方案ONE中,我可以立即看到spark UI中的各个阶段.我试图在解决方案TWO中避免一个for循环.

解决方案TWO中,阶段在相当长的时间之后出现.随着用户ID的数量增加,在阶段出现在解决方案TWO的火花UI中之前,时间显着增加.

Version
 spark - 1.1
 Dse - 4.6
 cassandra-connector -1.1

Setup
 3 - Nodes with spark cassandra
 Each node has 1 core dedicated to this task.
 512MB ram for the executor memory.

我的cassandra表架构,

CREATE TABLE   test (
   user text,userid bigint,period timestamp,ip text,data blob,PRIMARY KEY((user,userid,period),ip)
   );

第一解决方

val users = List("u1","u2","u3")
 val period = List("2000-05-01","2000-05-01")
 val partitions = users.flatMap(x => period.map(y => (x,y))))
 val userids = 1 to 10
 for (userid <- userids){
 val rdds = partitions.map(x => sc.cassandratable("test_keyspace","table1")
                                .select("data")
                                .where("user=?",x._1)
                                .where("period=?",x._2)
                                .where("userid=?,userid)
                          )
 val combinedRdd = sc.union(rdds)
 val result = combinedRdd.map(getDataFromColumns)
                    .coalesce(4)
                    .reduceByKey((x,y) => x+y)
                    .collect()
 result.foreach(prinltn)
 }

解决方案:

val users = List("u1","2000-05-01")
 val userids = 1 to 10
 val partitions = users.flatMap(x => period.flatMap(
                  y => userids.map(z => (x,y,z))))

 val rdds = partitions.map(x => sc.cassandratable("test_keyspace",x._3)
                     )
 val combinedRdd = sc.union(rdds)
 val result = combinedRdd.map(getDataFromColumns)
                    .coalesce(4)
                    .reduceByKey((x,y) => x+y)
                    .collect()
 result.foreach(prinltn)

为什么解决方案TWO不比解决方案ONE快?

我的理解是,由于所有分区都在一段查询,而数据分布在节点上,因此应该更快.
如果我错了,请纠正我.

解决方法

首先,您应该查看joinWithCassandratable,它应该是您正在做的更容易的api(授予您足够的分区以使其值得).这个api接受分区键的RDD和palatalizes并从C *分发它们的检索.

这会让你做类似的事情

sc.parallelize(partitions).joinWithCassandratable("keyspace","table")

如果你想要你也可以做一个repartitionByCassandraReplica,但这很可能对非常小的请求没有好处.您必须对数据进行基准测试才能确定.

如果您只想做原始驱动程序命令,您可以执行类似的操作

val cc = CassandraConnector(sc.getConf)
partitions.mapPartitions{ it => 
  cc.withSessionDo{ session =>
    session.execute( Some query )
  }
}

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#performing-efficient-joins-with-cassandra-tables-since-12

代码示例演练:

现在让我们快速浏览您的代码示例
首先,我们只检索60个C *分区.对于这个用例,与从C *检索分区所花费的时间相比,我们很可能会在设置和删除任务所需的时间内占主导地位.

在这两种解决方案中,由于Spark的懒惰评估,你基本上都在做同样的事情.驱动程序创建一个图表,首先创建60个RDD,每个RDD都有一个惰性指令,用于从C *中检索单个分区. (每个分区1个RDD是坏的,RDD用于存储大量数据,因此最终会产生相当大的开销).虽然60 RDD是用不同的模式制作的,但这并不重要,因为在你打电话收集之前它们的实际评估不会发生.驱动程序继续设置新的RDD和转换.

在我们点击收集之前,绝对没有做任何事情来从C *中检索数据,因为我们在上面发布的两个解决方案中使用基本相同的依赖关系图点击收集,在两种情况下都会发生确切(或非常相似)的事情.所有60个RDD将在依赖图指定时解析.这将并行发生,但同样会非常开销.

为了避免这种情况,请查看我上面使用单个RDD获取所有信息的示例.

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐