无法通过mapPartitions()将数据集的行元素存储在变量中

我试图创建一个Spark数据集,然后使用mapPartitions尝试访问其每个元素并将它们存储在变量中。使用下面的代码来实现同样的效果:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val df = spark.sql("select col1,col2,col3 from table limit 10")

val schema = StructType(Seq(
              StructField("col1",StringType),StructField("col2",StructField("col3",StringType)))

val encoder = RowEncoder(schema)

df.mapPartitions{iterator => { val myList = iterator.toList
                 myList.map(x=> { val value1 = x.getString(0)
                                  val value2 = x.getString(1)
                                  val value3 = x.getString(2)}).iterator}} (encoder)

我针对此代码遇到的错误是:

<console>:39: error: type mismatch;
 found   : org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[org.apache.spark.sql.Row]
 required: org.apache.spark.sql.Encoder[Unit]
 val value3 = x.getString(2)}).iterator}} (encoder)

最终,我的目标是将行元素存储在变量中,并对它们执行一些操作。不知道我在这里想念什么。任何对此的帮助将不胜感激!

nghnyh 回答:无法通过mapPartitions()将数据集的行元素存储在变量中

实际上,您的代码有几个问题:

  1. 您的地图声明没有返回值,因此Unit
  2. 如果您从mapPartitions返回一个String元组,则不需要RowEncoder(因为您不需要返回Row,但是不需要Tuple3编码器,因为它是Product

您可以这样编写代码:

df
 .mapPartitions{itr => itr.map(x=> (x.getString(0),x.getString(1),x.getString(2)))}
 .toDF("col1","col2","col3") // Convert Dataset to Dataframe,get desired field names

但是您可以在DataFrame API中使用简单的select语句,而无需在这里使用mapPartitions

df
.select($"col1",$"col2",$"col3")
本文链接:https://www.f2er.com/3130167.html

大家都在问