
Does Spark respect Kudu's hash partitioning, similar to bucketed joins on Parquet tables?

I'm trying out Kudu with Spark. I want to join two tables with the following schemas:

# This table has around 1 million records
TABLE dimensions (
    id INT32 NOT NULL,
    PRIMARY KEY (id)
)
HASH (id) PARTITIONS 32,
RANGE (id) (
    PARTITION UNBOUNDED
)
OWNER root
REPLICAS 1

# This table has 500 million records
TABLE facts (
    id INT32 NOT NULL,
    date DATE NOT NULL,
    PRIMARY KEY (id, date)
)
HASH (id) PARTITIONS 32,
RANGE (id, date) (
    PARTITION UNBOUNDED
)
OWNER root
REPLICAS 1

I inserted data into these tables with the following script:

// Load the CSV data into a Spark DataFrame
val dimensions_raw = spark.sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/root/dimensions.csv")

dimensions_raw.printSchema
dimensions_raw.createOrReplaceTempView("dimensions_raw")

// Mark the primary key columns as non-nullable (Kudu requires this)
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
def setNotNull(df: DataFrame, columns: Seq[String]): DataFrame = {
  val schema = df.schema
  // Rebuild the StructField for each specified column with nullable = false
  val newSchema = StructType(schema.map {
    case StructField(c, t, _, m) if columns.contains(c) => StructField(c, t, nullable = false, m)
    case y: StructField => y
  })
  // Apply the new schema to the DataFrame
  df.sqlContext.createDataFrame(df.rdd, newSchema)
}
val primaryKeyCols = Seq("id") // `primaryKeyCols` for the `facts` table is `(id, date)`
val dimensions_prep = setNotNull(dimensions_raw, primaryKeyCols)
dimensions_prep.printSchema
dimensions_prep.createOrReplaceTempView("dimensions_prep")


// Create a Kudu table
import collection.JavaConverters._
import org.apache.kudu.client._
import org.apache.kudu.spark.kudu._
val kuduContext = new KuduContext("localhost:7051", spark.sparkContext)

// Delete the table if it already exists.
if (kuduContext.tableExists("dimensions")) {
  kuduContext.deleteTable("dimensions")
}

kuduContext.createTable("dimensions", dimensions_prep.schema, /* primary key */ primaryKeyCols,
  new CreateTableOptions()
    .setNumReplicas(1)
    .addHashPartitions(List("id").asJava, 32))

// Write the DataFrame into the Kudu table
kuduContext.insertRows(dimensions_prep, "dimensions")
// Create a DataFrame that points to the Kudu table we want to query.
val dimensions = spark.read
  .option("kudu.master", "localhost:7051")
  .option("kudu.table", "dimensions")
  .format("kudu").load
dimensions.createOrReplaceTempView("dimensions")

I ran the same script for the facts table as well, adjusted for its composite primary key; a sketch of that variant follows.
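For reference, a minimal sketch of the facts variant, reusing setNotNull and kuduContext from above (the CSV path /root/facts.csv is an assumption, and depending on the Spark version the date column may need an explicit cast to DateType):

// Assumed path; load the facts CSV the same way as dimensions
val facts_raw = spark.sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/root/facts.csv")

// The composite primary key (id, date) must be non-nullable
val factsKeyCols = Seq("id", "date")
val facts_prep = setNotNull(facts_raw, factsKeyCols)

if (kuduContext.tableExists("facts")) {
  kuduContext.deleteTable("facts")
}

kuduContext.createTable("facts", facts_prep.schema, factsKeyCols,
  new CreateTableOptions()
    .setNumReplicas(1)
    .addHashPartitions(List("id").asJava, 32)
    // No explicit bounds, so this yields the single unbounded
    // range partition shown in the schema above
    .setRangePartitionColumns(List("id", "date").asJava))

kuduContext.insertRows(facts_prep, "facts")

val facts = spark.read
  .option("kudu.master", "localhost:7051")
  .option("kudu.table", "facts")
  .format("kudu").load
facts.createOrReplaceTempView("facts")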

I want to join the facts table with dimensions on id. I tried the following in Spark:
val query = facts.join(dimensions, facts.col("id") === dimensions.col("id"))
query.show()

// And I get the following physical plan:
== Physical Plan ==
*(5) SortMergeJoin [id#0], [id#14], Inner
:- *(2) Sort [id#0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#0, 200), true, [id=#43]
:     +- *(1) Scan Kudu facts [id#0,date#1] PushedFilters: [], ReadSchema: struct<id:int,date:date...
+- *(4) Sort [id#14 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#14, 200), true, [id=#49]
      +- *(3) Scan Kudu dimensions [id#14] PushedFilters: [], ReadSchema: struct<id:int>

My question is: how do I tell Spark that the tables are already sorted on id (the join key), so there is no need to sort again? Likewise, the Exchange hashpartitioning step should not be necessary, since both tables are already hash-partitioned on id.
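For comparison, this is what Spark's own bucketing gives you on Parquet tables: when both sides are written with bucketBy on the join key and matching bucket counts, the planner drops the Exchange (and, with sortBy and one file per bucket, often the Sort as well). A minimal sketch, with hypothetical table names:

// Write both sides as bucketed, sorted Parquet tables (32 buckets on the join key)
facts.write.bucketBy(32, "id").sortBy("id").format("parquet").saveAsTable("facts_bucketed")
dimensions.write.bucketBy(32, "id").sortBy("id").format("parquet").saveAsTable("dimensions_bucketed")

// The plan for this join should contain no Exchange on either side
spark.table("facts_bucketed")
  .join(spark.table("dimensions_bucketed"), "id")
  .explain()

What I'm asking is whether the Kudu integration can convey the same partitioning information to Spark.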

On a single machine running one master and one tablet server, the join query takes just under 100 seconds. Am I doing something wrong here, or is this the expected speed for this kind of query in Kudu?
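One mitigation, though it does not answer the partitioning question: dimensions is only about 1 million rows, so broadcasting it should avoid the Exchange and Sort on the 500-million-row facts side entirely. A sketch:

import org.apache.spark.sql.functions.broadcast

// Broadcasting the small side replaces the SortMergeJoin with a
// BroadcastHashJoin, so facts is neither shuffled nor sorted
val broadcastQuery = facts.join(broadcast(dimensions), facts.col("id") === dimensions.col("id"))
broadcastQuery.explain()
broadcastQuery.show()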
