【spark床头书系列】PySpark 安装指南 PySpark DataFrame 、PySpark Pandas Api快速入门权威指南

PySpark 安装指南 PySpark DataFrame 、PySpark Pandas Api快速入门用法示例权威指南点击这里看全文

文章目录

  • PySpark 安装指南
    • 支持的Python版本
    • 使用PyPI安装
    • 使用Conda安装
    • 手动下载安装(最常用)
    • 从源代码构建安装
    • 依赖项
  • PySpark DataFrame 快速入门指南
    • 创建DataFrame
    • 选择和访问数据
    • 应用函数
    • 分组数据
    • 数据输入/输出
    • 使用SQL
  • 快速入门:Spark Connect
    • 启动带有Spark Connect的Spark服务器
    • 连接到Spark Connect服务器
    • 创建DataFrame
  • 快速入门:Spark上的Pandas API
    • 对象创建
      • 具有特定数据类型
      • 显示数据的前几行
      • 显示索引、列和底层numpy数据
      • 显示数据的快速统计摘要
      • 转置数据
      • 按索引排序
      • 按值排序
    • 缺失数据
      • 删除任何具有缺失数据的行。
      • **填充缺失值**
    • 操作
      • 统计
      • Spark配置
    • 分组
    • 绘图
    • 输入/输出数据
      • CSV
      • Parquet
      • Spark IO
  • 往期高质量文档

PySpark 安装指南

PySpark是Apache Spark官方发布的一部分,可以在Apache Spark网站上获取。对于Python用户,PySpark还提供了从PyPI进行pip安装的方式。这通常适用于本地使用或作为连接到集群的客户端,而不是设置一个集群本身。

本页面提供了使用pip、Conda、手动下载和从源代码构建的PySpark安装说明。

支持的Python版本

Python 3.8及以上版本。

使用PyPI安装

使用PyPI进行PySpark安装的方法如下:

pip install pyspark

如果您想为特定组件安装额外的依赖项,可以按照以下方式安装:

# Spark SQL
pip install pyspark[sql]
# 在Spark上使用pandas API
pip install pyspark[pandas_on_spark] plotly  # 如果需要绘制数据,还可以安装plotly。
# Spark Connect
pip install pyspark[connect]

对于带有/不带有特定Hadoop版本的PySpark,可以使用PYSPARK_HADOOP_VERSION环境变量进行安装:

PYSPARK_HADOOP_VERSION=3 pip install pyspark

默认发行版使用Hadoop 3.3和Hive 2.3。如果用户指定不同版本的Hadoop,pip安装将自动下载并使用PySpark所需的不同版本。根据网络和镜像选择的情况,下载可能需要一些时间。可以设置PYSPARK_RELEASE_MIRROR环境变量手动选择镜像以加快下载速度。

PYSPARK_RELEASE_MIRROR=http://mirror.apache-kr.org PYSPARK_HADOOP_VERSION=3 pip install

建议在pip命令中使用-v选项以跟踪安装和下载的状态:

PYSPARK_HADOOP_VERSION=3 pip install pyspark -v

PYSPARK_HADOOP_VERSION支持以下值:

  • without:使用用户提供的Apache Hadoop构建的Spark
  • 3:为Apache Hadoop 3.3及更高版本预构建的Spark(默认)

请注意,带有/不带有特定Hadoop版本的PySpark安装是实验性的。它可能会在次要版本之间发生更改或删除。

使用Conda安装

Conda是一个开源的包管理和环境管理系统(由Anaconda开发),最好通过Miniconda或Miniforge进行安装。该工具既跨平台又与语言无关,在实际应用中,Conda可以取代pip和virtualenv。

Conda使用所谓的"channels"来分发软件包。除了Anaconda自身的默认channel之外,最重要的channel是conda-forge。conda-forge是社区驱动的打包项目,提供了最广泛和最新的软件包(通常也是Anaconda channel的上游)。

从终端创建一个新的Conda环境并激活它,步骤如下:

conda create -n pyspark_env
conda activate pyspark_env

