Kryo编码器v.s. Spark数据集中的RowEncoder

以下示例的目的是了解Spark数据集中两种编码器的区别。

我可以这样做:

val df = Seq((1,"a"),(2,"d")).toDF("id","value")

import org.apache.spark.sql.{Encoder,Encoders,Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._

val myStructType = StructType(Seq(StructField("id",Integertype),StructField("value",StringType)))
implicit val myRowEncoder = RowEncoder(myStructType)

val ds = df.map{case row => row}
ds.show

//+---+-----+
//| id|value|
//+---+-----+
//|  1|    a|
//|  2|    d|
//+---+-----+

我也可以这样做:

val df = Seq((1,Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._

implicit val myKryoEncoder: Encoder[Row] = Encoders.kryo[Row] 

val ds = df.map{case row => row}
ds.show

//+--------------------+
//|               value|
//+--------------------+
//|[01 00 6F 72 67 2...|
//|[01 00 6F 72 67 2...|
//+--------------------+

代码的唯一区别是:一种使用 Kryo编码器,另一种使用 RowEncoder

问题:

  • 两者之间有什么区别?
  • 为什么一个显示编码的值,另一个显示人类可读的值?
  • 我们什么时候应该使用哪个?
liu2050031 回答:Kryo编码器v.s. Spark数据集中的RowEncoder

Encoders.kryo只需创建一个编码器,即可使用Kryo 序列化T类型的对象

RowEncoder是Scala中具有apply和其他工厂方法的对象。 RowEncoder可以从模式创建ExpressionEncoder [Row]。 在内部,apply为Row类型创建一个BoundReference,并为输入模式返回一个ExpressionEncoder [Row],一个CreateNamedStruct序列化器(使用serializerFor内部方法),一个模式的反序列化器和Row类型

RowEncoder了解架构,并将其用于序列化和反序列化。

Kryo比Java序列化(通常多达10倍)要快得多,而且更紧凑,但不支持所有Seri​​alizable类型,并且需要您预先注册要在程序中使用的类,以实现最佳性能。

Kryo可以有效地存储大型数据集和网络密集型应用程序。

有关更多信息,您可以参考以下链接:

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-RowEncoder.html https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-Encoders.html https://medium.com/@knoldus/kryo-serialization-in-spark-55b53667e7ab https://stackoverflow.com/questions/58946987/what-are-the-pros-and-cons-of-java-serialization-vs-kryo-serialization#:~:text=Kryo%20is%20significantly%20faster%20and,in%20advance%20for%20best%20performance

,

根据Spark的文档,SparkSQL不使用Kryo或Java 序列化(通常)。

Kyro适用于RDD,不适用于数据框或数据集。因此,问题是 一点点光束afaik。

Does Kryo help in SparkSQL? 这详细介绍了自定义对象,但是...

更新一些空闲时间后的答案

您的示例并不是我所说的自定义类型。他们是 只是带有基本体的结构。没问题。

Kyro是序列化程序,DS,DF使用编码器来获得列优势。 Spark内部使用Kyro进行改组。

此用户定义的示例case class Foo(name: String,position: Point)是我们可以使用DS或DF或通过kyro进行的示例。那是什么 Tungsten和Catalyst的工作重点是“了解 数据的结构”?并因此能够进行优化。您还可以获得 kyro的单个二进制值,我发现了一些有关如何 成功使用它,例如加入。

KYRO示例

import org.apache.spark.sql.{Encoder,Encoders,SQLContext}
import org.apache.spark.{SparkConf,SparkContext}
import spark.implicits._

case class Point(a: Int,b: Int)
case class Foo(name: String,position: Point)

implicit val PointEncoder: Encoder[Point] = Encoders.kryo[Point]
implicit val FooEncoder: Encoder[Foo] = Encoders.kryo[Foo]
 
val ds = Seq(new Foo("bar",new Point(0,0))).toDS
ds.show()

返回:

+--------------------+
|               value|
+--------------------+
|[01 00 D2 02 6C 6...|
+--------------------+

使用案例类Example的DS编码器

import org.apache.spark.sql.{Encoder,position: Point)

val ds = Seq(new Foo("bar",0))).toDS
ds.show()

返回:

+----+--------+
|name|position|
+----+--------+
| bar|  [0,0]|
+----+--------+

这使我成为使用Spark,钨,催化剂的一种方式。

现在,当涉及到Any时,更复杂的事情就是这样,但是Any并不是一件好事:

val data = Seq(
    ("sublime",Map(
      "good_song" -> "santeria","bad_song" -> "doesn't exist")
    ),("prince_royce",Map(
      "good_song" -> 4,"bad_song" -> "back it up")
    )
  )

val schema = List(
    ("name",StringType,true),("songs",MapType(StringType,true)
  )

val rdd= spark.sparkContext.parallelize(data) 

rdd.collect

val df = spark.createDataFrame(rdd)
df.show()
df.printSchema()

返回:

Java.lang.UnsupportedOperationException: No Encoder found for Any.

然后这个示例很有趣,它是一个有效的自定义对象用例 Spark No Encoder found for java.io.Serializable in Map[String,java.io.Serializable]。但我会远离这种情况。

结论

希望这会有所帮助。

本文链接:https://www.f2er.com/1458602.html

大家都在问