Flink 1.9.0-更改状态对象后,状态反序列化失败

更改状态对象后,我们的状态无法加载。 (不足为奇)

我们的状态对象看起来像这样:

public class StateHolder {
    private Set<Object1> objects1 = new HashSet<>();
    private Set<Object2> objects2 = new HashSet<>();
    // 4 more sets of objects,Object3 to Object6 let's call them that
    // no args constructor,and getters and setters
...
}

它的用法如下:

ValueStateDescriptor<StateHolder> aggregateValueStateDescriptor = new ValueStateDescriptor<>(
                getDescriptorNamePrefix(STATE_PREFIX,STATE_NAME_COMMAND_AGGREGATE,DATE_OF_STATE_CREATION),TypeInformation.of(new TypeHint<StateHolder>() {
                })
        );
        commandAggregateState = getRuntimeContext().getState(aggregateValueStateDescriptor);

最近,我们将该字段添加到了Object3。我们为其设置默认值。它是一个字符串,定义为:private String newField = "";

这样做之后,我们得到以下异常:

com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 104
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.contrib.streaming.state.rocksDBValueState.value(rocksDBValueState.java:90)

很明显,我们的Set正在使用KryoSerializer反序列化,我想这是可以预料的。大概不是很理想。

我无法理解的是,这到底是为什么失败以及如何解决这个问题?因此我们在Object3类中添加了字段,但是在objects5字段(Set<Object5>)上反序列化失败。新字段会导致缓冲区意外移动,从而从错误的位置读取注册ID吗?

我们的POJO符合文档中列出的所有规则(也许这可能不相关,因为Kryo反序列化失败了)

* The class is public and standalone (no non-static inner class)
* The class has a public no-argument constructor
* All non-static,non-transient fields in the class (and all superclasses) are either public (and non-final) or have a public getter- and a setter- method that follows the Java beans naming conventions for getters and setters.

通过搜索异常,有一些建议,例如实现自定义序列化程序。 您是否同意这是正确的方法?如果是这样的话,通过谷歌搜索,我只会发现如何执行自定义的Avro和Protobuff序列化程序,而我们都不使用它们来保存状态。看来,我们的数据是通过PojoSerializer序列化的,该PojoSerializer使用不同的KryoSerializer对集合进行反序列化,其中之一显然失败了。我们是否应该为此失败字段使用自定义PojoSerializer创建新的自定义KryoSerializer,如果是这样,该自定义KryoSerializer应该是什么样子?我们是否应该具有符合java.util.Set的硬编码注册ID?

感谢您阅读所有这些内容,如果这是重复的问题,我深表歉意。我找不到一个与此类似的东西。

更新

因此,在建议将新字段初始化移动为NoArgsConstructor的默认值之后,我们原来的异常被包装了。

java.lang.RuntimeException: Error while getting state
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
at fake.package.pipeline.handler.codebook.function.CommandHandlerCodeBookFunction.initializestate(CommandHandlerCodeBookFunction.java:88)
at fake.package.pipeline.handler.Handler.open(Handler.java:104)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.open(KeyedCoProcessOperator.java:62)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: Error while trying to migrate rocksDB state.
at org.apache.flink.contrib.streaming.state.AbstractrocksDBState.migrateSerializedValue(AbstractrocksDBState.java:213)
at org.apache.flink.contrib.streaming.state.rocksDBKeyedStateBackend.migrateStateValues(rocksDBKeyedStateBackend.java:603)
at org.apache.flink.contrib.streaming.state.rocksDBKeyedStateBackend.updateRestoredStateMetaInfo(rocksDBKeyedStateBackend.java:532)
at org.apache.flink.contrib.streaming.state.rocksDBKeyedStateBackend.tryRegisterKvStateInformation(rocksDBKeyedStateBackend.java:482)
at org.apache.flink.contrib.streaming.state.rocksDBKeyedStateBackend.createInternalState(rocksDBKeyedStateBackend.java:643)
at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
... 11 more
Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 104
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
at org.apache.flink.contrib.streaming.state.AbstractrocksDBState.migrateSerializedValue(AbstractrocksDBState.java:210)
... 21 more
Boney2009 回答:Flink 1.9.0-更改状态对象后,状态反序列化失败

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/2478891.html

大家都在问