spark基本原理&UI界面解读

这里是引用

1 八股文

1.1 基本原理

  1. driver节点是整个应用程序的指挥所

  2. 指挥官是sparkcontext

  3. 环境:构建一个集群

  4. 应用程序提交

  5. 确定主节点,确定指挥所driver,确定指挥官sparkcontext

  6. sparkcontext会向资源管理器申请资源

  7. 会将作业分为不同阶段

  8. 将不同任务分到不同节点执行

  9. 整个过程还会进行监控

  10. 资源管理器收到sparkcontext的资源请求

  11. 会向executor分配资源

  12. 启动executor进程,才会启动线程

  13. executor进程是驻留在不同的work node中

  14. 会有成百上千个进程和work node

  15. sparkcontext对象要根据 rdd依赖关系 构建一个DAG图

  16. 代码就是针对RDD一次次的操作

  17. 这些操作会被转换成一个有向无环图 dag

  18. DAG会被提交到dag scheduler解析

  19. DAG图会被切为很多个阶段 stage

  20. 每个stage又分为若干个任务

  21. 每一个阶段stage是任务的集合

  22. 把这个阶段stage提交给task scheduler

  23. task scheduler负责分发任务

  24. worker node上的executor会向task scheduler主动申请

  25. task scheduler会返回任务给worker node上的executor去派生线程去执行

  26. 计算给节点的分发原则:

  27. 计算向数据靠拢。数据在哪个节点上面,task scheduler优先分配,完成本地化的处理。

  28. executor运行的结果会再次反馈给task scheduler

  29. 再向上传给 dag scheduler

  30. spark context做最后的处理。返回给用户看或者写入HDFS

  31. sparkcontext:代表了整个应用程序连接集群的通道。链接应用和集群

1.2 核心原理

1.2.1 关键概念

driver: 该进程调用spark程序的main方法,并且启动sparkcontext

cluster Manager:该进程负责和外部集群工具打交道,申请或释放集群资源

Woker:该进程是一个守护进程,负责启动和管理executor

executor:该进程是一个JVM虚拟机。负责运行spark task

1.2.2 运行步骤

  1. 启动driver,创建sparkcontext
  2. client提交程序给driver,driver向cluster manager申请集群资源
  3. 资源申请完毕,在worker中启动executor
  4. driver将程序转化为tasks,分发给executor执行

1.2.3 spark运行模型

1.2.3.1 单机

