本文共 4352 字,大约阅读时间需要 14 分钟。
PySpark是Apache Spark在Python中的接口。它不仅允许您使用Python api编写Spark应用程序,而且还提供了用于在分布式环境中交互分析数据的PySpark shell。PySpark支持Spark的大部分特性,如Spark SQL、DataFrame、Streaming、MLlib(机器学习)和Spark Core。
对pyspark中的DataFrame API 快速简介:
PySpark DataFrames are lazily evaluated. They are implemented on top of RDDs. When Spark transforms data, it does not immediately compute the transformation but plans how to compute later. When actions such as collect() are explicitly called, the computation starts.import findsparkfindspark.init()from pyspark.sql import SparkSessionfrom datetime import datetime, dateimport pandas as pdfrom pyspark.sql import Rowspark = SparkSession.builder.getOrCreate()df = spark.createDataFrame( [ Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)), Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)), Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0)) ])df.show()df.printSchema()
记得加上
import findspark findspark.init()
df = spark.createDataFrame([ (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)), (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))], schema='a long, b double, c string, d date, e timestamp')df
pandas_df = pd.DataFrame({ 'a': [1, 2, 3], 'b': [2., 3., 4.], 'c': ['string1', 'string2', 'string3'], 'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)], 'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]})df = spark.createDataFrame(pandas_df)df
rdd = spark.sparkContext.parallelize([ (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)), (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))])df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])df
结果都一样,如下
df.collect()'''[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)), Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)), Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]'''
PySpark DataFrame还提供了转换回一个pandas DataFrame来利用pandas api。请注意,toPandas还将所有数据收集到驱动端,当数据太大而无法容纳到驱动端时,这些数据很容易导致内存不足错误。
PySpark DataFrame是延迟计算的,简单地选择一个列不会触发计算,但它会返回一个column实例。
df = spark.createDataFrame([ ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30], ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60], ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])df.show()
def plus_mean(pandas_df): return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()
df1 = spark.createDataFrame( [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)], ('time', 'id', 'v1'))df2 = spark.createDataFrame( [(20000101, 1, 'x'), (20000101, 2, 'y')], ('time', 'id', 'v2'))def asof_join(l, r): return pd.merge_asof(l, r, on='time', by='id')df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas( asof_join, schema='time int, id int, v1 double, v2 string').show()
DataFrame和Spark SQL共享相同的执行引擎,因此它们可以无缝地互换使用。例如,你可以注册DataFrame作为一个表,然后像下面这样轻松地运行SQL:
df.createOrReplaceTempView("tableA")spark.sql("SELECT count(*) from tableA").show()
±-------+
|count(1)| ±-------+ | 8| ±-------+转载地址:http://wnsl.baihongyu.com/