[spark] RDD 编程指南(翻译)

Overview

从高层次来看,每个 Spark 应用程序都包含一个driver program,该程序运行用户的main方法并在集群上执行各种并行操作。

Spark 提供的主要抽象是 resilient distributed dataset(RDD),它是跨集群节点分区的元素集合,可以并行操作。RDD 是通过从 Hadoop 文件系统中的文件开始创建的。用户还可以要求 Spark 将 RDD 持久保存在内存中,从而使其能够在并行操作中高效地重用。最后,RDD 会自动从节点故障中恢复。

Spark 中提供的第二抽象是 shared variables ,他可以用在并行操作中。默认情况下,当 Spark 将函数作为一组任务(task)在不同节点上并行运行时,它会将函数中使用的每个变量的副本携带给每个任务。有时,变量需要在任务之间共享或者在driver program和任务之间共享。Spark 支持两种类型的 shared variables,一是 broadcast variables 可用于在所有节点的内存中缓存一个值,二是 accumulators

,他是仅“added”的变量,例如counters和sums。

Resilient Distributed Datasets (RDDs)

Spark围绕 RDD 的概念展开,RDD是可以并行操作的元素的容错集合。有两种方法可以创建RDD:在driver program中并行化现有集合,或者引用外部存储系统中的数据集,例如共享文件系统、HDFS、HBase或任何提供Hadoop InputFormat的数据源。

Parallelized Collections

并行化集合是通过在驱动程序中的现有集合上调用JavaSparkContext的并行化方法创建的。集合的元素被复制以形成可以并行操作的分布式数据集。例如,以下是如何创建一个包含数字1到5的并行化集合:

List data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD distData = sc.parallelize(data);

一旦创建,分布式数据集(distData)就可以并行操作。并行集合的一个重要参数是将数据集划分为的分区数量。 Spark 将为集群的每个分区(partition)运行一个任务(task),任务将分配给节点执行。可以手动指定分区数或使用默认值。

RDD Operations

