Spark SQL|Spark,从入门到精通
Spark SQL 在 Hive 兼容层面仅依赖 HQL parser、Hive Metastore 和 Hive SerDe。也就是说,从 HQL 被解析成抽象语法树(AST)起,就全部由 Spark SQL 接管了。
发家史
熟悉 Spark SQL 的都知道,Spark SQL 是从 Shark 发展而来。Shark 为了实现 Hive 兼容,在 HQL 方面重用了 Hive 中 HQL 的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从 MR 作业替换成了 Spark 作业(辅以内存列式存储等各种和 Hive 关系不大的优化);同时还依赖 Hive Metastore 和 Hive SerDe(用于兼容现有的各种 Hive 存储格式)。
Spark SQL 在 Hive 兼容层面仅依赖 HQL parser、Hive Metastore 和 Hive SerDe。也就是说,从 HQL 被解析成抽象语法树(AST)起,就全部由 Spark SQL 接管了。执行计划生成和优化都由 Catalyst 负责。借助 Scala 的模式匹配等函数式语言特性,利用 Catalyst 开发执行计划优化策略比 Hive 要简洁得多。成都加米谷大数据培训机构,大数据开发,数据分析与挖掘,2019春节前预报名学费特惠,详情见加米谷大数据官网。
Spark SQL
Spark SQL 提供了多种接口:
纯 Sql 文本;
dataset/dataframe api。
当然,相应的,也会有各种客户端:
sql 文本,可以用 thriftserver/spark-sql;
编码,Dataframe/dataset/sql。
Dataframe/Dataset API 简介
Dataframe/Dataset 也是分布式数据集,但与 RDD 不同的是其带有 schema 信息,类似一张表。
可以用下面一张图详细对比 Dataset/dataframe 和 RDD 的区别:
Dataset 是在 spark1.6 引入的,目的是提供像 RDD 一样的强类型、使用强大的 lambda 函数,同时使用 Spark SQL 的优化执行引擎。到 spark2.0 以后,DataFrame 变成类型为 Row 的 Dataset,即为:
type DataFrame = Dataset[Row]
所以,很多移植 spark1.6 及之前的代码到 spark2+的都会报错误,找不到 dataframe 类。
基本操作
val df = spark.read.json(“file:///opt/meitu/bigdata/src/main/data/people.json”)
df.show()
import spark.implicits._
df.printSchema()
df.select("name").show()
df.select($"name", $"age" + 1).show()
df.filter($"age" > 21).show()
df.groupBy("age").count().show()
spark.stop()
分区分桶 排序
分桶排序保存hive表
df.write.bucketBy(42,“name”).sortBy(“age”).saveAsTable(“people_bucketed”)
分区以parquet输出到指定目录
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
分区分桶保存到hive表
df.write .partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("users_partitioned_bucketed")
cube rullup pivot
cube
sales.cube("city", "year”).agg(sum("amount")as "amount”) .show()
rull up
sales.rollup("city", "year”).agg(sum("amount")as "amount”).show()
pivot 只能跟在groupby之后
sales.groupBy("year").pivot("city",Seq("Warsaw","Boston","Toronto")).agg(sum("amount")as "amount”).show()
SQL 编程
Spark SQL 允许用户提交 SQL 文本,支持以下三种手段编写 SQL 文本:
1. spark 代码
2. spark-sql的shell
3. thriftserver
支持 Spark SQL 自身的语法,同时也兼容 HSQL。
1. 编码
要先声明构建 SQLContext 或者 SparkSession,这个是 SparkSQL 的编码入口。早起的版本使用的是 SQLContext 或者 HiveContext,spark2 以后,建议使用的是 SparkSession。
SQLContext
new SQLContext(SparkContext)
HiveContext
new HiveContext(spark.sparkContext)
SparkSession
不使用 hive 元数据:
val spark = SparkSession.builder()
.config(sparkConf) .getOrCreate()
使用 hive 元数据:
val spark = SparkSession.builder()
.config(sparkConf) .enableHiveSupport().getOrCreate()
使用
val df =spark.read.json("examples/src/main/resources/people.json")
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people").show()
2. spark-sql 脚本
spark-sql 启动的时候类似于 spark-submit 可以设置部署模式资源等,可以使用
bin/spark-sql –help 查看配置参数。
需要将 hive-site.xml 放到 ${SPARK_HOME}/conf/ 目录下,然后就可以测试
show tables;
select count(*) from student;
3. thriftserver
thriftserver jdbc/odbc 的实现类似于 hive1.2.1 的 hiveserver2,可以使用 spark 的 beeline 命令来测试 jdbc server。
安装部署
/1 开启 hive 的 metastore
bin/hive --service metastore
/2 将配置文件复制到spark/conf/目录下
/3 thriftserver
sbin/start-thriftserver.sh --masteryarn --deploy-mode client
对于 yarn 只支持 client 模式。
/4 启动 bin/beeline
/5 连接到 thriftserver
!connect jdbc:hive2://localhost:10001
用户自定义函数
1. UDF
定义一个 udf 很简单,例如我们自定义一个求字符串长度的 udf:
val len = udf{(str:String) => str.length}
spark.udf.register("len",len)
val ds =spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")
ds.createOrReplaceTempView("employees")
ds.show()
spark.sql("select len(name) from employees").show()
2. UserDefinedAggregateFunction
定义一个 UDAF
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
object MyAverageUDAF extends UserDefinedAggregateFunction {
//Data types of input arguments of this aggregate function
definputSchema:StructType = StructType(StructField("inputColumn", LongType) :: Nil)
//Data types of values in the aggregation buffer
defbufferSchema:StructType = {
StructType(StructField("sum", LongType):: StructField("count", LongType) :: Nil)
}
//The data type of the returned value
defdataType:DataType = DoubleType
//Whether this function always returns the same output on the identical input
defdeterministic: Boolean = true
//Initializes the given aggregation buffer. The buffer itself is a `Row` that inaddition to
// standard methods like retrieving avalue at an index (e.g., get(), getBoolean()), provides
// the opportunity to update itsvalues. Note that arrays and maps inside the buffer are still
// immutable.
definitialize(buffer:MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
//Updates the given aggregation buffer `buffer` with new input data from `input`
defupdate(buffer:MutableAggregationBuffer, input: Row): Unit ={
if(!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0)+ input.getLong(0)
buffer(1) = buffer.getLong(1)+ 1
}
}
// Mergestwo aggregation buffers and stores the updated buffer values back to `buffer1`
defmerge(buffer1:MutableAggregationBuffer, buffer2: Row): Unit ={
buffer1(0) = buffer1.getLong(0)+ buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1)+ buffer2.getLong(1)
}
//Calculates the final result
defevaluate(buffer:Row): Double =buffer.getLong(0).toDouble /buffer.getLong(1)
}
使用 UDAF
val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")
ds.createOrReplaceTempView("employees")
ds.show()
spark.udf.register("myAverage", MyAverageUDAF)
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
3. Aggregator
定义一个 Aggregator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)
object MyAverageAggregator extends Aggregator[Employee, Average, Double] {
// A zero value for this aggregation. Should satisfy the property that any b + zero = b
def zero: Average = Average(0L, 0L)
// Combine two values to produce a new value. For performance, the function may modify `buffer`
// and return it instead of constructing a new object
def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum += employee.salary
buffer.count += 1
buffer
}
// Merge two intermediate values
def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
// Transform the output of the reduction
def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
// Specifies the Encoder for the intermediate value type
def bufferEncoder: Encoder[Average] = Encoders.product
// Specifies the Encoder for the final output value type
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
使用
spark.udf.register("myAverage2", MyAverageAggregator)
import spark.implicits._
val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json").as[Employee]
ds.show()
val averageSalary = MyAverageAggregator.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
总体执行流程如下:从提供的输入 API(SQL,Dataset, dataframe)开始,依次经过 unresolved 逻辑计划,解析的逻辑计划,优化的逻辑计划,物理计划,然后根据 cost based 优化,选取一条物理计划进行执行。
简单化成四个部分:
/1 analysis
Spark 2.0 以后语法树生成使用的是 antlr4,之前是 scalaparse。
/2 logical optimization
常量合并,谓词下推,列裁剪,boolean 表达式简化,和其它的规则。
/3 physical planning
eg:SortExec 。
/4 Codegen
codegen 技术是用 scala 的字符串插值特性生成源码,然后使用 Janino 编译成 java字节码,Eg: SortExec。
2. 自定义优化器
/1 实现
继承 Rule[LogicalPlan]
object MultiplyOptimizationRule extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
case Multiply(left,right) if