工作当中几乎全用Sparksql ,RDD用的很少(面试多)
Sparksql误区
Spark sql is Apache Spark’s module for working with structured data.
不要把Sparksql认为就是处理sql的 或者认为就是写sql
SparkSQL
误区:
1)Spark sql是处理结构化数据
并不是仅仅能够处理sql
sql仅仅是Spark sql这个模块的一小部分应用
API/ExtDS
2)Uniform Data Access 外部数据源(*****)
Spark sql是能够处理多种不同的数据源的数据
text、json、parquet、orc、hive、jdbc 数据的格式
HDFS/S3(a/n)/OSS/COS 数据的存储系统
不同的数据格式压缩的不压缩的 sparksql都是兼容的
你访问不同的数据源Sparksql都是用统一的访问方式 这就是外部数据源
Sparksql能面试的东西 就是两个 :
DataFrame 、 外部数据源、catelist
2.能集成Hive
你的数仓以前是基于Hive来做的 都是Hive的脚本
现在 如果想使用Sparksql访问Hive的数据 Sparksql能连接到metastore才可以
(把Hive-site.xml 拷贝到Sparkconf目录下就可以了)
因为metastore 是 on Hadoop的核心所在
所以你要把Hive迁移到Spark上来 成本是很低的
3.Standard Connectivity
Hive能通过HiveServer2提供一个服务 大家去查,那么 spark里面有个thriftServer
他们底层都是用thrift协议的
误区3:
MR==>Hive==> Hive底层当时是MR 慢 所以出来Spark
Spark==> AMPLab Shark(为了将Hive sql跑在Spark上) 1.x 配套一个打了补丁的Hive
Spark1.0 Shark不维护
==> Spark sql 是在Spark里面的
==> Hive on Spark 是在Hive里面的 是Hive的引擎是Spark
误区3)
Hive on Spark不是Spark sql
Hive刚开始时底层执行引擎只有一个:MR
后期:Tez Spark
set hive.execution.engine=spark; 就可以 Hive on Spark
Sparksql on Hive X
Time taken: 6.86 seconds, Fetched: 2 row(s)
hive (default)> set hive.execution.engine;
hive.execution.engine=mr
hive (default)> set hive.execution.engine=spark;
hive (default)> set hive.execution.engine;
hive.execution.engine=spark
hive (default)> show databases;
OK
database_name
default
homework
Time taken: 0.008 seconds, Fetched: 2 row(s)
hive (default)>
这个东西了解即可 Hive On Spark 真正生产上用的很少的
这个东西不是很成熟的
Datasets and DataFrames
出来的时间:
Spark sql
1.0
SchemaRDD ==> Table RDD(存数据) + schema = Table
==> DataFrame 1.2/3 由SchemaRDD 变为DataFrame 原因是 更加 OO
==> Dataset 1.6 由DataFrame 变为Dataset 因为 compile-time type safety
DataFrame
A Dataset is a distributed collection of data.
A DataFrame is a Dataset organized into named columns.
DataFrame = Dataset[Row]
In Scala and Java, a DataFrame is represented by a Dataset of Rows.
DataFrame :
1.named columns 就是一个表 包含 列的名字 + 列的类型
Row : 可以理解为 一行数据 没有scheme的
SparkSession是Spark编程的入口点
Api:
SparkSession:
/**
* Executes a sql query using Spark, returning the result as a `DataFrame`.
* The dialect that is used for sql parsing can be configured with 'spark.sql.dialect'.
*
* @since 2.0.0
*/
def sql(sqlText: String): DataFrame = {
Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
}
注意:
1. returning the result as a `DataFrame`
Dataset:
/**
* displays the top 20 rows of Dataset in a tabular form.
*
* @param truncate Whether truncate long strings. If true, strings more than 20 characters will
* be truncated and all cells will be aligned right
*
* @group action
* @since 1.6.0
*/
def show(truncate: Boolean): Unit = show(20, truncate)
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|istemporary|
+--------+---------+-----------+
| default| student| false|
+--------+---------+-----------+
scala>
注意:
启动spark-shell的时候 指定MysqL驱动
个人建议使用 --jars 指定MysqL驱动
不建议把MysqL驱动 直接丢在Spark jar路径里
查看Hive里元数据:
MysqL> select * from DBS;
+-------+-----------------------+-------------------------------------------------------+----------+--------------+------------+
| DB_ID | DESC | DB_LOCATION_URI | NAME | OWNER_NAME | OWNER_TYPE |
+-------+-----------------------+-------------------------------------------------------+----------+--------------+------------+
| 1 | Default Hive database | hdfs://hadoop101:8020/user/hive/warehouse | default | public | ROLE |
| 6 | NULL | hdfs://hadoop101:8020/user/hive/warehouse/homework.db | homework | double_happy | USER |
+-------+-----------------------+-------------------------------------------------------+----------+--------------+------------+
2 rows in set (0.00 sec)
MysqL> select * from TBLS;
+--------+-------------+-------+------------------+--------------+-----------+-------+-----------------------------------+----------------+--------------------+--------------------+
| TBL_ID | CREATE_TIME | DB_ID | LAST_ACCESS_TIME | OWNER | RETENTION | SD_ID | TBL_NAME | TBL_TYPE | VIEW_EXPANDED_TEXT | VIEW_ORIGINAL_TEXT |
+--------+-------------+-------+------------------+--------------+-----------+-------+-----------------------------------+----------------+--------------------+--------------------+
| 1 | 1568615059 | 1 | 0 | double_happy | 0 | 1 | student | MANAGED_TABLE | NULL | NULL |
| 8 | 1568616039 | 6 | 0 | double_happy | 0 | 8 | ods_domain_traffic_info | EXTERNAL_TABLE | NULL | NULL |
| 9 | 1568620410 | 6 | 0 | double_happy | 0 | 9 | ods_uid_pid_info | EXTERNAL_TABLE | NULL | NULL |
| 17 | 1568860945 | 6 | 0 | double_happy | 0 | 17 | jf_tmp | MANAGED_TABLE | NULL | NULL |
| 21 | 1569056727 | 6 | 0 | double_happy | 0 | 21 | access_wide | EXTERNAL_TABLE | NULL | NULL |
| 26 | 1569209493 | 6 | 0 | double_happy | 0 | 31 | ods_uid_pid_info_compression_test | EXTERNAL_TABLE | NULL | NULL |
| 27 | 1569209946 | 6 | 0 | double_happy | 0 | 32 | ods_uid_pid_compression_info | MANAGED_TABLE | NULL | NULL |
| 31 | 1569224142 | 6 | 0 | double_happy | 0 | 36 | dwd_platform_stat_info | MANAGED_TABLE | NULL | NULL |
| 53 | 1570957119 | 6 | 0 | double_happy | 0 | 63 | ods_log_info | EXTERNAL_TABLE | NULL | NULL |
+--------+-------------+-------+------------------+--------------+-----------+-------+-----------------------------------+----------------+--------------------+--------------------+
9 rows in set (0.00 sec)
MysqL>
spark-shell查询Hive里的表:
scala> spark.sql("select * from homework.dwd_platform_stat_info").show
+--------+---+--------+--------+
|platform|cnt| d| day|
+--------+---+--------+--------+
| Andriod|658|20190921|20190921|
| Symbain|683|20190921|20190921|
| linux|639|20190921|20190921|
| mac|652|20190921|20190921|
| windows|640|20190921|20190921|
+--------+---+--------+--------+
scala>
使用sparksql 在spark-shell交互 还得写 spark.sql
在spark里 有个 spark-sql 用法和 spark-shell 是一样的
编程
1.SparkSession构建
object SparkSessionApp {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local")
.appName("SparkSessionApp")
.getorCreate()
spark.stop()
}
}
当然 你spark一些参数如何传进去呢?
提供config传进去
eg : 你要设置多少个分区呀 等
Data Sources
1.读文本数据
object SparkSessionApp {
def text(spark: SparkSession) = {
import spark.implicits._
val df: DataFrame = spark.read.format("text").load("file:///C:/IdeaProjects/spark/data/data.txt")
df.show()
}
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local")
.appName("SparkSessionApp")
.getorCreate()
text(spark)
spark.stop()
}
}
结果:
+---------------+
| value|
+---------------+
|double_happy,25|
| Kairis,25|
| Kite,32|
+---------------+
1. 但是有一个问题 读取进来的数据 把所有内容
都放到 value这个列 里面去了
该怎么办?
2. 上面那种写法读进来的是DF
def text(spark: SparkSession) = {
val ds: Dataset[String] = spark.read.textFile("file:///C:/IdeaProjects/spark/data/data.txt")
ds.show()
}
读进来的是DS
结果是一样的:
+---------------+
| value|
+---------------+
|double_happy,25|
| Kairis,25|
| Kite,32|
+---------------+
/**
* Loads text files and returns a [[Dataset]] of String. See the documentation on the
* other overloaded `textFile()` method for more details.
* @since 2.0.0
*/
def textFile(path: String): Dataset[String] = {
// This method ensures that calls that explicit need single argument works, see SPARK-16009
textFile(Seq(path): _*)
}
可以传入多个路径的 textFile(Seq(path): _*)
看s9 待续。。。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。