How to convert an RDD[Map[String, Any]] to a DataFrame?

I am working with an RDD[Map[String, Any]] and trying to convert it into a DataFrame. I do not have a schema I can specify for the DataFrame.

I tried rdd.toDF, but that did not help. It throws the following error:

Exception in thread "main" java.lang.ClassNotFoundException: scala.Any
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.javaClass(JavaMirrors.scala:555)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1211)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1203)
    at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toJava$1.apply(TwoWayCaches.scala:49)
    at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
    at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
    at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toJava(TwoWayCaches.scala:44)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1203)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:194)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
    at org.apache.spark.sql.catalyst.ScalaReflection$.getClassFromType(ScalaReflection.scala:700)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:84)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:65)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:64)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:512)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:445)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:445)
    at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:434)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
    at org.apache.spark.sql.SQLImplicits.newMapEncoder(SQLImplicits.scala:172)

Sample input

val data: RDD[Map[String, Any]] = appContext.sc.parallelize(List(
  Map("A" -> "B"),               // value could be a String
  Map("C" -> 123),               // value could be numeric (Long, Double, Int, etc.)
  Map("D" -> Map("E" -> "F")),   // could be another Map
  Map("G" -> List("H", "I")),    // a List of values
  Map("J" -> List(               // a List of Maps
    Map("K" -> "L"), Map("M" -> "N")
  ))
))

I was able to get it into a DataFrame by doing the following (JsonUtils is a wrapper around Jackson), but it is causing performance problems:

def convert(data: RDD[Map[String, Any]]): DataFrame = {
  sparkSession.read.json(data.map(each => JsonUtils.toJson(each)))
}
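JsonUtils is the asker's own wrapper and is not shown. As a rough illustration of what such a conversion does for this nested Map/List structure, a minimal hand-rolled serializer might look like the sketch below; this is a hypothetical stand-in, not the actual JsonUtils code, and a real implementation should use a proper library such as Jackson (which also handles string escaping, nulls, and other types omitted here).

```scala
// Minimal JSON serializer for the nested Map/List structure above.
// Hypothetical stand-in for the asker's JsonUtils wrapper, for illustration only.
def toJson(value: Any): String = value match {
  case m: Map[_, _] =>
    m.map { case (k, v) => "\"" + k.toString + "\":" + toJson(v) }
      .mkString("{", ",", "}")
  case l: List[_] =>
    l.map(toJson).mkString("[", ",", "]")
  case s: String =>
    "\"" + s + "\""
  case other =>
    other.toString // numbers, booleans, etc.
}
```

Whether implemented by hand or via Jackson, every record is serialized to a JSON string and then re-parsed by spark.read.json to infer a schema, which is where the performance cost comes from.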

Is there another approach that would give better performance? Any suggestions are much appreciated!

Update: I am not doing any processing with the DataFrame itself. I just want to write the output in three different formats, and converting to a DataFrame was the best way I could find to get consistent output. Any suggestions for achieving this without actually converting to a DataFrame would also be helpful.

df.write.avro("/path/to/avroFile")
df.write.parquet("/path/to/parquetFile")
df.write.json("/path/to/jsonFile")
Answer from taoguolin:

You will not be able to convert an RDD containing Any into a DataFrame.

However, perhaps you can split your initial RDD (if that works for your use case), for example:

one RDD with only Map[String, String], another with Map[String, Int], and so on.

Once you have those RDDs, you can convert each of them to a DataFrame with the toDF method and finally join them, so in the end you will have a single DataFrame like:

+-----+-------------+----------+-----------------+
| Key | StringValue | IntValue |    MapValue     |
+-----+-------------+----------+-----------------+
| A   | SomeString  |      123 | Map("A" -> "B") |
| B   | SomeString  |      456 | Map("B" -> "C") |
+-----+-------------+----------+-----------------+
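The splitting step this answer describes could be sketched as follows. For brevity (and so it runs without a Spark cluster) this uses plain Scala collections; with Spark you would apply the same collect partial functions to the RDD before calling toDF. The variable names are illustrative, not from the original post.

```scala
// Sketch of splitting a heterogeneous Map[String, Any] collection into
// homogeneously typed collections, one per value type.
val data: List[Map[String, Any]] = List(
  Map("A" -> "B"),
  Map("C" -> 123),
  Map("D" -> Map("E" -> "F"))
)

// Flatten each record to (key, value) pairs.
val pairs: List[(String, Any)] = data.flatMap(_.toList)

// Keep only the entries whose value is a String.
val stringPairs: List[(String, String)] =
  pairs.collect { case (k, v: String) => (k, v) }

// Keep only the entries whose value is an Int.
val intPairs: List[(String, Int)] =
  pairs.collect { case (k, v: Int) => (k, v) }
```

On an RDD, each such collect produces a concretely typed RDD (e.g. RDD[(String, String)]) that toDF can derive an encoder for, which is exactly what RDD[Map[String, Any]] lacks.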
