光速入门spark(待续)(1),2024年最新大数据开发 MVP模式详解

| spark.kubernetes.driverEnv.[EnvironmentVariableName] | (none) | Add the environment variable specified by EnvironmentVariableName to the Driver process. The user can specify multiple of these to set multiple environment variables. |

| spark.kubernetes.driver.secrets.[SecretName] | (none) | Add the Kubernetes Secret named SecretName to the driver pod on the path specified in the value. For example, spark.kubernetes.driver.secrets.spark-secret=/etc/secrets. |

| spark.kubernetes.executor.secrets.[SecretName] | (none) | Add the Kubernetes Secret named SecretName to the executor pod on the path specified in the value. For example, spark.kubernetes.executor.secrets.spark-secret=/etc/secrets. |

| spark.kubernetes.driver.secretKeyRef.[EnvName] | (none) | Add as an environment variable to the driver container with name EnvName (case sensitive), the value referenced by key key in the data of the referenced Kubernetes Secret. For example, spark.kubernetes.driver.secretKeyRef.ENV_VAR=spark-secret:key. |

| spark.kubernetes.executor.secretKeyRef.[EnvName] | (none) | Add as an environment variable to the executor container with name EnvName (case sensitive), the value referenced by key key in the data of the referenced Kubernetes Secret. For example, spark.kubernetes.executor.secrets.ENV_VAR=spark-secret:key. |

| spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.path | (none) | Add the Kubernetes Volume named VolumeName of the VolumeType type to the driver pod on the path specified in the value. For example, spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint. |

| spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.readOnly | (none) | Specify if the mounted volume is read only or not. For example, spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false. |

| spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].options.[OptionName] | (none) | Configure Kubernetes Volume options passed to the Kubernetes with OptionName as key having specified value, must conform with Kubernetes option format. For example, spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim. |

| spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.path | (none) | Add the Kubernetes Volume named VolumeName of the VolumeType type to the executor pod on the path specified in the value. For example, spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint. |

| spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.readOnly | false | Specify if the mounted volume is read only or not. For example, spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false. |

| spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].options.[OptionName] | (none) | Configure Kubernetes Volume options passed to the Kubernetes with OptionName as key having specified value. For example, spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim. |

| spark.kubernetes.memoryOverheadFactor | 0.1 | This sets the Memory Overhead Factor that will allocate memory to non-JVM memory, which includes off-heap memory allocations, non-JVM tasks, and various systems processes. For JVM-based jobs this value will default to 0.10 and 0.40 for non-JVM jobs. This is done as non-JVM tasks need more non-JVM heap space and such tasks commonly fail with “Memory Overhead Exceeded” errors. This prempts this error with a higher default. |

| spark.kubernetes.pyspark.pythonVersion | "2" | Python版本,如果使用python的话 |

其余spark配置参考:https://spark.apache.org/docs/2.4.8/configuration.html。

举例:提交spark到k8s

可以这样来提交一个任务,同时设置 driver 和 executor 的 CPU、内存的资源 request 和 limit 值(driver 的内存 limit 值为 request 值的 110%)。

./spark-submit

// 设置cluster模式启动

–deploy-mode cluster

–class org.apache.spark.examples.SparkPi

// 指定k8s apiserver的地址

–master k8s://https://172.20.0.113:6443

–kubernetes-namespace spark-cluster

// 指定k8s的serviceAccount

–conf spark.kubernetes.authenticate.driver.serviceAccountName=spark

// k8s资源限额

–conf spark.driver.memory=100G

–conf spark.executor.memory=10G

–conf spark.driver.cores=30

–conf spark.executor.cores=2

–conf spark.driver.maxResultSize=10240m

–conf spark.kubernetes.driver.limit.cores=32

–conf spark.kubernetes.executor.limit.cores=3

–conf spark.kubernetes.executor.memoryOverhead=2g

–conf spark.executor.instances=5

–conf spark.app.name=spark-pi

// spark创建Pod模板

–conf spark.kubernetes.driver.docker.image=sz-pg-oam-docker-hub-001.tendcloud.com/library/spark-driver:v2.1.0-kubernetes-0.3.1-1

–conf spark.kubernetes.executor.docker.image=sz-pg-oam-docker-hub-001.tendcloud.com/library/spark-executor:v2.1.0-kubernetes-0.3.1-1

–conf spark.kubernetes.initcontainer.docker.image=sz-pg-oam-docker-hub-001.tendcloud.com/library/spark-init:v2.1.0-kubernetes-0.3.1-1

