5月11日,第八届中国数据库技术大会(DTCC 2017)在北京国际会议中心盛大开幕。本届DTCC大会以“数据驱动·价值发现”为主题,作为国内最受关注,规模最大的数据库技术大会,已吸引近5000名IT人士到会交流。到13日,会议已经持续了3天,但现场热度仍然不减,参会嘉宾依旧兴致盎然。笔者曾听一友人说,常沉下心来搞技术的人,也许看似木讷,但心中总会燃着一股不灭的火焰。今日一见,果然如此。
13日下午的“Hadoop生态系统”分会场上,来自奇虎360系统部大数据团队的李振炜先生向大家分享了如何去完成基于SparkSQL的海量数据仓库设计与实践。
▲奇虎360系统部大数据团队李振炜
奇虎360系统部大数据团队是早期把Spark应用到生产环境的团队之一,目前维护集群的总结点数超过8k,单集群最大节点3k+。每天支撑稳定运行的Spark任务,包括SparkSQL, Spark MLLib, Spark Streaming 超过10w。该团队扩展了SparkSQL大量的语法,并且优化执行过程,提高查询效率。并且对现有Spark MLLib算法性能做了优化,并根据业务需求实现大量新算法。同样完成了Spark对TensorFlow,Mxnet,Caffe等深度学习框架的集成。
SparkSQL近年来以更强的性能优势正逐渐取代Hive在数据仓库领域强势地位,360公司内部完成了hive作业向Spark迁移,修复了大量的Bug,优化了内核,显著提升执行效率,每天可支撑5万SQL作业的稳定运行。
Spark多数据源整合
在演讲中,李振炜先生向大家展示了多数据源整合架构,并介绍了其优势:1、在一个SQL中同时对不同的数据源中的数据分析;2、同一个表不同的SQL自动选择合适的数据源;3、分析结构写入合适的数据源。
SparkSQL海量数据即席查询的实现
张振炜指出,目前想要实现SparkSQL海量数据即席查询,面临着一些痛点。如数据量越来越大,但是即席分析的需求越来越强烈;数据既有全表扫描的需求(机器学习),又有随机读的需求(ad-hoc);为了充分利用现有开源平台提高查询性能,数据常常需要存储多份,增加存储成本,同时也增加了平台的运维成本;同一份数据需要在不同的数据源保持同步;随着数据量的增大,一些平台的扩展遇到了瓶颈,影响了查询性能。
此即席查询的实现,目标是为数据建立一套外部索引。但前提是不能引入新的存储格式,而且必须兼容现有的业务逻辑。清洗后的数据以Parquet格式写入HDFS,用Spark对每个Parquet 文件批量建Lucene索引。SQL查询转化为Spark任务,每个Executor由所处理Parquet 文件,得到对应索引文件所在DataNode的地址,向其IndexServer发起查询请求并得到命中的索引。根据索引读取对应的Page,返回查询结果。
如何进行入库优化。Parquet中每一个RowGroup中的数据按列存储,每列以Page为压缩和存储单位。为了保证同一行中的数据在每列中Page的序号一致,我们把按字节大小划分Page改为了按行数划分,同时把每列Page ID以及对应Page在整个Parquet文件的offset记录在Parquet的元信息中。
每个RowGroup中都记录每列值最大值和最小值,可以作为一个粗粒度的索引。在此基础上,我们对数值列做了进一步改进:计算出均值和方差,对应列数值做如下处理:
把均值和方差以及由y0y1…yN-1yN组成的Byte数组写入到RowGroup元信息中。为整个Parquet文件也建立改进后的索引。
建立索引。对于一个表是否建索引以及哪些列建索引,可以在建表时指定,并写到元数据库中。每行作为Lucene的一个Doc, 同时需要索引的列作为Filed。每个Parquet文件建立一个Lucene文件。为减少索引文件的大小,对所有Field只索引而不存储,同时删减掉一些不需要的Lucene功能,大大减少Lucene的体积。
SparkSQL转化。把Parquet文件整体作为输入,略过计算SplitSize 的过程。Spark Driver端缓存必要的元信息,避免频繁请求元数据库和NameNode。合并Spark Task,缩短调度的时间。可以支持复杂的Filter,并且支持字符串的匹配查找。
SparkSQL执行查询。索引查询策略:固定数据,移动查询。在所有的DataNode部署IndexServer,Executor根据索引文件副本所在的节点,向IndexServer发起查询,使得计算和存储完全本地化,避免网络传输,加快查询。索引文件是多副本,Executor可以同时向多个IndexServer 发起查询,最先返回的作为结果,避免查询长尾的出现,保证查询的稳定性。索引文件的管理全部依赖于Hdfs,具有极强扩展能力,同时减少运维压力。IndexServer 是无中心,无状态独立服务,依托于多副本,自动实现容错。
▲每个Task根据索引查询结果自动选择是执行Scan还是Seek.
▲SparkSQL读取数据
性能分析。利用文件级别,RowGroup级别改进后的索引,以极小代价判断是否可以跳过当前的文件或RowGroup。Executor查询索引结果为未命中的代价50-ms;查询命中并获取到数据的平均代价为200ms。对于万亿规模的数据,生成20w个Parquet文件,每个文件约500万条记录,包括一个RowGroup,每个Page包含2000行记录,一共有2500个Page;Spark启动125个Executor,每个Executor 是4个cores。如果命中5%的数据,Skip相应文件的代价为(1-5%)*20w*50ms/125/4=19s,Seek相应文件的代价为5%*20w*200ms/125/4=4s。整个查询可以在30s内完成。
根据张振炜先生的介绍,我们也可以看出此方案拥有很多明显的优势。将数据和索引分离,可完全兼容现有的业务,同时数据和索引存储于Hdfs,可以支持海量的数据。IndexServer无中心,无状态,容错及负载均衡天然与Hdfs绑定,运维成本低。查询找数据,避免了数据的网络传输。顺序读和随机读可以根据具体的执行逻辑,自动切换,并且代价很低。对于不再使用的索引,可以单独删除,节约空间。
关于奇虎360:360公司作为中国领先的互联网络安全企业,汇聚了国内规模领先的高水平安全技术团队,积累了接近万件原创技术和核心技术的专利,并在此基础上开发出拥有数亿用户的360安全卫士、360手机卫士等安全产品,同时为上百万家国家机关和企事业单位提供包括安全咨询、安全运维、安全培训等全方位安全服务。
除此之外,360公司利用大数据、云计算及人工智能技术,通过技术创新、产品创新,打造了包括360手机、360儿童智能手表、360智能摄像机、360行车记录仪及360安全路由器等一系列智能硬件产品,致力于通过智能手机、智能穿戴、智能家居及车联网等智能产品为用户解决信息安全、出行安全、家居安全等网络安全问题。
同时,依托于大数据、云计算平台,360还为广大用户提供包括搜索、直播、游戏、影视、金融、新闻等在内的其他互联网服务。