使用Kryo在Spark-shell和Scala罐中注册复杂的Scala类

我有一个新的spark 2.3.1应用程序...可以运行一会儿,但是现在随着数据量的增加而损坏。

原始错误是kryo序列化问题... com.esotericsoftware.kryo.KryoException: java.lang.NegativeArraySizeException失败。最奇怪的部分是它不一致...如果我在非共享群集上的相同数据上运行相同的代码,相同的数据,则可能失败也可能不会失败,并且似乎是完全随机的。

我将spark.kryoserializer.buffer.max2047m(我的默认设置)提高到256m(最大值),只是为了查看会发生什么,并且它仍然失败,并出现相同的错误。我还尝试过增加失败的RDD的并行性(每个执行器从3倍增加6倍),并且没有运气。

现在,我正在尝试在spark-shell --conf spark.kryo.registrationRequired=true中运行代码段,以找到我需要注册的所有类,以在序列化时缩小大小,然后将它们逐步添加到--conf 'spark.kryo.classesToRegister=org.myorg.MyClass1,org.myorg.MyClass2'中,以后再移动将它们全部找到之后,将它们放进罐子(conf.registerKryoClasses(Array(classOf[MyClass1],classOf[MyClass2]))中(远远超出我的预期)。

有一个我绝对不知道如何注册的人。错误看起来像这样...

Caused by: java.lang.IllegalArgumentException: Class is not registered: org.myorg.MyClass[]
Note: To register this class use: kryo.register(org.myorg.MyClass[].class);

我怀疑这是诸如Iterable[MyClass]之类的其他类的参数class myouterClass(val mcs: Iterable[MyClass]),但是我尝试注册的所有方法均无法正常工作。我相信MyClass[]java.lang.Array[MyClass],但我尝试注册我能想到的ArrayIterable[]等的每种组合,并且没有运气注册它。

对于在命令行启动spark-shell以及最终在代码中注册IterableListTupleN的语法有何建议?最终,我还会有一些非常嵌套的元组,但是我还没走那么远。

我在stackoverflow中可以找到的最接近的结果是在这里,但是我也无法使它起作用。 Require kryo serialization in Spark (Scala)

谢谢。

编辑

为了澄清...成功注册MyClass后,我仍然收到错误消息Class is not registered: MyClass[],但我不知道[]到底是什么或如何注册使那些消失。

iCMS 回答:使用Kryo在Spark-shell和Scala罐中注册复杂的Scala类

如果您的班级名称为MyClass,请尝试向[LMyClass;注册

conf.registerKryoClasses(Array( Class.forName("[LMyClass;")))

它应该为MyClass加载和注册数组类

本文链接:https://www.f2er.com/2157838.html

大家都在问