使用线程模拟并行来运行程序

  • 一般用于测试开发
  • 不能启动spark的master、worker守护进程
  • 不能启动Hadoop的各项服务
  • sparksubmit进程是客户提交任务的client进程,又是spark的driver程序、还充当着spark执行task的executor的角色
    1.2.3.2 集群

    使用集群管理器来和不同类型的集群交互,将任务运行在集群中。

    • spark standalone
    • hadoop yarn
    • Apache mesos
    • kubernetes

      2 Spark UI深入解读

      • 在日常的开发工作中,我们总会遇到spark应用运行失败,或者执行效率不达预期的情况。对于这样的问题,想找到根本原因(root cause),就可以通过spark ui来获取最直接、最直观的线索,在全量的审查spark应用的同时,迅速定位问题所在。
      • 如果我们把失败的、或是执行低效的spark应用看做是“病人”的话,那么spark ui中关于应用的众多衡量指标(metrics),就是这个病人的“体检报告”。结合多样的metrics,身为“大夫”的开发者可以结合经验来迅速定位“病灶”。

        2.1 spark UI一级入口

        打开spark UI,映入眼帘的是默认的Jobs页面。JObs页面记录着应用中涉及的actions动作,以及与数据读取、移动有关的动作。其中每一个action都对应着一个job,而每一个job都对应着一个作业。我们一会再去对jobs页面做展开,现在先把目光集中在spark ui最上面的导航条,这里面罗列这spark ui的所有一级入口

        入口页包含内容。作用
        jobsactions,以及数据的读取与移动等操作。作业详情预览
        stagesDAG中每个stage的入口。作业详情预览
        storage分布式数据集缓存(cache)详情页。审查cache在内存与磁盘中的分布情况
        storage分布式数据集缓存(cache)详情页。审查cache在内存与磁盘中的分布情况
        environment配置项,环境变量详情。审查spark配置项是否符合预期
        executors分布式运行环境与计算负载详情页。深入审查执行计划中的每一个环节
        • JOB:作业详情的预览
          • 什么情况下spark会产生一个job?对应的action算子会产生,对应的数据的读取和移动操作。
          • stage。一个Job可以拆分为多个stage。job拆分stage的详细情况。划分stage的依据是宽窄依赖
          • storage。在cache数据的时候,会有应用。
          • environment。spark任务的配置项、环境变量。
          • executor。分布式运行环境计算负载的详情页,作用是审查executor之间是否存在数据倾斜。比如,看input的数据的差异用于评估负载情况。
          • SQL。spark SQL执行计划的详情页。

            executor、environment、storage不存在二级入口,但是SQL、storage、JOBs有二级入口

            2.1.1 executors

            2.1.1.1 summary
            • RDD blocks:原始分区数据集的分区数量
            • storage memory:用于cache时所占的内存的占用情况。
            • disk used。计算过程中消耗的磁盘空间。
            • cores。用于计算的CPU的核数。
            • acitive task:活跃的task数量
            • failed task:失败的task数量
            • complete task:完成的task数量
            • task time(GC time)。任务的执行时候,以及任务的GC时间
            • input。输入数据量的大小
            • shuffle read 大小
            • shuffle write 大小
            • blocklisted。黑名单
              2.1.1.2 executors
              • executors tab的主要内容如下,主要包含“summary”和“executors”两部分。这两部分记录的度量指标是一致的,其中“executors”以更新粒度记录者每一个executor的详情,而第一部分“summary”是下面所有executors度量指标的简单加和。
              • sparkUI都提供了哪些metrics,来量化每一个executor的工作负载(workload)。
                metric含义
                RDD原始数据集的分区数量
                storage memory用于cache的内存占用
                disk used计算过程中消耗的磁盘空间
                cores用于计算的CPU核数
                active/failed/completetotal tasks
                task time(GC time)任务执行时间(括号内为任务GC时间)
                input输入数据量大小
                shuffle head/writeshuffle读写过程中消耗的数据量
                logs/thread dump日志与core dump
                • 不难发现,executors页面清清楚楚的记录着每一个executor消耗的数据量,以及他们对CPU、内存与磁盘等硬件资源的消耗。基于这些信息,我们可以轻松判断应用中是否存在数据倾斜的隐患。
                • Thread Dump。Java中的诊断工具,每个JVM都可以显示所有线程在某一个点的状态,用作Java定位问题的诊断功能。
                  • runnable。当前可以运行的线程
                  • timed-waiting。线程主动等待的意思。
                  • waiting。等待的线程。

                    summary是executors所有指标聚合的情况。

                    基于这些信息,盘点不同executor之间是否存在负载不均衡的情况、数据倾斜的隐患。

                    2.1.2 environment

                    • 各种各样环境变量与配置信息。

                      metric含义
                      runtime informationJava Scala版本号信息
                      spark properties所有spark配置项的设置细节,重点
                      Hadoop propertieshadoop配置项细节
                      system properties应用提交方法(spark-shell/spark-submit)
                      classpath entriesclasspath路径设置信息
                      • spark properties是重点。其中记录着所有运行时生效的spark配置项设置。通过spark properties,我们可以确认运行时的设置,与我们预期的设置是否一致,从而排除因配置项设置错误而导致的稳定性或是性能问题。
                        2.1.2.1 runtime information
                        2.1.2.2 spark properties

                        spark任务的各种配置项、判断参数是否合理

                        2.1.2.3 resource properties

                        不重要

                        2.1.2.4 Hadoop properties

                        Hadoop的各种配置项

                        2.1.2.5 system properties

                        系统配置项,可以看启动命令。sum.java.command

                        2.1.2.6 classpath properties

                        配置、jar包的路径

                        2.1.3 storage

                        • 记录了每一个缓存,rdd cache、dataframe cache。包括缓存级别、已缓存的分区数、缓存比例、内存大小与磁盘大小。
                        • spark支持不同的缓存级别,他是存储介质(内存、磁盘)、存储形式(对象、序列化字节)与副本数量的排列组合。对于data frame来说,默认的级别是单副本的disk memory deserialized,也就是存储介质为内存加磁盘,粗出形式为对象的单一副本存储方式。
                          metric含义
                          storage level存储级别
                          cached partitions已缓存的分区数
                          fraction caches缓存比例
                          size in memory内存大小
                          size on disk磁盘大小
                          • cached partitions 和fraction caches分别记录着数据集成功缓存的分区数量,以及这些缓存的分区占所有分区的比例。当fraction cached小于100%的时候。说明分布式数据集并没有完全缓存到内存(或是磁盘)。对于这种情况,我们要警惕缓存换入换出可能带来的性能隐患。

                          • 基于storage页面提供的详细信息,我们可以有的放失的设置于内存有关的配置项,如spark.executor.memory、spark.executor.fraction、spark.executor.storageFraction、从而有针对性的对storage memory进行调整。

                          • cache partitions 已缓存的分区数

                          • fraction cached。缓存的比例,代表缓存的分区占所有分区的比例,当小于100%的时候,代表分布式的数据没有完全划分到内存或者磁盘里面。

                            • 缓存换入换出,有可能带来性能的问题。
                            • size in memory。内存缓存的大小

                              • storage memory不足的情况下,会把他size到磁盘里面。
                              • size in disk。磁盘缓存的大小

                                2.1.4 SQL

                                • 以actions为单位,记录着每个action对应的sparksql执行计划。我们需要点击“description”列中的超链接,才能进入到二级页面,去了解每个执行计划的详细信息。
                                • Jobs:同理,低于jobs来说,spark ui也是以actions为粒度,记录着每个action对应作业的执行情况。我们要了解作业详情页,也必须通过“description”页面提供的二级入口链接。
                                • 一个action对应一个query,一个query会有多个job id。
                                • 以actions为单位,记录着每个action对应的spark sql执行计划。我们需要点击“description”列中的超链接,才能进入到二级页面,去了解每个执行计划的详细信息。

                                  2.1.4 JOBs

                                  • description。描述
                                  • submitted。提交时间
                                  • duration。执行时间
                                  • stage。成功

                                    2.1.4 stage

                                    • 我们知道,每一个作业,都包含多个阶段,就是我们常说的stages。在stages页面,spark ui罗列了应用中涉及的所有stage,这些stages分属于不同的作业。要想查看哪些stages隶属于哪个job,还需要从jobs的descritions二级入口进入查看。
                                    • stage页面,更多的是一种预览,要想查看每一个stage的详情,同样需要从“description”进入详情页。

                                      总结:

                                      一级入口重点内容
                                      executors不同executors之间,是否存在负载倾斜
                                      environment不同executors之间,是否存在负载倾斜
                                      storage分布式数据集的缓存级别,内存,磁盘缓存比例
                                      SQL初步了解不同执行计划的执行时间,确实是否符合预期
                                      jobs初步感知不同jobs的执行时间,确实是否符合预期
                                      stage初步感知不同stage的执行时间,确实是否符合预期
                                      • 记录了以action为粒度,记录了每个action作业的情况。
                                      • executor可以看到不同executor负载情况、执行情况,判断数据倾斜
                                      • environment可以看到spark任务的配置情况,判断配置是否合理。参数的配置。
                                        • 配置优先级:code>conf>默认
                                        • SQL。可以了解不同任务执行时间是否符合预期。
                                        • job。可以看到job的执行情况,是否符合预期
                                        • storage。可以看到storage的执行情况,是否符合预期
                                        • stage。可以看到stage的执行情况,是否符合预期

                                          2.2 spark UI二级入口

                                          • 所为二级入口,指的是通过一次超链接跳转才能访问到的页面。对于SQL、jobs和stages这三类入口来说,二级入口往往已经提供了足够的信息,基本覆盖了“体检报告”的全部内容。因此,尽管spark UI也提供了少量的三级入口(需要凉调才能到达的页面),但是这些隐藏在“犄角旮旯”的三级入口,往往不需要开发者去特别关注。
                                          • 接下来我们就沿着sql->job->stages的顺序,一次的去访问他们的耳机入口,从而针对全局dag,作业以及执行阶段,获得更加深入的探索和洞察。

                                            2.2.1 sql详情页

                                            • 在 SQLtab一级入口,我们看到有1个条目

                                            • 点击图中的“description”,即可进入到该作业的执行计划页面,如下图所示。

                                              2.2.1.1 exchange
                                              • 可以看到,对应每一个exchange,spark ui都提供了丰富的metrics来刻画shuffle的计算过程。从shuffle write到shuffle read,从数据量到处理时间,应有尽有。
                                                metrics含义
                                                shuffle records writtenshuffle write阶段写入的数据条目数量
                                                shuffle write time totalshuffle write阶段花费的写入时间
                                                records readshuffle read阶段读取的数据条目数量
                                                local bytes read totalshuffle read阶段从本地节点读取的数据总量
                                                fetch wait time totalshuffle read阶段花费在网络传输上的时间
                                                remote bytes read totalshuffle read阶段跨网络、从远端节点读取的数据总量
                                                date size total原始数据在内存中展开之后的总大小
                                                remote bytes read to diskshuffle read阶段因数据块过大而直接落盘的情况
                                                shuffle bytes written totalshuffle中间文件总大小
                                                • 结合这份shuffle的体检报告,我们就能一量化的方式,去掌握shuffle过程的计算细节,从而为调优提供更多的洞察和思路。

                                                  2.2.1.2 sort

                                                  • 接下来,我们再来说说sort。相比exchange,sort的度量指标没有那么多,不过,他们足以让我们一窥sort再运行时,对内存的消耗,如下图所示。

                                                    -

                                                    metrics含义
                                                    sort time total排序消耗的总时间
                                                    peak memory total内存的消耗峰值(集群范围内)
                                                    spill size total排序过程中移除到磁盘的数据总量
                                                    • 可以看到“peak memory total ”和“spill size total”这两个数值,足以指导我们更有针对性的去设置spark.executor.memory、spark.memory.fraction、spark.memory.storageFraction,从而使得execution memory区域得到充分的保障。
                                                      2.2.1.2 aggragate
                                                      • 与sort类似,衡量aggregate的度量指标,主要记录的也是操作的内存消耗。

                                                        -

                                                      • 可以看到对于aggregate操作,spark ui也记录着磁盘移除与峰值消耗,即spill size和peak memory total。这两个数值也为内存的调整提供了依据。
                                                        2.2.2 jobs详情页
                                                        • 接下里,我们再来说说jobs详情页。jobs详情页非常的简单、直观,他罗列这隶属于当前job的所有stages。要想访问每一个stage的执行细节,我们还需要通过description的超链接做跳转。

                                                          2.2.3 stages详情页
                                                          • 实际上,要访问stage详情,我们还有另外一种选择,那就是直接从stages以及入口进入,然后完成跳转。因此stage详情页也归类为二级入口。
                                                          • 接下来我们以id为8的stage位例,去看一看详情页都记录着哪些关键信息。

                                                          • 在所有的二级入口中,stage详情页的信息流可以说是巨大的。点击stage详情页,可以看到他主要包含3大信息类。分别是stage dag、event timeline与task metrics.
                                                          • 其中task metics又分为summary和entry detail两部分,提供不同粒度的信息汇总。而task metrics中记录的指标类别,还可以通过“show additional metrics”选项进行扩展。
                                                            2.2.3.1 stage dag
                                                            • 接下来,我们沿着“stage dag -> event timeline -> task metrics的顺讯,依次讲讲这些页面所包含的内容
                                                            • 首先,我们先来看看最简单的stage dag。点开蓝色的dag visualization按钮,我们就能获取到当前stage的dag,如下图所示。
                                                            • 之所以说stage dag简单,是因为咱们在SQL🎧入口,已经对dag做过详细的说明。而stage dag仅仅是SQL页面完整的dag的子集,毕竟,SQL页面的dag,针对的是作业(job)。因此,只要掌握了作业的dag,自然也就掌握了每一个stage的dag
                                                              2.2.3.2 event timeline
                                                              • 与dag visualization并列,在summary metrics之上有一个event timeline的按钮,点开它,我们可以得到如下图所示的可视化信息。

                                                                -

                                                              • even timeline,记录着分布式任务调度与执行的过程中,不同计算环节主要的时间花销。图中的每一个条带,都代表着一个分布式任务,条带由不同颜色构成。其中不同颜色的矩形,代表不同环节的计算时间。
                                                              • 为了方便叙述,我还是同表格形式帮你梳理了这些环节的含义与作用,你可以保存以后随时查看。
                                                                metrics含义
                                                                scheduler delay调度延迟(调度系统开销)
                                                                task deserialization time任务的反序列化时间(调度系统开销)
                                                                shuffle read timeshuffle read时间开销
                                                                executor computing time计算时间
                                                                shuffle write timeshuffle write时间开销
                                                                result serialization time任务结果的序列化时间
                                                                getting result time结果收集花费的时间
                                                                • 理想情况下,条带的大部分应该都是绿色的,也就是任务的时间消耗,大部分都是执行时间。不过,实际情况并不总是如此,比如,有些时候,蓝色的部分占比较多,或是橙色的部分占比较大
                                                                • 在这些情况下,我们就可以结合event timeline,来判断作业是否存在跳读开销过大,或是shuffle负载过重的问题,从而有针对性的对不同环节做调优。
                                                                • 比方说,如果条带中深蓝的部分(scheduler delay)很多,那就说明任务的调度开销 很重。这个时候,我们就需要参考”三足鼎立“的掉调优方法,去相应的调整cpu、内存与并行度、从而减低任务的调度开销。
                                                                • 再比如,如果条带中黄色(shuffle write time)与橙色(shuffle read time)的面积较大,就说明任务的shuffle负载很重,这个时候,我们就需要考虑,有没有可能通过利用broadcast join来消除shuffle,从而缓解任务的shuffle负担。
                                                                  2.2.3.3 task metrics
                                                                  • 说完stage dag和event timeline,最后,我们再来说一说stage详情页的重头戏:task metrics。
                                                                  • 之所以说他是重头戏,在于task metrics以不同的粒度,提供了详尽的量化指标。其中,task以task为粒度,记录着每一个分布式任务的执行细节,而summary metrics则是对于所有tasks执行细节的统计汇总。我们先看看粗粒度的summary metrics,载入展开细粒度的task。
                                                                    2.2.3.4 summary metrics
                                                                    • 首先我们点开show additional metrics按钮,勾选select all,让那所有的度量指标都生效,如下图所示。这么做的目的,在于获取最详尽的task执行信息。
                                                                    • 可以看到,select all生效之后,spark ui打印出了所有的执行细节。老规矩,为了方便叙述,我还是把这些metrics整理到表格中,方便你随时查阅。其中,task deserialization time、result serialization time、getting result time、scheduler delay与刚刚表格中的含义相同,不再赘述,这里我们紧整理出新出现的task metrics。
                                                                    • metrics含义
                                                                      durationtask 执行时间
                                                                      gc time任务执行过程中,Java gc时间
                                                                      peak execution memory内存消耗峰值
                                                                      spill(memory)溢出数据的内存占用
                                                                      spill(disk)溢出数据的磁盘占用
                                                                      shuffle read size/ recordsshuffle read 阶段读取的数据量与条目数量
                                                                      shuffle read blocked timeshuffle read阶段的网络延迟
                                                                      shuffle remote readsshuffle read阶段跨节点、从远端节点拉取的数据量
                                                                      shuffle write size/recordsshuffle write阶段写入的数据量与条目数量
                                                                      shuffle write timeshuffle write阶段话费的写入时间
                                                                      • 对于这些详尽的task metrics,难能可贵的,spark UI以最大最小(max min)以及分位点(25%分位、50%分位、75%分位)的方式,提供了不同的metrics的统计分布。这一点非常重要,原因在于,这些metrics的统计分布,可以让我们非常清晰的量化任务的负载分布。
                                                                      • 换句话说,根基不同metrics的统计信息分布,我们就可以轻而易举的判定,当前作业的不同任务之间,是相对均衡,还是存在严重的倾斜。如果判断计算存在负载倾斜,那么我们就需要利用手工加盐或者是aqe的自动倾斜处理,去消除任务之间的不均衡,从而改善作业的性能。
                                                                      • 在上面的表格中,有一半的metrics是预shuffle直接相关的,比如shuffle read size/record,shuffle remote reads等等。
                                                                      • 这些metrics我们在介绍SQL详情的时候,已经详细说过了。另外duration、gctime以及peak execution memory,这些metrics的含义,要么已经讲过,要么过于简单、无需解释。因此,对于这三个指标,咱们也不多写笔墨。
                                                                      • 这里要特别值得关注的,是spill(memory)和spill(disk)这两个指标。spill,也即溢出数据,他指的是因内存数据结构(partitionpairbuffer、appendonlymap,等等)空间受限,而腾挪出去的数据。spill(memory)表示的是,这部分数据在内存的存储大小,而spill(disk)表示的是,这些数据在磁盘中的大小。
                                                                      • 因此,用spill(memory)除以spill(disk),就可以得到“数据膨胀系数”的近似值,我们把他记为explosion ratio。有了explosion ratio,对于一份存储在磁盘中的数据,我们就可以预估他在内存中的存储大小,从而准确的把我数据的内存消耗。
                                                                        2.2.3.4 task
                                                                        • 介绍完粗粒度的summary metrics,接下来,我们再来说说细粒度的tasks。实际上,task的不少指标,是预summary高度重合的,如下图所示。同理,这些重合的metrics,咱们不在赘述,你可以参考summary的部分,来理解这些metrics。唯一的区别,就是这些指标是针对每一个task进行度量的。

                                                                          metrics含义
                                                                          locality level本地性级别
                                                                          logs执行日志
                                                                          errors执行错误细节
                                                                          • 可以看到新指标并不多,这里最值得关注的,是locality level,也就是本地性级别。在调度系统重,我们讲过,每个task都有自己的本地性倾向。结合本地性倾向,调度系统会把tasks调度到适度的executors或者是计算节点,尽可能保证数据不动,代码动。
                                                                          • logs与errors属于spark ui的三级入口,他们是tasks的执行日志,详细记录了tasks在执行过程中的运行时状态。一般来说,我们不需要深入到三级入口进行debug。errors列提供的报错信息,往往足以让我们迅速的定位问题所在。
                                                                          • 思考问题:为什么有跳过的skip stage或者task?

                                                                            • skipped stages表示已经执行过了。

                                                                              rdd rdd.filter.map => transformation

                                                                              rdd.cache

                                                                              count reduce的操作 两个都是action

                                                                              count

                                                                              执行reduce的时候不需要重新计算他的transformation

                                                                              3 Spark 参数调优与Spark sql调优

                                                                              3.1 spark数据倾斜的解决方案

                                                                              产生数据倾斜的主要原因是在shuffle中不同的key对应的数据量不同,导致不同的task分配的数据量不均衡。

                                                                              • 提高shuffle的并行度
                                                                              • 使用随机数前缀。将相同的key增加不同的前缀,使这些相同的key分散到不同的task进行处理。
                                                                                • 缺点:对内存要求高
                                                                                • reduce join转为map join。适用于两个表里面一个表比较小的场景,在map端进行小表广播,用map算子实现与join相同的效果。
                                                                                  • 优点:不需要shuffle
                                                                                  • 缺点:只只用于大表join小表的情况。
                                                                                  • 过滤少数导致数据倾斜的key
                                                                                    • 缺点:数据量多的key没有业务使用上的含义。场景单一
                                                                                    • 优点:实现简单。
                                                                                    • 在hive进行预处理,然后将数据传给hive
                                                                                    • 使用两阶段预聚合操作。
                                                                                      • 先局部聚合,再做全局聚合。适用于reducebykey groupby的场景。
                                                                                      • 优点:显著提升spark性能
                                                                                      • 缺点:适用于最固定的场景