激活环境后,使用以下命令在同一会话中安装pyspark、所选Python版本以及其他您想要与pyspark一起使用的包(也可以分步安装):

conda install -c conda-forge pyspark  # 在这里还可以添加"python=3.8 some_package [etc.]"来指定Python版本和其他包

请注意,Conda下的PySpark由社区单独维护;虽然新版本通常会很快地打包发布,但其在conda(-forge)中的可用性不直接与PySpark的发布周期同步。

虽然在Conda环境中使用pip是可行的(使用与上述相同的命令),但不建议这样做,因为pip与Conda不兼容。

有关有用的Conda命令的简要摘要,请参阅其Cheat Sheet。

手动下载安装(最常用)

PySpark包含在Apache Spark网站提供的发行版中。您可以从该网站下载所需的发行版,并将tar文件解压到您希望安装Spark的目录中,例如:

tar xzvf spark-3.5.0-bin-hadoop3.tgz

确保SPARK_HOME环境变量指向解压后的目录,并更新PYTHONPATH环境变量以便找到SPARK_HOME/python/lib下的PySpark和Py4J库。以下是一个示例:

cd spark-3.5.0-bin-hadoop3
export SPARK_HOME=`pwd`
export PYTHONPATH=$(ZIPS=("$SPARK_HOME"/python/lib/*.zip); IFS=:; echo "${ZIPS[*]}"):$PYTHONPATH

从源代码构建安装

要从源代码安装PySpark,请参考构建Spark的相关文档。

依赖项

下表列出了PySpark所需的一些依赖项及其支持的版本:

包名 支持的版本 备注
py4j >=0.10.9.7 必需
pandas >=1.0.5 Spark SQL和Spark Connect所需;Spark SQL可选
pyarrow >=4.0.0 Spark SQL和Spark Connect所需;Spark SQL可选
numpy >=1.15 Spark SQL和MLLib DataFrame API所需;Spark SQL可选
grpcio >=1.48,<1.57 Spark Connect所需
grpcio-status >=1.48,<1.57 Spark Connect所需
googleapis-common-protos ==1.56.4 Spark Connect所需

请注意,PySpark要求Java 8或更高版本,并正确设置JAVA_HOME环境变量。如果使用JDK 11,请设置-Dio.netty.tryReflectionSetAccessible=true以启用与Arrow相关的功能,并参考下载指南。

这是PySpark的安装指南,您可以根据自己的环境和需求选择适合您的安装方法。如果您需要更详细的安装说明和指导,请参考官方文档或相关资源。

PySpark DataFrame 快速入门指南

本文是PySpark DataFrame API的简短介绍和快速入门。PySpark DataFrames是惰性求值的,它们是建立在RDD之上的。当Spark对数据进行转换时,并不立即计算转换结果,而是计划如何在以后进行计算。只有在显式调用collect()等操作时,计算才会开始。本文展示了DataFrame的基本用法,主要面向新用户。您可以在快速入门页面上的“在线笔记本:DataFrame”中自己运行最新版本的这些示例。

Apache Spark文档网站还提供了其他有用的信息,包括最新版本的Spark SQL和DataFrames、RDD编程指南、结构化流处理编程指南、Spark流处理编程指南和机器学习库(MLlib)指南。

PySpark应用程序从初始化SparkSession开始,SparkSession是PySpark的入口点,如下所示。如果通过pyspark可执行文件在PySpark shell中运行它,则shell会自动将会话创建在变量spark中。

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

创建DataFrame

可以使用pyspark.sql.SparkSession.createDataFrame方法创建一个PySpark DataFrame,通常通过传递一个列表、元组、字典和pyspark.sql.Rows的列表,一个pandas DataFrame或一个由此类列表组成的RDD来实现。pyspark.sql.SparkSession.createDataFrame方法可以通过scheme参数指定DataFrame的模式。当省略该参数时,PySpark会通过从数据中取样来推断相应的模式。

首先,可以从一组行创建一个PySpark DataFrame:

from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row
df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df

使用显式模式创建一个带有模式的PySpark DataFrame:

df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)