// 提交真正的计算任务

local:///opt/spark/examples/jars/spark-examples_2.11-2.2.0-k8s-0.4.0-SNAPSHOT.jar 10000000

这将启动一个包含一千万个 task 的计算 pi 的 spark 任务,任务运行过程中,drvier 的 CPU 实际消耗大约为 3 核,内存 40G,每个 executor 的 CPU 实际消耗大约不到 1 核,内存不到 4G,我们可以根据实际资源消耗不断优化资源的 request 值。

PySpark库

什么是PySpark

前面使用过bin/pyspark 程序, 提供一个Python解释器执行环境来运行Spark任务,PySpark指的是Python的运行类库, 可以在Python代码中:import pyspark。

PySpark 是Spark官方提供的一个Python类库, 内置了完全的Spark API, 可以通过PySpark类库来编写Spark应用程序,并将其提交到Spark集群中运行.

下图是PySpark类库和标准Spark框架的简单对比:

可以通过Python自带的pip程序进行安装:pip install pyspark -i https://pypi.tuna.tsinghua.edu.cn/simple

应用入口:SparkContext

Spark Application程序入口为:SparkContext,任何一个应用首先需要构建SparkContext对象,如下两步构建:

  1. 第一步、创建SparkConf对象,设置Spark Application基本信息,比如应用的名称AppName和应用运行Master
  2. 第二步、基于SparkConf对象,创建SparkContext对象

WordCount代码实战

首先创建一个本地word.txt文件:

Hello Spark

Hello World

Hello Hello

然后编写对应处理脚本:

如果有如下报错:

24/03/17 19:46:06 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 2)

org.apache.spark.SparkException: Python worker failed to connect back.

at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:192)

at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)

at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)

at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)

at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)

at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)

at org.apache.spark.scheduler.Task.run(Task.scala:139)

at org.apache.spark.executor.Executor T a s k R u n n e r . TaskRunner. TaskRunner.anonfun$run 3 ( E x e c u t o r . s c a l a : 554 ) a t o r g . a p a c h e . s p a r k . u t i l . U t i l s 3(Executor.scala:554) at org.apache.spark.util.Utils 3(Executor.scala:554)atorg.apache.spark.util.Utils.tryWithSafeFinally(Utils.scala:1529)

at org.apache.spark.executor.Executor T a s k R u n n e r . r u n ( E x e c u t o r . s c a l a : 557 ) a t j a v a . b a s e / j a v a . u t i l . c o n c u r r e n t . T h r e a d P o o l E x e c u t o r . r u n W o r k e r ( T h r e a d P o o l E x e c u t o r . j a v a : 1136 ) a t j a v a . b a s e / j a v a . u t i l . c o n c u r r e n t . T h r e a d P o o l E x e c u t o r TaskRunner.run(Executor.scala:557) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor TaskRunner.run(Executor.scala:557)atjava.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)atjava.base/java.util.concurrent.ThreadPoolExecutorWorker.run(ThreadPoolExecutor.java:635)

at java.base/java.lang.Thread.run(Thread.java:833)

Caused by: java.net.SocketTimeoutException: Accept timed out

at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:708)

at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:752)

at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:675)

at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:641)

at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:617)

at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:574)

at java.base/java.net.ServerSocket.accept(ServerSocket.java:532)

at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:179)

… 15 more

就需要加上两个环境变量:

os.environ[‘PYSPARK_PYTHON’] = sys.executable

os.environ[‘PYSPARK_DRIVER_PYTHON’] = sys.executable

这两行代码的作用是设置环境变量,用于指定 Spark 使用的 Python 解释器以及驱动程序所使用的 Python 解释器。

os.environ[‘PYSPARK_PYTHON’] = sys.executable 将当前 Python 解释器的路径赋给环境变量 PYSPARK_PYTHON,这告诉 Spark 使用与当前 Python 解释器相同的解释器来执行 Python 代码。

os.environ[‘PYSPARK_DRIVER_PYTHON’] = sys.executable 将当前 Python 解释器的路径赋给环境变量 PYSPARK_DRIVER_PYTHON,这告诉 Spark 驱动程序使用与当前 Python 解释器相同的解释器。

from pyspark import SparkContext, SparkConf

import os

import sys

os.environ[‘PYSPARK_PYTHON’] = sys.executable

os.environ[‘PYSPARK_DRIVER_PYTHON’] = sys.executable

spark_config = SparkConf().setAppName(“minProject”).setMaster(“local[*]”)

