Kylin--麒麟分析 --Spark+Parquet

Kylin—麒麟开源数据分析,4.0现在是Spark+Parquet

架构:

Parquet文件:(列存储)

       文件构成:

              Header:   文件头信息

              Datablock:   文件信息   其中一个Data包含多个Row Group

Block (hdfs Block): 表示hdfs中的块,描述该文件格式的含义不变。该文件格式被设计成在hdfs上很好地工作。
File: hdfs文件,必须包含文件的元数据。它不需要实际包含数据。

行组(Row Group): 数据的逻辑水平分区。行组没有保证的物理结构。行组由数据集中每一列的列块组成。一个行组中包含的具体行数是不确定的

列块(Colunm Chunk): 特定列的数据块。它们位于特定的行组中,并保证在文件中是连续的。

页(Pages): 列块被划分为页。从概念上讲,页面是一个不可分割的单元,压缩和编码都是针对Page进行的。列块中可以有多个分页类型

Footer:   footer中的元数据包括version、schema、任何额外的键-值对,以及列元数据。列元数据包括类型、路径、编码、值的数量、压缩大小等。Footer中除了文件元数据之外,它还有一个4字节字段长度的footer length,以及一个4字节的魔术字(PAR1)

      

                                                                                                                   

使用适当的Spark资源和配置来构建Cube

       Kylin构建参数全部以kylin.engine.spark-conf开头,以下表格中的参数省略开头。

参数

说明

spark.executor.instances

Spark应用程序的Executor数量。

spark.executor.cores

每个Executor使用的核心数,Executor数量乘以Executor使用的核心数就是Spark程序运行的最大并行度。

spark.executor.memory

每个Executor使用的内存。

spark.executor.memoryOverhead

每个Executor使用的堆外内存。

spark.sql.files.maxPartitionBytes

读取文件时要打包到单个分区中的最大字节数,默认值为128M。如果源表(Hive source)中有许多小文件,spark会自动将许多小文件打包到单个分区中,以避免执行太多的小任务。

spark.sql.shuffle.partitions

配置为联接Join或聚合Shuffle时要使用的分区数,默认值为200。较大的值需要更多的CPU资源,而较小的值需要更多的内存资源。

1)Kylin根据Cube情况自动设置Spark参数

Kylin 4将使用以下分配规则来自动设置Spark资源和配置,所有Spark资源和配置都是根据源文件中最大文件的大小以及Cube是否具有准确的去重计数度量来设置的,这就是为什么我们需要在第一步中检测要构建多少个源文件的原因。

(1)Executor内存规则

如果${最大文件大小}>=100G and ${存在准确去重度量值}, 设置 'spark.executor.memory'为20G;

如果${最大文件大小}>=100G or (如果${最大文件大小}>=10G and ${存在准确去重度量值}), 设置 'spark.executor.memory' 为16G;

如果${最大文件大小}>=10G or (如果${最大文件大小}>=1G and ${存在准确去重度量值}), 设置 'spark.executor.memory' 为10G;

如果${最大文件大小}>=1G or ${存在准确去重度量值}, 设置 'spark.executor.memory' 为 4G;

否则设置'spark.executor.memory' 为1G。

(2)Executor核心数规则

如果${最大文件大小}>=1G or ${存在准确去重度量值},设置 'spark.executor.cores' 为5;

否则 设置 'spark.executor.cores' to 1。

(3)Executor堆外内存规则

如果${最大文件大小}>=100G and ${存在准确去重度量值},设置'spark.executor.memoryOverhead' 为6G, 所以这种情况下,每个Executor的内存为 20G + 6G = 26G;

如果${最大文件大小}>=100G or (如果${最大文件大小}>=10G and ${存在准确去重度量值}), 设置'spark.executor.memoryOverhead'为4G;

如果${最大文件大小}>=10G or (如果${最大文件大小}>=1G and ${存在准确去重度量值}), 设置'spark.executor.memoryOverhead' 为2G;

如果${最大文件大小}>=1G or ${存在准确去重度量值},设置 'spark.executor.memoryOverhead' 为1G;

否则设置 'spark.executor.memoryOverhead' 为512M。

(4)Executor实例数量规则

①读取参数'kylin.engine.base-executor-instance'的值作为基本Executor数量,默认值为5

②根据Cuboid个数来计算所需的Executor个数,配置文件中读取参数'kylin.engine.executor-instance-strategy'的值,默认为'100,2,500,3,1000,4',即Cuboid个数为0-100时,因数为1;100-500时,因数为2;500-1000时,因数为3;1000以上时,因数为4。然后用这个因数乘以第一步的基本Executor数量就是Executor的预估总数量。

③从Yarn资源池中的得到可用的总核心数和总内存数,然后用总核心数和总内存数除以kylin任务所需的核心数和内存数,两者求个最小值,就是Executor的可用总数量。

④最后在Executor的预估总数量和Executor的可用总数量之间取最小值作为Executor的实际最终总数量。

(5)Shuffle分区数量规则

       设置'spark.sql.shuffle.partitions'为'max(2, ${最大文件大小MB} / 32)'。

在应用上述所有分配规则后,可以在“kylin.log”文件中找到一些日志消息:

2)根据实际情况手动设置Spark参数

根据Kylin自动调整的配置值,如果仍存在一些Cube构建性能问题,可以适当更改这些配置的值以进行尝试,例如:

(1)如果您从spark ui观察到某些任务中存在严重的GC现象,或者发现大量executor丢失或获取失败错误,您可以更改这两个配置的值,以增加每个executor的内存:

  • spark.executor.memory=
  • spark.executor.memoryOverhead=

    一般调整策略是将参数的值调整为2倍。如果问题解决了,您可以适当地调整以避免浪费资源。在增加每个Executor的内存后,如果仍有严重的内存问题,可以考虑调整'spark.executor.cores'为1,此调整可以使单个任务是每个Executor的独家任务,并且执行效率相对较低,但它可以通过这种方式来避免构建失败。

    (2)如果您从spark ui观察到,有大量任务需要为多轮计划(每轮都用掉了所有内核),您可以更改这两个配置的值,以增加spark应用程序的内核数:

    • spark.executor.cores=
    • spark.executor.instances=

      (3)如果有一些Executor丢失或获取数据错误,并且仅仅因为Shuffle期间的减速器数量太小,或者数据倾斜,可以尝试增加'spark.sql.shuffle.partitions'的值。

      • spark.sql.shuffle.partitions=

        全局字典构建性能调优

        1)全局字典介绍

        在OLAP数据分析领域,根据去重结果的要求分为近似去重和精确去重,而精确去重是一种非常常见的要求。

        对于精确去重,最常用的处理方法是bit map方法。对于整型数据,我们可以将这些整数直接保存到bit map中。但除了整型之外,还有其他类型,如String,为了实现精确的重复数据删除,我们首先需要对这些数据建立一个字典进行统一映射,然后使用bit map方法进行统计。

        Kylin使用预计算技术来加速大数据分析。在Cube的增量构建过程中,为了避免由于对不同时间段分别构造字典而导致最终去重结果出现错误,一个Cube中的所有segments将使用同一个字典,即全局字典。

        原理:

        每个构建任务都将生成一个新的全局字典

        每个新的构建任务的字典会根据版本号保存,旧的全局字典会逐渐删除

        一个全局字典包含一个元数据文件和多个字典文件,每个字典文件称为一个bucket (bucket)。

        每个bucket被划分为两个映射(Map),并将这两个映射组合成一个完整的映射关系。

        原理:Global Dictionary on Spark - Global Dictionary on Spark - Apache Software Foundation

                 滑动验证页面

        2)调优参数

        如果cube定义了精确去重(即count(distinct)语法)的度量值,Kylin4.0将基于Spark为这些度量值分布式地构建全局字段的值(之前版本是单点构建)。这部分的优化主要是调整一个参数:

        1. kylin.dictionary.globalV2-threshold-bucket-size  (默认值500000)

        如果CPU资源充足,减少此配置的值可以减少单个分区中的数据量,从而加快构建全局字典。

        3)使用全局字典

        在已有的Model中,创建一个新的Cube用于测试全局字典,设置度量为COUNT_DISTINCT,返回类型选择Precisely

        如果构建失败,可能是yarn资源限制,构建时单个容器申请的cpu核数超过yarn单个容器默认最大4核,修改hadoop的yarn-site.xml,分发配置文件,重启yarn

             yarn.scheduler.maximum-allocation-vcores

             8

        4)查看全局字典

        在HDFS的Kylin元数据目录下,对应工程目录会生成一个dict目录。

         快照表构建性能调优

        Snapshot Table - 快照表:每一张快照表对应一个Hive维度表,为了实时记录Hive维度表的数据变化,Kylin的cube每次构建都会对hive维度表创建一个新的快照,以下是快照表的调优参数。

        构建Snapshot table时,主要调整2个参数来调优:

        参数名

        默认值

        说明

        kylin.snapshot.parallel-build-enabled

        true

        使用并行构建,保持开启

        kylin.snapshot.shard-size-mb

        128MB

        如果CPU资源充足,可以减少值来增加并行度,建议并行度在Spark应用CPU核数的3倍以内。

        并行度=原表数据量/该参数

        查询性能优化

        在Kylin4.0中,查询引擎(SparderContext)也使用spark作为计算引擎,它是真正的分布式查询引擎,特别是在复杂查询方面,性能会优于Calcite。然而,仍然有许多关键性能点需要优化。除了上面提到的设置适当的计算资源之外,它还包括减少小的或不均匀的文件,设置适当的分区,以及尽可能多地修剪parquet文件。Kylin4.0和Spark提供了一些优化策略来提高查询性能。

         使用排序列快速读取parquet文件

        创建cube时,可以指定维度列的排序,当保存cube数据时,每个cuboid的第一个维度列将用于执行排序操作。其目的是在使用排序列进行查询时,通过parquet文件的最小最大索引尽可能地过滤不需要的数据。

               在cube构建配置的高级配置中,rowkey的顺序就是排序顺序:

        页面中可以左键点击ID进行拖拽,调整顺序

         使用shardby列来裁剪parquet文件

        Kylin4.0底层存储使用的是Parquet文件,并且Parquet文件在存储的时候是会按照某一列进行分片的。这个分片的列在Kylin里面,我们称为是shardBy列,Kylin默认按照shardBy列进行分片,分片能够使查询引擎跳过不必要的文件,提高查询性能。我们在创建Cube时可以指定某一列作为shardBy列,最好选择高基列(基数高的列),并且会在多个cuboid中出现的列作为shardBy列。

               如下图所示,我们按照时间(月)过滤,生成对应的Segment,然后按照维度A作为shardBy列进行分片,每个Segment里面都会有相应的分片。如果我们在查询的时候按照时间和维度A进行过滤,Kylin就会直接选择对应Segment的对应分片,大大的提升的查询效率。

        在Kylin4.0中,parquet文件存储的目录结构如下:

        查询时,查询引擎可以通过日期分区列过滤出segment-level目录,并通过cuboid过滤出cuboid-level目录。但是在cuboid-level目录中仍有许多parquet文件,可以使用shard by列进一步裁剪parquet文件。目前在SQL查询中只支持以下过滤操作来裁剪parquet文件:Equality、In、InSet、IsNull。

        1)修改cube配置

        这里拿已有的cube来做演示,先对已有cube清空数据。

        对其disable禁用:

        2)指定shardby列

        进行编辑:

        点击高级配置:

        选择需要的列,将shardby改成true。

        点击Overview,选择保存:

        3)重新构建

        当构建cube数据时,它会根据这个shard按列对parquet文件进行重分区。如果没有指定一个shardby的列,则对所有列进行重分区。

        减少小的或不均匀的parquet文件

        在查询时读取太多小文件或几个太大的文件会导致性能低下,为了避免这个问题,Kylin 4.0在将cube数据作为parquet文件构建时,会按照一定策略对parquet文件进行重分区,以减少小的或不均匀的parquet文件。

         相关配置

        参数名

        默认值

        说明

        kylin.storage.columnar.shard-size-mb

        128MB

        有shardby列的parquet文件最大大小

        kylin.storage.columnar.shard-rowcount

        2500000

        每个parquet文件最多包含的行数

        kylin.storage.columnar.shard-countdistinct-rowcount

        1000000

        指定cuboid的bitmap大小

        kylin.storage.columnar.repartition-threshold-size-mb

        128MB

        每个parquet文件的最大大小

        6.3.2 重分区的检查策略

        (1)如果这个cuboid有shardBy的列;

        (2)parquet文件的平均大小 < 参数'kylin.storage.columnar.repartition-threshold-size-mb'值 ,且 parquet文件数量大于1;这种情况是为了避免小文件太多;

        (3)parquet文件的数量 <  (parquet文件的总行数/ 'kylin.storage.columnar.shard-rowcount' * 0.75),如果这个cuboid有精确去重的度量值(即count(distinct)),使用'kylin.storage.columnar.shard-countdistinct-rowcount'来代替'kylin.storage.columnar.shard-rowcount';这种情况是为了避免不均匀的文件;

        如果满足上述条件之一,它将进行重分区,分区的数量是这样计算的:

        ${fileLengthRepartitionNum} =

        Math.ceil(${parquet文件大小MB} / ${kylin.storage.columnar.shard-size-mb})

        ${rowCountRepartitionNum} =

        Math.ceil(${parquet文件总行数} / ${kylin.storage.columnar.shard-rowcount})

               分区数量=Math.ceil(( ${fileLengthRepartitionNum} + ${ rowCountRepartitionNum } ) / 2)

         合理调整参数的方式

        1)查看重分区的信息,可以通过下面命令去log中查找

        grep "Before repartition, cuboid" logs/kylin.log

        比如官方案例:可以看到分区数有809个。

        2)增大'kylin.storage.columnar.shard-rowcount'或'kylin.storage.columnar.shard-countdistinct-rowcount'的值,重新构建,查看日志:

        可以看到:分区数变成了3个,构建的时间也从58分钟降低到24分钟。

        4)查询性能得到提高

        原先查询要1.7秒,扫描58个文件:

        调整参数后,查询只要0.4秒,扫描4个文件:

         将多个小文件读取到同一个分区

        当已经构建的segments中有很多小文件时,可以修改参数'spark.sql.files.maxPartitionBytes' (默认值为128MB)为合适的值,这样可以让spark引擎将一些小文件读取到单个分区中,从而避免需要太多的小任务。

               如果有足够的资源,可以减少该参数的值来增加并行度,但需要同时减少'spark.hadoop.parquet.block.size'(默认值为128MB)的值,因为parquet文件的最小分割单元是RowGroup,这个blocksize参数表示parquet的RowGroup的最大大小。

         使用堆外内存

        Spark可以直接操作堆外内存,减少不必要的内存开销,减少频繁的GC,提高处理性能。

        相关配置:

        spark.memory.offHeap.enabled

        设置为true,使用堆外内存进行spark shuffle

        spark.memory.offHeap.size

        堆外内存的大小