Jackson 创建的 Java Iterator 问题破坏了 Scala Flink 应用程序

有一个 Scala Flink 应用程序,我在其中使用 Jackson 库解析 JSON。解析由自定义方法处理,并使用延迟启动概念来保持快速。

现在,无论出于何种原因,在 Flink 管道中进一步传递带有惰性值的模型会导致 util.Iterator 出现一些奇怪的错误,Kryo 是读取 JSON 的主干。我怀疑该问题实际上可能来自 .toList,但我不知道如何确认。值得注意的是,在同一个(flink)map 中急切地初始化模型(使用 case class Root(items: Collection[Data]) case class Data(data: Collection[Double]) def toRoot(node: JsonNode): Root = { val data: util.Iterator[JsonNode] = if (node.hasnonNull("items")) node.get("items").elements() else node.elements() val items: Collection[Data] = data.asScala.map(x => toData(x)) Root(items) } )修复了这个问题。但事实并非如此,我想进一步传递我的懒惰模型。

最后,我提供了一个带有演示代码的存储库,但我也想在 StackOverflow 中提供所有详细信息。

示例模型和解析定义:

{
  "items": [
    {
      "data": [
        11.71476355252127,48.342882259940176,507.3,11.714791605037252,...

JSON 数据类似于:

map

并在一个 env.fromCollection(Seq(input)) .map(i => flatten(read(i))) .print() 中完成所有工作:

env.fromCollection(Seq(input))
   .map(i => read(i))
   .map(i => flatten(i))
   .print()

但进一步传递失败:

Caused by: java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:911)
    at java.util.ArrayList$Itr.next(ArrayList.java:861)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)

有错误:

  • Scala 2.11
Caused by: java.lang.NullPointerException
    at com.esotericsoftware.kryo.util.DefaultClassResolver.writeclass(DefaultClassResolver.java:80)
    at com.esotericsoftware.kryo.Kryo.writeclass(Kryo.java:488)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:57)
    ... 29 more
  • Scala 2.12
Time_Of_Day

我已经创建了一个演示项目,其中包含所有准备用 Scala 2.11 和 2.12 进行测试的示例,因为它实际上给出了不同的结果 可用 HERE

z4521215 回答:Jackson 创建的 Java Iterator 问题破坏了 Scala Flink 应用程序

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/1066034.html

大家都在问