返回信息流Spark的createDataFrame函数是通过函数到底是通过rdd[row]中的一条数据的schema来创建的data frame,还是怎么来创建的?比如说如果rdd[row]中每行的schema不完全相同,用rdd来createDataFrame会发生什么?我看了看源码也没弄明白这个问题:
/**
* Creates a `DataFrame` from an RDD[Row].
* User can specify whether the input rows should be converted to Catalyst rows.
*/
private[sql] def createDataFrame(
rowRDD: RDD[Row],
schema: StructType,
needsConversion: Boolean) = {
// TODO: use MutableProjection when rowRDD is another DataFrame and the applied
// schema differs from the existing schema on any field data type.
val catalystRows = if (needsConversion) {
val encoder = RowEncoder(schema)
rowRDD.map(encoder.toRow)
} else {
rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)}
}
val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
Dataset.ofRows(self, logicalPlan)
}
另外创建出来的data frame是带数据的data frame,还是只有一个data frame?
这是一条镜像帖。来源:北邮人论坛 / java / #62683同步于 2019/9/16
该镜像源已超过 30 天没有更新,可能在源站已被删除。
Java机器人发帖
关于Spark createDataFrame函数的两个问题
PMS
2019/9/16镜像同步5 回复
订阅后,新回复会通过你的通知中心匿名送达。
5 条回复
【 在 AA071427 的大作中提到: 】
: RDD 和 DF 都是抽象的概念,不持有数据
你看看官方文档 https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html 关于createDataFrame函数的例子,好像是带数据的啊:
>>> l = [('Alice', 1)]
>>> spark.createDataFrame(l).collect()
[Row(_1=u'Alice', _2=1)]
>>> spark.createDataFrame(l, ['name', 'age']).collect()
[Row(name=u'Alice', age=1)]
>>> d = [{'name': 'Alice', 'age': 1}]
>>> spark.createDataFrame(d).collect()
[Row(age=1, name=u'Alice')]
>>> rdd = sc.parallelize(l)
>>> spark.createDataFrame(rdd).collect()
[Row(_1=u'Alice', _2=1)]
>>> df = spark.createDataFrame(rdd, ['name', 'age'])
>>> df.collect()
[Row(name=u'Alice', age=1)]
>>> from pyspark.sql import Row
>>> Person = Row('name', 'age')
>>> person = rdd.map(lambda r: Person(*r))
>>> df2 = spark.createDataFrame(person)
>>> df2.collect()
[Row(name=u'Alice', age=1)]
>>> from pyspark.sql.types import *
>>> schema = StructType([
... StructField("name", StringType(), True),
... StructField("age", IntegerType(), True)])
>>> df3 = spark.createDataFrame(rdd, schema)
>>> df3.collect()
[Row(name=u'Alice', age=1)]
>>> spark.createDataFrame(df.toPandas()).collect()
[Row(name=u'Alice', age=1)]
>>> spark.createDataFrame(pandas.DataFrame([[1, 2]])).collect()
[Row(0=1, 1=2)]
>>> spark.createDataFrame(rdd, "a: string, b: int").collect()
[Row(a=u'Alice', b=1)]
>>> rdd = rdd.map(lambda row: row[1])
>>> spark.createDataFrame(rdd, "int").collect()
[Row(value=1)]
>>> spark.createDataFrame(rdd, "boolean").collect()
Traceback (most recent call last):
...
Py4JJavaError: ...
【 在 PMS 的大作中提到: 】
:
: 你看看官方文档 https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html 关于createDataFrame函数的例子,好像是带数据的啊:
:
: ...................
RDD只是描述了数据转换的过程,数据是通过RDD的描述去获得的,你把RDD理解成保存了数据的元信息就好了,真正的数据还是在存储里,RDD只是一个逻辑上的概念