中⽂⽂档pyspark.sql.DataFrame class pyspark.sql.DataFrame(jdf, sql_ctx)
分布式的收集数据分组到命名列中。
⼀个DataFrame相当于在Spark SQL中⼀个相关的表,可在SQLContext使⽤各种⽅法创建,
2.1 agg(*exprs)
没有组的情况下聚集整个DataFrame (df.groupBy.agg()的简写)。
>>>l=[('cassie',5),('beiwang',4),('xs',2)]
>>>
>>>df = ateDataFrame(l,['name','age'])
>>>
>>>df.agg({"age": "max"}).collect()[Row(max(age)=5)]
>>>
>>>from pyspark.sql importfunctions as F
铁水预处理
>>>
>>> df.agg(F.min(df.age)).collect()
>>>
[Row(min(age)=2)]
2.2 alias(alias)
In [57]: l = [('cassie',2), ('beiwang',3)]
In [58]: df = ateDataFrame(l,['name', 'age'])
In [59]: from pyspark.sql.functions import *
In [60]: df1 = df.alias('df1')
In [61]: df2 = df.alias('df2')
In [62]: join_df = df1.join(df2, col("df1.name")==col("df2.name"), 'inner')
In [63]: join_df.select("df1.name")
Out[63]: DataFrame[name: string]
In [64]: join_df.select(col("df1.name"))
Out[64]: DataFrame[name: string]
In [65]: join_df.select(col("df1.name")).collect()
Out[65]: [Row(name=u'beiwang'), Row(name=u'cassie')]
In [66]: join_df.select("df1.name").collect()
Out[66]: [Row(name=u'beiwang'), Row(name=u'cassie')]
2.3 cache()
⽤默认的存储级别缓存数据(MEMORY_ONLY_SER).
2.4 coalesce(numPartitions)
返回⼀个有确切的分区数的分区的新的DataFrame。 与在⼀个RDD上定义的合并类似, 这个操作产⽣⼀个窄依赖。 如果从1000个分区到100个分区,不会有shuffle过程, ⽽是每100个新分区会需要当前分区的10个。
2.5 collect()
返回所有的记录数为⾏的列表。
>>> df.collect()
>>>
多西环素[Row(name=u'cassie', age=2), Row(name=u'beiwang', age=3)]
2.6 columns
>>> df.columns
>>>
['age','name']
大律师巴布
2.7 corr(col1, col2, method=None)
计算⼀个DataFrame相关的两列为double值。通常只⽀持⽪尔森相关系数。()和()类似。 参数:● col1 – 第⼀列的名称
参数:● col1
● col2 – 第⼆列的名称
● col2
● method
● method – 相关⽅法.当前只⽀持⽪尔森相关系数
('age','hobby')
2.8 count()
返回DataFrame的⾏数。
>>> df.count()2
>>>
2.9 cov(col1, col2)
计算由列名指定列的样本协⽅差为double值。v()和v()类似。 参数:● col1
参数:● col1 – 第⼀列的名称
● col2 – 第⼆列的名称
● col2界首教育网
v('hobby','age')
2.10 crosstab(col1, col2)
计算给定列的分组频数表,也称为相关表。每⼀列的去重值的个数应该⼩于1e4.最多返回1e6个⾮零对.每⼀⾏的第⼀列会是col1的去重值,列名称是col2的去重值。第⼀列的名称是$col1_$col2. 没有出现的配对将以零作为计数。sstab() and
参数:● col1 – 第⼀列的名称. 去重项作为每⾏的第⼀项。
参数:● col1
● col2 – 第⼆列的名称. 去重项作为DataFrame的列名称。
● col2
sstab("hobby", "age").show()
+---------+---+---+
|hobby_age| 2| 3|
+---------+---+---+
| 5| 0| 1|
| 10| 1| 0|
+---------+---+---+
2.11 cube(*cols)
创建使⽤指定列的当前DataFrame的多维⽴⽅体,这样可以聚合这些数据。
df.cube('hobby', df.age).count().show()
+-----+----+-----+
|hobby| age|count|
+-----+----+-----+
| 5|null| 1|
| null|null| 2|
| 10|null| 1|
| 10| 2| 1|
| null| 2| 1|
| 5| 3| 1|
| null| 3| 1|
+-----+----+-----+
2.12 describe(*cols)
计算数值列的统计信息。
一本珍贵的书包括计数,平均,标准差,最⼩和最⼤。如果没有指定任何列,这个函数计算统计所有数值列。
2.13 distinct()
返回⾏去重的新的DataFrame。
df.distinct().count()
2.14 drop(col)
返回删除指定列的新的DataFrame
df.drop('hobby').collect()中美外交档案解密
2.15 dropDuplicates(subset=None)
返回去掉重复⾏的⼀个新的DataFrame,通常只考虑某⼏列。
drop_duplicates()和dropDuplicates()类似。
df.dropDuplicates().show()
>>>df.dropDuplicates(['name','height']).show()
2.16 drop_duplicates(subset=None)
与以上相同。
2.17 dropna(how='any', thresh=None, subset=None)
返回⼀个删除null值⾏的新的DataFrame。dropna()和dataframenafunctions.drop()类似。
参数:● how – 'any'或者'all'。如果'any',删除包含任何空值的⾏。如果'all',删除所有值为null的⾏。
● thresh – int,默认为None,如果指定这个值,删除⼩于阈值的⾮空值的⾏。这个会重写'how'参数。
● subset – 选择的列名称列表。
df.na.drop().show()
dfnew.na.drop(how='all',thresh=2).show()
2.18 dtypes
返回所有列名及类型的列表。
>>> df.dtypes
>>>
[('age','int'), ('name','string')]
2.19 explain(extended=False)
将(逻辑和物理)计划打印到控制台以进⾏调试。
参数:● extended
参数:● extended – boolean类型,默认为False。如果为False,只打印物理计划。
2.20 fillna(value, subset=None)
替换空值,和na.fill()类似,DataFrame.fillna()和dataframenafunctions.fill()类似。
参数:● value - 要代替空值的值有int,long,float,string或dict.如果值是字典,subset参数将被忽略。值必须是要替换的列的映射,替换值必须是int,long,float或者string.
● subset - 要替换的列名列表。在subset指定的列,没有对应数据类型的会被忽略。例如,如果值是字符串,subset包含⼀个⾮字符串的列,这个
⾮字符串的值会被忽略。
dfnew.na.fill(50).show()
dfnew.na.fill({'age': 50, 'name': 'unknown'}).show()
2.21 filter(condition)
⽤给定的条件过滤⾏。
where()和filter()类似。
参数:● 条件 - ⼀个列的bool类型或字符串的SQL表达式。df.where(df.age == 2).collect()
df.filter(df.age == 2).collect()
2.22 first()
返回第⼀⾏。
>>> df.first()
>>>
Row(age=2, name=u'Alice')
2.23 flatMap(f)
返回在每⾏应⽤F函数后的新的RDD,然后将结果压扁。
是df.rdd.flatMap()的简写。
>>>
>>>df.rdd.flatMap(lambda p: p.name).collect()
[u'A', u'l', u'i', u'c', u'e', u'B', u'o', u'b']
2.24 foreach(f)
应⽤f函数到DataFrame的所有⾏。
是df.rdd.foreach()的简写。
def f(person):
print(person.name)
>>>
>>> df.foreach(f)
Alice
2.25 foreachPartition(f)
应⽤f函数到DataFrame的每⼀个分区。
是 df.rdd.foreachPartition()的缩写。
>>>def f(people):
>>>