如何配置Spark以将KryoSerializer用于lambda?还是我在Spark中发现了错误?我们对其他地方的数据序列化没有任何问题,只是这些lambda使用默认值而不是Kryo。
代码如下:
JavaPairRDD<String,IonValue> rdd; // provided
IonSexp filterExpression; // provided
Function<Tuple2<String,IonValue>,Boolean> filterFunc = record -> myCustomFilter(filterExpression,record);
rdd = rdd.filter(filterFunc);
引发异常:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.Closurecleaner$.ensureSerializable(Closurecleaner.scala:403)
at org.apache.spark.util.Closurecleaner$.org$apache$spark$util$Closurecleaner$$clean(Closurecleaner.scala:393)
at org.apache.spark.util.Closurecleaner$.clean(Closurecleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:388)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.filter(RDD.scala:387)
at org.apache.spark.api.java.JavaPairRDD.filter(JavaPairRDD.scala:99)
at com.example.Someclass.process(Someclass.java:ABC)
{more stuff}
Caused by: java.io.NotSerializableException: com.amazon.ion.impl.lite.IonSexpLite
Serialization stack:
- object not serializable (class: com.amazon.ion.impl.lite.IonSexpLite,value: (and (equals (literal 1) (path marketplace_id)) (equals (literal 351) (path product gl_product_group))))
- element of array (index: 1)
- array (class [Ljava.lang.Object;,size 2)
- field (class: java.lang.invoke.SerializedLambda,name: capturedArgs,type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda,SerializedLambda[capturingClass=class com.example.Someclass,functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;,implementation=invokeSpecial com/example/Someclass.lambda$process$8f20a2d2$1:(Lcom/amazon/ion/IonSexp;Lscala/Tuple2;)Ljava/lang/Boolean;,instantiatedMethodType=(Lscala/Tuple2;)Ljava/lang/Boolean;,numCaptured=2])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class com.example.Someclass$$Lambda$36/263969036,com.example.Someclass$$Lambda$36/263969036@31880efa)
- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$filter$1,name: f$1,type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$filter$1,<function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.Closurecleaner$.ensureSerializable(Closurecleaner.scala:400)
... 18 more
在这种情况下,有问题的filterExpression
是一个Ion S-Expression对象,它没有实现java.io.Serializable
。我们正在使用Kryo序列化器,并对其进行了注册和配置,以便可以对其进行序列化。
初始化火花配置时的代码:
sparkConf = new SparkConf().setappName("SomeAppName").setMaster("MasterLivesHere")
.set("spark.serializer",KryoSerializer.class.getcanonicalName())
.set("spark.kryo.registrator",KryoRegistrator.class.getcanonicalName())
.set("spark.kryo.registrationRequired","false");
我们的注册人代码:
kryo.register(com.amazon.ion.IonSexp.class);
kryo.register(Class.forName("com.amazon.ion.impl.lite.IonSexpLite"));
如果我尝试使用以下代码手动序列化该lambda
SerializationUtils.serialize(filterFunc);
由于filterExpression
无法序列化,因此失败并出现与预期相同的错误。但是,以下代码有效:
sparkContext.env().serializer().newInstance().serialize(filterFunc,ClassTag$.MODULE$.apply(filterFunc.getclass()));
这又是预期的,因为我们的Kryo设置能够处理这些对象。
所以我的问题/困惑是,当我们明确配置了Lambda以使其使用Kryo时,为什么Spark尝试使用org.apache.spark.serializer.JavaSerializer
序列化该lambda?