RDD 支持两种类型的操作:

  • transformations(从现有dataset创建新dataset)。例如,map 是一种transformations,它将每个dataset每个元素传递给函数并返回表示结果的新 RDD
  • actions(在对dataset运行计算后将值返回给driver program)。例如,reduce 是一个使用某个函数聚合 RDD 的所有元素并将最终结果返回给driver program的操作

    Spark中的所有transformations都是 lazy 的,因为它们不会立即计算结果。相反,它们只记住应用于某些基本 dataset(例如文件)的transformations。只有当操作需要将结果返回给driver program时,transformations 才会被计算。这样的设计使得Spark能够更高效地运行。例如,我们可以意识到,通过map创建的dataset将在reduce中使用,并且只将reduce的结果返回给driver program,而不是更大的映射dataset。

    默认情况下,每次对transform后的RDD运行操作时,都可能会被重新计算。但是,您也可以使用持久(或缓存)方法将RDD持久化在内存中,在这种情况下,Spark将保留集群中的元素,以便在您下次查询时更快地访问它。还支持在磁盘上持久化RDD,或跨多个节点复制。如下图所示,如果不cache/persist 任何内容,那么每次您需要输出时(当您调用诸如“count”之类的操作时),都会从磁盘读取数据并完成操作。您可以在读取后进行缓存,然后所有其他操作都会跳过读取并从缓存的数据开始。

    为了说明 RDD 基础知识,请看下面的简单程序:

    JavaRDD lines = sc.textFile("data.txt");
    JavaRDD lineLengths = lines.map(s -> s.length());
    int totalLength = lineLengths.reduce((a, b) -> a + b);
    

    第一行定义了来自外部文件的基本RDD。该数据集没有加载到内存中,也没有以其他方式对其进行操作:行只是指向文件的指针。第二行将lineLengths定义为map transformation的结果。同样,由于 lazy,lineLengths不会立即计算。最后,我们运行 reduce,这是一个操作。此时,Spark将计算分解为在不同机器上运行的任务,每台机器都运行其 map 和本地数据的 reduce,只将其答案返回给driver program。

    如果 lineLengths 可能被再次使用,可以增加下面代码

    lineLengths.persist(StorageLevel.MEMORY_ONLY());
    

    在reduce之前,这会导致lineLengths在第一次计算后保存在内存中。

    Understanding closures

    关于Spark,更难的事情之一是理解跨集群执行代码时变量和方法的范围和生命周期。修改超出其范围的变量的RDD操作可能是混淆的常见来源。在下面的示例中,我们将查看使用foreach()增加计数器的代码,但其他操作也可能出现类似的问题。

    Example

    考虑下面简单的计算,将RDD元素sum。根据是否在同一JVM中执行,它的行为可能会有所不同。一个常见的例子是在本地模式下运行Spark(–master=local[n])与将Spark应用程序部署到集群(例如,通过Spark提交到YARN):

    int counter = 0;
    JavaRDD rdd = sc.parallelize(data);
    // Wrong: Don't do this!!
    rdd.foreach(x -> counter += x);
    println("Counter value: " + counter);
    
    Local vs. cluster modes

    上述代码的行为是未定义的,可能无法按预期工作。为了执行作业,Spark将RDD操作的处理分解为任务,每个任务都由执行器执行。在执行之前,Spark计算任务的 closures。closures 是执行器在RDD上执行计算(在本例中为foreach())时必须可见的变量和方法。此closures 被序列化并发送给每个执行器。

    发送给每个excutor的closure中的变量现在是副本,因此,当在foreach函数中引用counter时,它不再是driver program上的count。driver program的内存中仍然有一个counter,但不再对excutor可见!

    在本地模式下,在某些情况下,foreach 函数实际上将在与driver program相同的 JVM 中执行,并且将引用相同的原始counter,并且可能会更新它。

    为了确保在这些场景中定义良好的行为,应该使用 Accumulator.。Spark中的 Accumulator专门用于提供一种机制,用于在集群中跨worker node 执行时安全地更新变量.

    一般来说,closure——像循环或本地定义的方法这样的构造——不应该被用来改变一些全局状态。Spark不定义或保证对从闭包外部引用的对象的更改行为。执行此操作的一些代码可能在本地模式下工作,但这只是偶然的,这样的代码在分布式模式下不会按预期运行。如果需要一些全局聚合,请使用Accumulator。

    Printing elements of an RDD

    另一个常见的习惯用法是尝试使用rdd. foreach(println)或rdd.map(println)打印出RDD的元素。在单台机器上,这将生成预期的输出并打印RDD的所有元素。但是,在集群模式下,excutor 调用的stdout的输出现在写入执行程序的stdout,而不是driver program上的stdout,因此driver program上的stdout不会显示这些!要在驱动程序上打印所有元素,可以使用 collect() 首先将RDD带到 driver program 节点,因此使用:rdd.collect().foreach(println).

    Working with Key-Value Pairs

    虽然大多数Spark操作适用于包含任何类型对象的RDD,但一些特殊操作仅适用于键值对的RDD。最常见的是分布式“shuffle”操作,例如通过键对元素进行分组或聚合,reduceByKey和sortByKey等。

    Shuffle operations

    Spark中的某些操作会触发称为shuffle的事件。shuffle是Spark重新分配数据的机制,以便在分区之间以不同的方式分组。这通常涉及跨executor和机器复制数据,这使得shuffle成为一项复杂且成本高昂的操作。

    为了理解在shuffle过程中会发生什么,我们可以考虑一个例子,这个例子中有一个reduceByKey 操作,它生成一个新的RDD,其中一个键的所有值都被组合成一个tuple,这个tuple就是键和对与该键相关的所有值执行一个reduce函数的结果。挑战在于,单个键的所有值不一定都位于同一分区,甚至同一台机器上,但它们必须位于同一位置才能计算结果。

    对于大多数操作,Spark不会自动地将数据重新分布到特定的节点或分区以满足特定操作的需要。相反,每个任务通常只处理一个分区内的数据。然而,对于像reduceByKey这样的操作,Spark需要将具有相同键(key)的所有值(value)聚合在一起以进行计算。这意味着,如果这些值分布在不同的分区中,Spark必须执行一个全局的重组操作(all-to-all operation),这个过程被称为shuffle。在shuffle过程中,Spark会执行以下步骤:

    1. 读取所有分区的数据,以找出每个键对应的所有值。
    2. 将具有相同键的值跨分区传输到相同的节点,以便可以对它们进行聚合。
    3. 在每个节点上,对每个键的所有值进行最终的聚合计算,得到每个键的最终结果。

    可能导致随机播放的操作包括repartition operations like repartition and coalesce、‘ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.

    Performance Impact

    Shuffle是一项昂贵的操作,因为它涉及磁盘I/O、数据序列化和网络I/O。为了组织shuffle的数据,Spark生成一组任务——map 任务来组织数据,以及一组reduce任务来聚合数据。这个术语来自MapReduce,与Spark的map和reduce操作没有直接关系。

    从内部来看,单个map任务的结果保存在内存中直到内存放不下。然后,根据目标分区对它们进行排序并写入单个文件。在reduce端,任务读取相关的排序block

    某些shuffle操作可能会消耗大量的堆内存,因为它们使用内存中的数据结构在传输数据之前或之后组织数据。具体来说,reduceByKey和aggregateByKey在map端创建这些结构,而’ByKey操作在reduce端生成这些结构。当数据在内存放不下时,spark会将这些数据spill到磁盘,从而导致磁盘IO的额外开销和垃圾回收机制的增加。

    Shuffle还会在磁盘上生成大量中间文件。从Spark 1.3开始,这些文件将被保留,直到相应的RDD不再使用并被垃圾收集掉。这样做是为了在重新计算lineage时不需要重新创建随机文件。垃圾收集可能只在很长一段时间后发生,如果应用程序保留对这些RDD的引用,或者如果GC不经常启动。这意味着长时间运行的Spark作业可能会消耗大量磁盘空间。临时存储目录在配置Spark上下文时由park. local.dir配置参数指定。

    RDD Persistence

    Spark中最重要的功能之一是跨操作在内存中持久化(或缓存)dataset。当您持久化RDD时,每个节点都将其计算的任何分区存储在内存中,并在该dataset(或从该dataset派生的dataset)的其他操作中重用它们。这使得未来的操作更快(通常超过10倍)。缓存是迭代算法和快速交互使用的关键工具。

    您可以使用RDD上的persist() cache()方法将其标记为持久化。第一次在操作中计算时,它将保存在节点的内存中。Spark的缓存是容错的——如果RDD的任何分区丢失,它将使用最初创建它的转换自动重新计算。

    此外,每个持久化的RDD都可以使用不同的storage level,来存储,例如,允许您将数据集持久化在磁盘上,将其持久化在内存中,但作为序列化的Java对象(以节省空间),跨节点复制它。cache()方法是使用默认存储级别的简写,即StorageLevel.MEMORY_ONLY(在内存中存储反序列化的对象)。

    Spark还会在shuffle操作中自动持久化一些中间数据(例如,reduceByKey),即使用户没有调用persist。这样做是为了避免在shuffle期间节点发生故障时重新计算整个input。如果用户计划重用新生成的RDD,我们仍然建议他们在生成的RDD上调用persist。

    Which Storage Level to Choose?

    Spark的存储级别旨在在内存使用和CPU效率之间提供不同的权衡。我们建议通过以下过程来选择一个:

    • 如果您的RDD适合默认存储级别(MEMORY_ONLY),请保持这样。这是CPU效率最高的选项,允许RDD上的操作尽可能快地运行。
    • 如果没有,请尝试使用MEMORY_ONLY_SER并选择一个快速序列化库,以使对象更加节省空间,但访问速度仍然相当快。(Java和Scala)
    • 不要spill到磁盘,除非计算数据集的函数很重,或者它们过滤了大量数据。否则,重新计算分区可能与从磁盘读取分区一样快。
    • 如果您想要快速故障恢复(例如,如果使用Spark处理来自Web应用程序的请求),请使用replicated 的存储级别。所有存储级别都通过重新计算丢失的数据提供完全的容错能力,但复制的存储级别允许您继续在RDD上运行任务,而无需等待重新计算丢失的分区。

      Removing Data

      Spark会自动监视每个节点上的缓存使用情况,并以最近最少使用(LRU)的方式删除旧的数据分区。如果您想手动删除RDD而不是等待它从缓存中删除,请使用RDD.unpersist()方法。请注意,此方法默认不阻塞。要在释放资源之前阻塞,请在调用此方法时指定blocking=true。

      Shared Variables

      通常,当传递给Spark操作(如map或reduce)的函数在远程集群节点上执行时,它会在函数中使用的所有变量的单独副本上工作。这些变量被复制到每台机器上,并且远程机器上的变量的更新不会传播回driver program。跨任务支持通用的读写共享变量将是低效的。然而,Spark确实为两种常见的使用模式提供了两种有限类型的共享变量:broadcast variables and accumulators.

      Broadcast Variables

      广播变量允许程序员将只读变量缓存在每台机器上,而不是将其副本与task一起发送。例如,它们可以用来以有效的方式为每个节点提供大型输入数据集的副本,减少了数据传输的开销从task粒度下降到节点粒度。Spark还尝试使用有效的广播算法来分发广播变量,以降低通信成本。

      Spark action通过一组stage执行,由分布式“shuffle”操作分隔。Spark自动广播每个stage内task所需的公共数据。以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前进行反序列化。这意味着显式创建广播变量仅在跨多个stage的task需要相同数据或以反序列化形式缓存数据很重要时才有用。

      广播变量是通过调用SparkContext.broadcast(v)从变量v创建的。广播变量是v的包装器,可以通过调用value方法访问它的值。下面的代码显示了这一点:

      Broadcast broadcastVar = sc.broadcast(new int[] {1, 2, 3});
      broadcastVar.value();
      // returns [1, 2, 3]
      

      创建广播变量后,应该在集群上运行的任何函数中使用它而不是值v,这样v就不会多次发送到节点。此外,对象v在广播后不应该被修改,以确保所有节点都获得相同的广播变量值(例如,如果变量稍后传送到新加入的节点)。

      要释放广播变量复制到执行器上的资源,请调用.unpersist()。如果广播之后再次使用,它将被重新广播。要永久释放广播变量使用的所有资源,请调用.destroy()。之后广播变量就不能使用了。请注意,这些方法默认情况下不会阻塞。要阻塞直到资源被释放,请在调用它们时指定blocking=true。

      Accumulators

      Accumulators是仅通过关联和交换运算“added”的变量,因此可以有效地支持并行。它们可用于实现计数器(如在 MapReduce 中)或求和。 Spark 原生支持数字类型的累加器,程序员可以添加对新类型的支持。

      作为用户,您可以创建命名或未命名的累加器。如下图所示,修改该累加器的阶段将在Web UI中显示一个命名累加器(在本例中为计数器)。Spark在“任务”表中显示由任务修改的每个累加器的值。

      然后,在集群上运行的task可以使用add方法add到Accumulators。但是,他们无法读取其值。只有driver program可以使用其value方法读取累加器的值。

      LongAccumulator accum = jsc.sc().longAccumulator();
      sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
      // ...
      // 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
      accum.value();
      // returns 10
      

      对于仅在action中执行的Accumulators更新,Spark保证每个任务对Accumulators的更新只会应用一次,即重新启动的任务不会更新值。在transformations中,用户应该知道,如果重新执行任务或作业阶段,每个任务的更新可能会应用不止一次。

      累加器不会改变 Spark 的惰性求值模型。如果它们是在 RDD 的操作中更新的,则只有当 RDD 作为action的一部分进行计算时,它们的值才会更新。因此,在像 map() 这样的惰性转换中进行累加器更新时,不能保证执行。下面的代码片段演示了这个属性:

      LongAccumulator accum = jsc.sc().longAccumulator();
      data.map(x -> { accum.add(x); return f(x); });
      // Here, accum is still 0 because no actions have caused the `map` to be computed.
      

      reference

      https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds