本文共 2971 字,大约阅读时间需要 9 分钟。
PySpark是Apache Spark在Python中的高级接口,它不仅允许开发者利用Python API编写Spark应用程序,还能在分布式环境中通过PySpark shell交互分析数据。PySpark支持Spark SQL、DataFrame、Streaming、MLlib以及Spark Core等核心功能,为数据处理和分析提供了强大工具。
PySpark DataFrame是Spark SQL的Python实现,提供了一个类似Pandas的API。它基于RDD(弹性分布式数据集),并支持延迟计算。当数据操作或转换触发时,Spark会规划如何执行,而不是立即计算。例如,collect()方法会将数据收集到驱动端执行,但对于大数据集,这会导致内存不足问题。
PySpark DataFrame可以通过多种方式创建:
import findsparkimport pyspark.sql as psfrom datetime import datetime, datespark = ps.SparkSession.builder.getOrCreate()rows = [ ps.Row(a=1, b=2.0, c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)), ps.Row(a=2, b=3.0, c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)), ps.Row(a=3, b=4.0, c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))]df = spark.createDataFrame(rows)df.show()
df = spark.createDataFrame( [ (1, 2.0, 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)), (2, 3.0, 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4.0, 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0)) ], schema='a long, b double, c string, d date, e timestamp')
import pandas as pdpandas_df = pd.DataFrame({ 'a': [1, 2, 3], 'b': [2.0, 3.0, 4.0], 'c': ['string1', 'string2', 'string3'], 'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)], 'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]})df = spark.createDataFrame(pandas_df) rdd = spark.sparkContext.parallelize([ (1, 2.0, 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)), (2, 3.0, 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4.0, 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))])df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])
PySpark DataFrame提供了丰富的操作功能,包括数据筛选、聚合、转换和函数应用。
# 选择特定列color_col = df.select('color')color_col.show() # 分组后应用函数def plus_mean(df): return df.assign(v1=df['v1'] - df['v1'].mean())# 按照颜色分组并应用函数df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show() def asof_join(l, r): return pd.merge_asof(l, r, on='time', by='id')# 定义数据框df1 = spark.createDataFrame([ (20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)], ('time', 'id', 'v1'))df2 = spark.createDataFrame([ (20000101, 1, 'x'), (20000101, 2, 'y')], ('time', 'id', 'v2'))# 数据合并df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(asof_join, schema='time int, id int, v1 double, v2 string').show() PySpark DataFrame可以通过show()方法查看数据,并使用collect()或toPandas()将数据收集到驱动端进行本地处理:
data = df.collect()print(data)
PySpark DataFrame可以与Spark SQL无缝集成,通过注册表并运行SQL查询:
df.createOrReplaceTempView('tableA')result = spark.sql('SELECT count(*) FROM tableA')result.show() PySpark为数据分析和处理提供了强大的工具,适用于结构化数据处理、机器学习模型训练、数据流处理等多种场景。通过合理使用PySpark功能,开发者可以充分发挥Spark平台的计算能力,实现高效的数据分析任务。
转载地址:http://wnsl.baihongyu.com/