[ 更换 ]
热门城市
北京上海广州深圳成都杭州南京武汉天津西安重庆青岛沈阳长沙大连厦门无锡福州济南宁波昆明苏州郑州长春合肥南昌哈尔滨常州烟台南宁温州石家庄太原珠海南通扬州贵阳东莞徐州大庆佛山威海洛阳淮安呼和浩特镇江潍坊桂林中山临沂咸阳包头嘉兴惠州泉州三亚赣州九江金华泰安榆林许昌新乡舟山慈溪南阳聊城海口东营淄博漳州保定沧州丹东宜兴绍兴唐山湖州揭阳江阴营口衡阳郴州鄂尔多斯泰州义乌汕头宜昌大同鞍山湘潭盐城马鞍山襄樊长治日照常熟安庆吉林乌鲁木齐兰州秦皇岛肇庆西宁介休滨州台州廊坊邢台株洲德阳绵阳双流平顶山龙岩银川芜湖晋江连云港张家港锦州岳阳长沙县济宁邯郸江门齐齐哈尔昆山柳州绍兴县运城齐河衢州太仓张家口湛江眉山常德盘锦枣庄资阳宜宾赤峰余姚清远蚌埠宁德德州宝鸡牡丹江阜阳莆田诸暨黄石吉安延安拉萨海宁通辽黄山长乐安阳增城桐乡上虞辽阳遵义韶关泸州南平滁州温岭南充景德镇抚顺乌海荆门阳江曲靖邵阳宿迁荆州焦作丹阳丽水延吉茂名梅州渭南葫芦岛娄底滕州上饶富阳内江三明淮南孝感溧阳乐山临汾攀枝花阳泉长葛汉中四平六盘水安顺新余晋城自贡三门峡本溪防城港铁岭随州广安广元天水遂宁萍乡西双版纳绥化鹤壁湘西松原阜新酒泉张家界黔西南保山昭通河池来宾玉溪梧州鹰潭钦州云浮佳木斯克拉玛依呼伦贝尔贺州通化朝阳百色毕节贵港丽江安康德宏朔州伊犁文山楚雄嘉峪关凉山雅安西藏四川广东河北山西辽宁黑龙江江苏浙江安徽福建江西山东河南湖北湖南海南贵州云南陕西甘肃青海台湾内蒙古广西宁夏香港澳门
培训资讯网 - 为兴趣爱好者提供专业的职业培训资讯知识

Spark SQL|Spark,从入门到精通

Spark SQL 在 Hive 兼容层面仅依赖 HQL parser、Hive Metastore 和 Hive SerDe。也就是说,从 HQL 被解析成抽象语法树(AST)起,就全部由 Spark SQL 接管了。

发家史

熟悉 Spark SQL 的都知道,Spark SQL 是从 Shark 发展而来。Shark 为了实现 Hive 兼容,在 HQL 方面重用了 Hive 中 HQL 的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从 MR 作业替换成了 Spark 作业(辅以内存列式存储等各种和 Hive 关系不大的优化);同时还依赖 Hive Metastore 和 Hive SerDe(用于兼容现有的各种 Hive 存储格式)。

Spark SQL 在 Hive 兼容层面仅依赖 HQL parser、Hive Metastore 和 Hive SerDe。也就是说,从 HQL 被解析成抽象语法树(AST)起,就全部由 Spark SQL 接管了。执行计划生成和优化都由 Catalyst 负责。借助 Scala 的模式匹配等函数式语言特性,利用 Catalyst 开发执行计划优化策略比 Hive 要简洁得多。成都加米谷大数据培训机构,大数据开发数据分析与挖掘,2019春节前报名学费特惠,详情见加米谷大数据官网。

Spark SQL|Spark,从入门到精通

Spark SQL

Spark SQL 提供了多种接口:

纯 Sql 文本;

dataset/dataframe api。

当然,相应的,也会有各种客户端:

sql 文本,可以用 thriftserver/spark-sql;

编码,Dataframe/dataset/sql。

Dataframe/Dataset API 简介

