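I'm trying to use zipWithIndex to add a column with row numbers to a DataFrame in Spark, as follows:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import spark.implicits._

val df = sc.parallelize(Seq((1.0,2.0),(0.0,-1.0),(3.0,4.0),(6.0,-2.3))).toDF("x","y")
val rddzip = df.rdd.zipWithIndex
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid",LongType,false)))
val dfZippedWithId = spark.createDataFrame(rddzip.map{ case (row,index) => Row.fromSeq(row.toSeq ++ Array(index))},newSchema)
But when I try to do the same thing in Java as follows, I get an error: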
JavaRDD<Row> rdd = (JavaRDD) df.toJavaRDD().zipWithIndex().map(t -> {
    Row r = t._1;
    Long index = t._2 + 1;
    ArrayList<Object> list = new ArrayList<>();
    for(Object item: JavaConverters.seqAsJavaListConverter(r.toSeq()).asJava()) {
        list.add(item);
    }
    return RowFactory.create(JavaConverters.seqAsJavaListConverter(t._1.toSeq()).asJava().add(t._2));
});
StructType newSchema = df.schema()
    .add(new StructField(name,DataTypes.LongType,true,Metadata.empty()));
return df.sparkSession().createDataFrame(rdd,newSchema);
Can anyone help?
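The Java lambda differs from the Scala version in its last line. `JavaConverters.seqAsJavaListConverter(...).asJava()` returns a read-only `java.util.List` view of the Scala `Seq`, so calling `add` on it throws `UnsupportedOperationException`. Even on a mutable list, `add` returns a `boolean` rather than the list, so `RowFactory.create(...add(...))` would build a one-field row. The `list` that was filled by the loop is never used. Below is a minimal pure-Java sketch of the intended row-building step, assuming the row values are available as a `List<Object>`. `AppendIndexDemo` and `appendIndex` are hypothetical names, and `Collections.unmodifiableList` only stands in for the Scala view so the example runs without Spark.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class AppendIndexDemo {

    // Hypothetical helper: returns a NEW array holding the original row values
    // followed by the 1-based row number (0-based zipWithIndex index + 1).
    static Object[] appendIndex(List<Object> rowValues, long zeroBasedIndex) {
        Object[] out = Arrays.copyOf(rowValues.toArray(), rowValues.size() + 1);
        out[rowValues.size()] = zeroBasedIndex + 1; // autoboxed to Long, matching LongType
        return out;
    }

    public static void main(String[] args) {
        // Analogue (assumption) for the read-only list returned by asJava().
        List<Object> view = Collections.unmodifiableList(Arrays.<Object>asList(1.0, 2.0));

        // What the original lambda effectively does: add() on a read-only view fails.
        try {
            view.add(0L);
        } catch (UnsupportedOperationException e) {
            System.out.println("add() on a read-only view: " + e.getClass().getSimpleName());
        }

        // Copying into a fresh array and appending the index works.
        Object[] fields = appendIndex(view, 0L);
        System.out.println(Arrays.toString(fields)); // [1.0, 2.0, 1]
    }
}
```

Inside the Spark lambda, the same idea would be to fill an `Object[]` of length `r.size() + 1` from `r.get(i)`, put `t._2() + 1` in the last slot, and return `RowFactory.create(values)`. Since the lambda then already returns a `Row`, `map` yields a `JavaRDD<Row>` and the raw `(JavaRDD)` cast is not needed. The snippet also uses a `name` variable, which has to be defined in the enclosing method.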