Flink on YARN fails with "unexpected block data" error

I have a Flink application that consumes data from a Kafka cluster and runs SQL transformations on it. I am running it on EMR, and it works as expected when I launch it with java -jar.

However, when I submit it to YARN with the command

flink run -m yarn-cluster -yn 2 -yjm 1g -ytm 2g JarFileName arg1 arg2

the application fails with the following stack trace.

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
    ... 36 more
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:239)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.StreamCorruptedException: unexpected block data
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:224)

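Update:

This issue is resolved. The root cause was Flink's class loader. We were originally building the jar with the Spring Boot Maven plugin. Switching to the Maven Shade plugin fixed the problem.

Reference: Integration - Apache Flink + Spring Boot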
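
To illustrate the packaging difference behind this fix: a jar repackaged by the Spring Boot Maven plugin typically keeps application classes under BOOT-INF/classes/ and relies on Spring Boot's own launcher to load them, while a shaded jar keeps classes at the jar root, where an ordinary class loader can find them. The sketch below is not part of the original post. It uses only the JDK, and the class and method names (JarLayoutCheck, hasSpringBootLayout) are made up for illustration. It checks which layout a given jar has.

```java
import java.io.IOException;
import java.util.Enumeration;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical helper, not from the original post: inspects a jar's layout.
public class JarLayoutCheck {

    /** Returns true if any entry lives under BOOT-INF/ (Spring Boot repackaged layout). */
    public static boolean hasSpringBootLayout(String jarPath) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                if (entries.nextElement().getName().startsWith("BOOT-INF/")) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java JarLayoutCheck.java <path-to-jar>");
            return;
        }
        if (hasSpringBootLayout(args[0])) {
            System.out.println("Spring Boot layout detected (classes under BOOT-INF/).");
        } else {
            System.out.println("No BOOT-INF/ entries found (flat layout, as in a shaded jar).");
        }
    }
}
```

On Java 11 or later this can be run directly as `java JarLayoutCheck.java target/my-job.jar`, where the jar path is a placeholder. A flat layout does not prove the jar is correct for Flink, but a BOOT-INF/ layout is a quick hint that the build still uses Spring Boot repackaging.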

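Answer from engkeluu:

Which Flink version are you using? Are you sure the cluster is running the same version?

Deserialization problems like this one are usually caused by different versions being used to serialize and to deserialize.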
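
One way to test the version-mismatch hypothesis is to compare what Java serialization sees for the same class on the client and on the cluster. The sketch below is an illustration only, not something from the thread. It uses the JDK's ObjectStreamClass, and the names SerialVersionProbe and serialVersionUidOf are hypothetical. It prints a class's serialVersionUID and the location it was loaded from, so running it on both sides against the same classpath shows whether they disagree.

```java
import java.io.ObjectStreamClass;
import java.security.CodeSource;

// Hypothetical diagnostic, not from the original post.
public class SerialVersionProbe {

    /** Returns the serialVersionUID Java serialization would use for the named class. */
    public static long serialVersionUidOf(String className) throws ClassNotFoundException {
        Class<?> cls = Class.forName(className);
        ObjectStreamClass descriptor = ObjectStreamClass.lookup(cls);
        if (descriptor == null) {
            // lookup returns null for classes that do not implement Serializable
            throw new IllegalArgumentException(className + " is not serializable");
        }
        return descriptor.getSerialVersionUID();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        if (args.length != 1) {
            System.err.println("usage: java -cp <classpath> SerialVersionProbe <class-name>");
            return;
        }
        Class<?> cls = Class.forName(args[0]);
        // CodeSource is null for classes loaded by the bootstrap class loader
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        System.out.println("class:            " + cls.getName());
        System.out.println("serialVersionUID: " + serialVersionUidOf(args[0]));
        System.out.println("loaded from:      " + (source == null ? "(bootstrap)" : source.getLocation()));
    }
}
```

Matching values do not rule out a class-loading problem (the issue in this thread turned out to be packaging), but differing values or unexpected jar locations point directly at a version mismatch.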