Dataframe/Dataset 也是分布式数据集,但与 RDD 不同的是其带有 schema 信息,类似一张表。

可以用下面一张图详细对比 Dataset/dataframe 和 RDD 的区别:

Spark SQL|Spark,从入门到精通

Dataset 是在 spark1.6 引入的,目的是提供像 RDD 一样的强类型、使用强大的 lambda 函数,同时使用 Spark SQL 的优化执行引擎。到 spark2.0 以后,DataFrame 变成类型为 Row 的 Dataset,即为:

type DataFrame = Dataset[Row]

Spark SQL|Spark,从入门到精通

所以,很多移植 spark1.6 及之前的代码到 spark2+的都会报错误,找不到 dataframe 类。

基本操作

val df = spark.read.json(“file:///opt/meitu/bigdata/src/main/data/people.json”)

df.show()

import spark.implicits._

df.printSchema()

df.select("name").show()

df.select($"name", $"age" + 1).show()

df.filter($"age" > 21).show()

df.groupBy("age").count().show()

spark.stop()

分区分桶 排序

分桶排序保存hive表

df.write.bucketBy(42,“name”).sortBy(“age”).saveAsTable(“people_bucketed”)

分区以parquet输出到指定目录

df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

分区分桶保存到hive表

df.write .partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("users_partitioned_bucketed")

cube rullup pivot

cube

sales.cube("city", "year”).agg(sum("amount")as "amount”) .show()

rull up

sales.rollup("city", "year”).agg(sum("amount")as "amount”).show()

pivot 只能跟在groupby之后

sales.groupBy("year").pivot("city",Seq("Warsaw","Boston","Toronto")).agg(sum("amount")as "amount”).show()

SQL 编程

Spark SQL 允许用户提交 SQL 文本,支持以下三种手段编写 SQL 文本:

1. spark 代码

2. spark-sql的shell

3. thriftserver

支持 Spark SQL 自身的语法,同时也兼容 HSQL。

1. 编码

要先声明构建 SQLContext 或者 SparkSession,这个是 SparkSQL 的编码入口。早起的版本使用的是 SQLContext 或者 HiveContext,spark2 以后,建议使用的是 SparkSession。

SQLContext

new SQLContext(SparkContext)

HiveContext

new HiveContext(spark.sparkContext)

SparkSession

不使用 hive 元数据:

val spark = SparkSession.builder()

.config(sparkConf) .getOrCreate()

使用 hive 元数据:

val spark = SparkSession.builder()

.config(sparkConf) .enableHiveSupport().getOrCreate()

使用

val df =spark.read.json("examples/src/main/resources/people.json")

df.createOrReplaceTempView("people")

spark.sql("SELECT * FROM people").show()

2. spark-sql 脚本

spark-sql 启动的时候类似于 spark-submit 可以设置部署模式资源等,可以使用

bin/spark-sql –help 查看配置参数。

需要将 hive-site.xml 放到 ${SPARK_HOME}/conf/ 目录下,然后就可以测试

show tables;

select count(*) from student;

3. thriftserver

thriftserver jdbc/odbc 的实现类似于 hive1.2.1 的 hiveserver2,可以使用 spark 的 beeline 命令来测试 jdbc server。

安装部署

/1 开启 hive 的 metastore

bin/hive --service metastore

/2 将配置文件复制到spark/conf/目录下

/3 thriftserver

sbin/start-thriftserver.sh --masteryarn --deploy-mode client

对于 yarn 只支持 client 模式。

/4 启动 bin/beeline

/5 连接到 thriftserver

!connect jdbc:hive2://localhost:10001

Spark SQL|Spark,从入门到精通

用户自定义函数

1. UDF

定义一个 udf 很简单,例如我们自定义一个求字符串长度的 udf:

val len = udf{(str:String) => str.length}

spark.udf.register("len",len)

val ds =spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")

ds.createOrReplaceTempView("employees")

ds.show()

spark.sql("select len(name) from employees").show()

2. UserDefinedAggregateFunction

定义一个 UDAF

import org.apache.spark.sql.{Row, SparkSession}

import org.apache.spark.sql.expressions.MutableAggregationBuffer

import org.apache.spark.sql.expressions.UserDefinedAggregateFunction

import org.apache.spark.sql.types._

object MyAverageUDAF extends UserDefinedAggregateFunction {

//Data types of input arguments of this aggregate function

definputSchema:StructType = StructType(StructField("inputColumn", LongType) :: Nil)

//Data types of values in the aggregation buffer

defbufferSchema:StructType = {

StructType(StructField("sum", LongType):: StructField("count", LongType) :: Nil)

}

//The data type of the returned value

defdataType:DataType = DoubleType

//Whether this function always returns the same output on the identical input

defdeterministic: Boolean = true

//Initializes the given aggregation buffer. The buffer itself is a `Row` that inaddition to

// standard methods like retrieving avalue at an index (e.g., get(), getBoolean()), provides

// the opportunity to update itsvalues. Note that arrays and maps inside the buffer are still

// immutable.

definitialize(buffer:MutableAggregationBuffer): Unit = {

buffer(0) = 0L

buffer(1) = 0L

}

//Updates the given aggregation buffer `buffer` with new input data from `input`

defupdate(buffer:MutableAggregationBuffer, input: Row): Unit ={

if(!input.isNullAt(0)) {

buffer(0) = buffer.getLong(0)+ input.getLong(0)

buffer(1) = buffer.getLong(1)+ 1

}

}

// Mergestwo aggregation buffers and stores the updated buffer values back to `buffer1`

defmerge(buffer1:MutableAggregationBuffer, buffer2: Row): Unit ={

buffer1(0) = buffer1.getLong(0)+ buffer2.getLong(0)

buffer1(1) = buffer1.getLong(1)+ buffer2.getLong(1)

}

//Calculates the final result

defevaluate(buffer:Row): Double =buffer.getLong(0).toDouble /buffer.getLong(1)

}

使用 UDAF

val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")

ds.createOrReplaceTempView("employees")

ds.show()

spark.udf.register("myAverage", MyAverageUDAF)

val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")

result.show()

3. Aggregator

定义一个 Aggregator

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)

case class Average(var sum: Long, var count: Long)

object MyAverageAggregator extends Aggregator[Employee, Average, Double] {

// A zero value for this aggregation. Should satisfy the property that any b + zero = b

def zero: Average = Average(0L, 0L)

// Combine two values to produce a new value. For performance, the function may modify `buffer`

// and return it instead of constructing a new object

def reduce(buffer: Average, employee: Employee): Average = {

buffer.sum += employee.salary

buffer.count += 1

buffer

}

// Merge two intermediate values

def merge(b1: Average, b2: Average): Average = {

b1.sum += b2.sum

b1.count += b2.count

b1

}

// Transform the output of the reduction

def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count

// Specifies the Encoder for the intermediate value type

def bufferEncoder: Encoder[Average] = Encoders.product

// Specifies the Encoder for the final output value type

def outputEncoder: Encoder[Double] = Encoders.scalaDouble

}

使用

spark.udf.register("myAverage2", MyAverageAggregator)

import spark.implicits._

val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json").as[Employee]

ds.show()

val averageSalary = MyAverageAggregator.toColumn.name("average_salary")

val result = ds.select(averageSalary)

result.show()

Spark SQL|Spark,从入门到精通

总体执行流程如下:从提供的输入 API(SQL,Dataset, dataframe)开始,依次经过 unresolved 逻辑计划,解析的逻辑计划,优化的逻辑计划,物理计划,然后根据 cost based 优化,选取一条物理计划进行执行。

简单化成四个部分:

/1 analysis

Spark 2.0 以后语法树生成使用的是 antlr4,之前是 scalaparse。

/2 logical optimization

常量合并,谓词下推,列裁剪,boolean 表达式简化,和其它的规则。

/3 physical planning

eg:SortExec 。

/4 Codegen

codegen 技术是用 scala 的字符串插值特性生成源码,然后使用 Janino 编译成 java字节码,Eg: SortExec。

2. 自定义优化器

/1 实现

继承 Rule[LogicalPlan]

object MultiplyOptimizationRule extends Rule[LogicalPlan] {

def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {

case Multiply(left,right) if

相关内容

怎么学?如何干?西藏7市(地)委书记在西藏日报发表文章

启航新征程 开创新局面为全面建设社会主义现代化新拉萨努力奋斗自治区党委常委、拉萨市委书记 普布顿珠党的二十大大笔擘画坚持以中国式现代化全面推进中华民族伟大复兴的宏伟蓝图,区党委十届三次全会全面铺开建设美丽幸福西藏、共圆伟大复兴梦想的壮阔实践···

2023年全区智慧旅游专业人才培训班圆满结业

6月2日,自治区旅游发展厅主办的全区智慧旅游专业人才培训班在拉萨圆满结业。培训共历时三天,累计完成全区旅游行政管理部门工作人员及涉旅企业专业技术人员培训100人。本次培训是区旅发厅深入推进学习贯彻习近平新时代中国特色社会主义思想主题教育重要···

徐汇萨迦共同举办大美西藏首届口腔学习班

随着现代医学的发展,口腔健康越来越受到人们的关注,为提高西藏地区口腔技术水平和服务质量,近日,徐汇区牙病防治所和萨迦县中心医院远程连线共同举办首届口腔学习班开班仪式。徐汇区卫生健康委副主任胡强,上海援藏干部、萨迦县委常务副书记、常务副县长沈···

自觉抵制“一对一”“一对多”等学科类培训!哈尔滨中小学生暑期预警来了

17日,记者从哈尔滨市各区教育局获悉,2023年暑假将至,南岗区、道里区、香坊区教育局向家长发出预警,自觉抵制违规培训,各区义务教育阶段学科类培训机构已经全部注销,暑假期间以任何形式开展的学科类培训均属于违规培训。家长要自觉抵制任何机构或个···

哈尔滨市道里区企投局举办“招商大讲堂”专题培训

黑龙江网讯(记者 王惠婷)10月24日,哈尔滨市道里区企投局组织开展“招商大讲堂”专题培训班,全区各招商专班负责同志及业务骨干参加培训。本次培训从实际需求出发,紧紧围绕当前招商工作中的热点、难点,对“什么是招商引资”“道里区的产业结构和主导···

团黑龙江省委举办全省青年文明号青年岗位能手学习宣传贯彻党的二十大精神培训交流会

中国青年报客户端讯(李海涛)为深入学习贯彻党的二十大精神,充分发挥青年文明号、青年岗位能手示范引领作用,在全省职业青年中掀起学习党的二十大精神热潮,11月22日,团黑龙江省委举办全省青年文明号青年岗位能手学习宣传贯彻党的二十大精神培训交流会···

辽宁葫芦岛举办外贸政策培训会推动外贸保稳增量

辽宁省葫芦岛市外贸政策培训会4月3日举办。 辽宁省贸促会供图中新网葫芦岛4月3日电 (李晛)辽宁省葫芦岛市外贸政策培训会4月3日举办。本次活动由辽宁省贸促会支持、葫芦岛市商务局主办,葫芦岛市贸促会、葫芦岛海关、中国出口信用保险辽宁分公司和辽···

山西运城:严查无证校外培训机构 查封9家警告2家

新华社太原8月5日电(记者王飞航)记者从山西省运城市政府了解到,运城市教育局近日联合市公安局等多家单位,对中心城区无证校外培训机构进行了一次突击检查,共检查了13家校外培训机构,查封9家,警告2家,发放整改通知书4份。今年7月,运城市教育局···

山西开展培训筑牢森林“防火墙”

山西新闻网3月30日讯(记者 卢奕如)今日,记者从山西省应急管理厅获悉,全省举办森林草原防灭火业务培训,邀请专家以视频会议形式,围绕森林扑火指挥实操、森林草原火灾防控经验做法、火灾现场各级各类指挥员具体操作中遇到的问题等内容进行授课。培训内···

校外培训机构治理工作取得进展 山西停办近1300所

资料图:小学生排队等待进入校园。中新社记者 刘文华 摄中新网5月11日电 据教育部网站消息,按照校外培训机构专项治理工作整体安排,教育部、民政部、国家市场监管总局启动了校外培训机构治理专项督查工作。5月9日至10日,督查组率先在北京市开展华···

山西综改区举办省技术创新中心申报培训

  8月18日,山西综改区科技金融部举办2023年度省技术创新中心申报培训会,来自区内企业、科研院所及有关单位代表160余人参加了培训。  山西省技术创新中心是以产业前沿引领技术和关键共性技术研发为核心的产业技术创新平台,承担着为区域和产业···

山西汾阳医院开展健康教育与控烟知识培训

来源:【吕梁日报-吕梁新闻网】本报讯 (记者 刘少伟) 5月18日,在“世界无烟日”到来之际,山西汾阳医院组织开展健康教育与控烟知识培训。近年来,山西汾阳医院全面落实健康中国战略,根据国家卫健委《关于2011年起全国医疗卫生系统全面禁烟的决···

山西省文物局年度田野考古技术培训班开班

10月10日,山西省文物局在运城闻喜上郭城址、邱家庄墓群举办2023年度田野考古技术培训班开班仪式。该次培训为期三个月,通过理论和实践两部分教学,旨在推进山西考古工作高质量发展,提升考古业务人员专业技术水平。本次培训由山西省考古研究院和山西···

最低每课时9元!全省学科类校外培训课时长和收费标准出台

近日,山西省发改委、省教育厅下发《关于中小学学科类校外培训收费标准及有关事项的通知》,明确全省中小学学科类校外培训收费标准,从12月17日起执行。《通知》对全省线上线下学科类校外培训基准收费标准和浮动幅度制定了科学标准。其中,义务教育阶段线···

山西天镇 阳光职业培训学校培养乡村“新农人”乡村振兴添动能

(记者 贺文生) 山西天镇县阳光职业培训学校紧紧围绕乡村振兴战略,按照“政府引导、农民自愿、立足产业、突出重点”的原则,创新高素质农民技能培训方式方法,采取以“授人以渔”的方式,让人才振兴成为助推农业农村现代化的内生动力,以高素质农民引领现···

山西:建立全过程 全链条 无缝隙安全培训制度

黄河新闻网讯(记者杨江涛)日前,山西省应急管理厅下发了《山西省安全培训管理暂行办法》(以下简称《办法》)。我省将进一步抓好安全生产这个基本盘、基本面,推动全省安全培训工作制度化、规范化、科学化,促进安全培训工作高质量发展。山西省应急管理厅厅···

山西:艺考培训机构纳入全国监管平台管理

央广网太原10月6日消息(记者郎麒) 日前,山西省教育厅、省发改委、省公安厅等部门联合制定《加强面向高中阶段学生艺考培训规范管理工作方案》,针对艺考培训的突出特点和实际情况,全面规范艺考培训行为,将艺考培训机构统一纳入全国校外教育培训监管与···

太平财险阳泉中支开展消防安全教育和有限空间作业培训

为强化员工安全意识,进一步提升员工消防和有限空间突发事件应急处理能力,9月14日,太平财险阳泉中支邀请北京市卫民安消防教育咨询中心山西分中心讲师向全体员工开展了一次消防安全教育和有限空间作业课程培训。按照防消结合、预防为主的原则,本次讲座通···

山西省数字化转型贯标试点工作宣贯培训会在太原举行

  10月20日消息,山西省数字化转型贯标试点工作宣贯培训会在太原举行。省工信厅介绍,作为国家数字化转型贯标试点省份,试点启动后将引导企业加快数字化转型,助力制造业高端化、智能化、绿色化发展。  今年,工信部启动数字化转型贯标试点工作,我省···

山西马兰花创业培训讲师大赛收官 太原市获多个奖项

山西新闻网8月31日讯(记者 冯耿姝)8月29日,山西省第四届马兰花创业培训讲师大赛圆满收官,太原市代表队在比赛中分获多个二、三等奖和优秀奖。本届大赛以“启迪创新思维·激发创业梦想”为主题,全省共有56名教师晋级复赛,其中,太原市有7名选手···