spark_ctx = SparkContext(conf=spark_config)

spark读取本地文件

rdd = spark_ctx.textFile(“./word.txt”)

print(rdd.collect())

分割为单词

flatMapRdd = rdd.flatMap(lambda line: line.split(" "))

print(flatMapRdd.collect())

转换为二元组,表示每个单词出现一次

mapRDD = flatMapRdd.map(lambda x: (x, 1))

print(mapRDD.collect())

按照Key分组聚合

resultRDD = mapRDD.reduceByKey(lambda a, b: a + b)

第三步、输出数据

print(resultRDD.collect())

原理分析:

最终将结果保存到本地:

第三步、输出数据

print(resultRDD.collect())

resultRDD.saveAsTextFile(“./output.txt”)

Python On Spark 执行原理

PySpark宗旨是在不破坏Spark已有的运行时架构,在Spark架构外层包装一层Python API,借助Py4j实现Python和Java的交互,进而实现通过Python编写Spark应用程序,其运行时架构如下图所示。

Spark核心编程

Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:

➢ RDD : 弹性分布式数据集

➢ 累加器:分布式共享只写变量

➢ 广播变量:分布式共享只读变量

RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

  1. 弹性
  • 存储的弹性:内存与磁盘的自动切换;
  • 容错的弹性:数据丢失可以自动恢复;
  • 计算的弹性:计算出错重试机制;
  • 分片的弹性:可根据需要重新分片。
    1. 分布式:数据存储在大数据集群不同节点上
    2. 数据集:RDD封装了计算逻辑,并不保存数据
    3. 数据抽象:RDD是一个抽象类,需要子类具体实现
    4. 不可变:RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑
    5. 可分区、并行计算
    1. RDD(Resilient Distributed Dataset)弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。
    2. 所有的运算以及操作都建立在RDD 数据结构的基础之上。
    3. 可以认为RDD是分布式的列表List或数组Array,抽象的数据结构,RDD是一个抽象类Abstract Class和泛型Generic Type
    RDD的五大特性
    RDD是有分区的
    RDD的方法会作用到所有分区上
    RDD之间是有依赖关系的
    Key-Value型的RDD可以有分区器
    RDD的分区规划,会尽量靠近数据所在服务器
    WordCount案例分析
    RDD编程
    RDD的创建

    Spark RDD 编程的程序入口对象是SparkContext对象(不论何种编程语言),只有构建出SparkContext, 基于它才能执行后续的API调用和计算。

    本质上, SparkContext对编程来说, 主要功能就是创建第一个RDD出来。

    RDD的创建主要有2种方式:

    1. 通过并行化集合创建( 本地对象转分布式RDD )
    2. 读取外部数据源( 读取文件)
    并行化创建

    from pyspark import SparkContext, SparkConf

    import os

    import sys

    os.environ[‘PYSPARK_PYTHON’] = sys.executable

    os.environ[‘PYSPARK_DRIVER_PYTHON’] = sys.executable

    spark_config = SparkConf().setAppName(“create add”).setMaster(“local[4]”)

    spark_context = SparkContext(conf=spark_config)

    data = [1, 2, 3, 4, 5, 6, 6, 7, 8]

    rdd = spark_context.parallelize(data, 4)

    print(rdd.collect())

    读取文件创建

    from pyspark import SparkContext, SparkConf

    import os

    import sys

    os.environ[‘PYSPARK_PYTHON’] = sys.executable

    os.environ[‘PYSPARK_DRIVER_PYTHON’] = sys.executable

    spark_config = SparkConf().setAppName(“create add”).setMaster(“local[4]”)

    spark_context = SparkContext(conf=spark_config)

    rdd = spark_context.textFile(“./word.txt”)

    print(rdd.collect())

    RDD 算子
    Transformation算子
    map算子
    flatMap算子
    reduceByKey算子

    对于一个WordCount的应用,可以:

    groupBy算子

    说白了也就是,一个函数的return值是分组的key,条件是分组的条件。

    filter算子

    和python的filter一个意思,如果返回true则代表当前值有用,否则抛弃。

    distinct算子
    union算子
    join算子
    intersection算子
    glom算子

    import os

    import sys

    from pyspark import SparkContext, SparkConf

    os.environ[‘PYSPARK_PYTHON’] = sys.executable

    os.environ[‘PYSPARK_DRIVER_PYTHON’] = sys.executable

    spark_config = SparkConf().setAppName(“create add”).setMaster(“local[4]”)

    spark_context = SparkContext(conf=spark_config)

    rdd = spark_context.parallelize([1, 2, 3, 4, 5], 2)

    rdd2 = rdd.glom()

    print(rdd.collect())

    [1, 2, 3, 4, 5]

    print(rdd2.collect())

    [[1, 2], [3, 4, 5]]

    groupByKey算子
    sortBy算子

    import os

    import sys

    from random import shuffle

    from pyspark import SparkContext, SparkConf

    os.environ[‘PYSPARK_PYTHON’] = sys.executable

    os.environ[‘PYSPARK_DRIVER_PYTHON’] = sys.executable

    spark_config = SparkConf().setAppName(“create add”).setMaster(“local[4]”)

    spark_context = SparkContext(conf=spark_config)

    data = [number for number in range(100)]

    原地打乱

    shuffle(data)

    rdd = spark_context.parallelize(data, 10)

    rdd2 = rdd.glom()

    print(rdd2.collect())

    rdd3 = rdd2.sortBy(lambda x: x[-1], ascending=True, numPartitions=1)

    print(rdd3.collect())

    官方给的案例:

    Examples

    tmp = [(‘a’, 1), (‘b’, 2), (‘1’, 3), (‘d’, 4), (‘2’, 5)]

    sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()

    [(‘1’, 3), (‘2’, 5), (‘a’, 1), (‘b’, 2), (‘d’, 4)]

    sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()

    [(‘a’, 1), (‘b’, 2), (‘1’, 3), (‘d’, 4), (‘2’, 5)]

    sortByKey算子
    案例
    Action算子
    collection算子
    reduce算子
    fold算子
    first算子

    import os

    import sys

    from random import shuffle

    from pyspark import SparkContext, SparkConf

    os.environ[‘PYSPARK_PYTHON’] = sys.executable

    os.environ[‘PYSPARK_DRIVER_PYTHON’] = sys.executable

    spark_config = SparkConf().setAppName(“create add”).setMaster(“local[4]”)

    spark_context = SparkContext(conf=spark_config)

    data = [number for number in range(100)]

    原地打乱

    shuffle(data)

    rdd = spark_context.parallelize(data, 10)

    print(rdd.getNumPartitions())

    为了可观性,多加一步glom运算

    print(rdd.glom().collect())

    first_number = rdd.first()

    print(first_number)

    可以看出取的是第一个分区的第一个元素。

    take算子

    同理,take也是按顺序取出元素,当前分区不够就到下一个分区去找。

    top算子
    count算子
    takeSample算子
    takeOrdered算子
    foreach算子
    saveAsTextFile算子
    分区操作算子
    mapPartitions算子
    foreachPartition算子
    partitionBy算子
    repartition算子
    coalesce算子
    mapValues算子
    join算子
    groupByKey和reduceByKey的区别
    对于分区操作有什么要注意的地方

    尽量不要增加分区, 可能破坏内存迭代的计算管道。

    RDD的持久化

    RDD 的数据是过程数据

    RDD 的缓存

    RDD 的CheckPoint

    1. Cache和Checkpoint区别
    • Cache是轻量化保存RDD数据, 可存储在内存和硬盘, 是分散存储, 设计上数据是不安全的(保留RDD

      血缘关系)

    • CheckPoint是重量级保存RDD数据, 是集中存储, 只能存储在硬盘(HDFS)上, 设计上是安全的(不保留

      RDD血缘关系)

      1. Cache 和 CheckPoint的性能对比?
      • Cache性能更好, 因为是分散存储, 各个Executor并行执行, 效率高, 可以保存到内存中(占内存),更快
      • CheckPoint比较慢, 因为是集中存储, 涉及到网络IO, 但是存储到HDFS上更加安全(多副本)

        Spark案例练习

        搜索引擎日志分析案例

        使用搜狗的用户查询数据集:(我放到了https://scripterbro.github.io/files/SogouQ.txt)进行数据分析。

        自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

        深知大多数大数据工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

        因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。

        既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!

        由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

        如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)

        5cab.png)

        自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

        深知大多数大数据工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

        因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。

        [外链图片转存中…(img-jgOHfVG2-1712553639323)]

        [外链图片转存中…(img-Ay4QAw1I-1712553639324)]

        [外链图片转存中…(img-5tQcPHV1-1712553639324)]

        [外链图片转存中…(img-YHVc6cM6-1712553639324)]

        [外链图片转存中…(img-h7QaOmf1-1712553639324)]

        既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!

        由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

        如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)

        [外链图片转存中…(img-wHSypmFZ-1712553639325)]