如何将列表转换为具有多列的行

从csv文件创建一个DataFrame,处理每一行,想创建一个具有相同列数的新行。

val df = spark.read.format("csv").load("data.csv")
def process(line: Row) : Seq[String] = {
  val list = new ArrayList[String]
  for (i <- 0 to line.size-1) {
    list.add(line.getString(i).toUpperCase)
  }
  list.asScala.toSeq
}
val df2 = df.map(process(_))
df2.show

期望/希望得到:

+---+---+---+                                                                   
| _1| _2| _3|
+---+---+---+
| X1| X2| X3|
| Y1| Y2| Y3|
+---+---+---+

获取:

+------------+                                                                     
|       value|
+------------+
|[X1,X2,X3]|
|[Y1,Y2,Y3]|
+------------+

输入文件data.csv:

x1,x2,x3
y1,y2,y3

请注意,代码也应在此输入文件中起作用:

x1,x3,x4
y1,y3,y4

对于这个输入文件,我想查看结果

+---+---+---+---+                                                               
| _1| _2| _3| _4|
+---+---+---+---+
| X1| X2| X3| X4|
| Y1| Y2| Y3| Y4|
+---+---+---+---+

请注意,我在process()中使用了tpUpperCase()只是为了使简单的示例起作用。 process()中的实际逻辑可能要复杂得多。

panlikq 回答:如何将列表转换为具有多列的行

第二次更新,将rdd更改为行

@USML ,基本上将 Seq [String] 更改为 Row ,以便可以将rdd进行并行化。这是一个分布式并行集合,需要序列化

val df2 = csvDf.rdd.map(process(_)).map(a => Row.fromSeq(a)) 
//df2: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
// And we use dynamic Schema (e.g. same number of columns as csv
spark.createDataFrame(df2,schema = dynamicSchema).show(false)
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|X1 |X2 |X3 |
|Y1 |Y2 |Y3 |
+---+---+---+

已更改要求的更新 只要您正在阅读 CSV,最终输出将具有与我们使用 df相同 作为您的csv。架构,以在调用 process 方法后创建数据框架。试试这个:

val df = spark.read.format("csv").load("data.csv")
val dynamicSchema = df.schema // This makes sure to prserve  same number of columns
def process(line: Row) : Seq[String] = {
  val list = new ArrayList[String]
  for (i <- 0 to line.size-1) {
    list.add(line.getString(i).toUpperCase)
  }
  list.asScala.toSeq
}
val df2 = df.rdd.map(process(_)).map(a => Row.fromSeq(a)) // df2 is actually an RDD // updated conversion to Row

val finalDf = spark.createDataFrame(df2,schema = dynamicSchema) // We use same schema

finalDf.show(false)

文件内容 =>

cat data.csv
a1,b1,c1,d1
a2,b2,c2,d2

代码 =>

import org.apache.spark.sql.Row
val csvDf = spark.read.csv("data.csv")
csvDf.show(false)
+---+---+---+---+
|_c0|_c1|_c2|_c3|
+---+---+---+---+
|a1 |b1 |c1 |d1 |
|a2 |b2 |c2 |d2 |
+---+---+---+---+

def process(cols: Row): Row  = { Row("a","b","c","d") } // Check the Data Type

val df2 = csvDf.rdd.map(process(_)) // df2: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]

val finalDf = spark.createDataFrame(df2,schema = csvDf.schema)

finalDf.show(false)
+---+---+---+---+
|_c0|_c1|_c2|_c3|
+---+---+---+---+
|a  |b  |c  |d  |
|a  |b  |c  |d  |
+---+---+---+---+

要标记数据类型,以映射

更好地练习设置类型安全案例类 休息应该很容易

本文链接:https://www.f2er.com/3157069.html

大家都在问