Migrating from Chill 0.6.0 (Kryo 2.21) to 0.9.5 (Kryo 4.0.2) and deserializing old messages

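We are using Chill-bijection with Kryo to serialize and deserialize messages going to and from Kafka. The old version of our application uses Chill 0.6.0, which depends on com.esotericsoftware.kryo.kryo-2.21.jar, while the new version uses Chill 0.9.5, which depends on com.esotericsoftware.kryo-shaded-4.0.2.jar.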

To minimize downtime, the new version of our application needs to be able to read messages written by the old version, but it fails with this error:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition prod_2x02_external_entity_updates-0 at offset 8764198. If needed, please seek past the record to continue consumption.
Caused by: com.twitter.bijection.InversionFailure: Failed to invert: [B@14122f45
    at com.twitter.bijection.InversionFailure$$anonfun$partialFailure$1.applyOrElse(InversionFailure.scala:43)
    at com.twitter.bijection.InversionFailure$$anonfun$partialFailure$1.applyOrElse(InversionFailure.scala:42)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
    at scala.util.Failure.recoverWith(Try.scala:236)
    at com.twitter.bijection.Inversion$.attempt(Inversion.scala:32)
    at com.X.backend.serialization.CustomKafkaKryoDeserializer.deserialize(KafkaKryoSerialization.scala:38)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1310)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1541)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1315)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
    at com.X.common.kafka.KafkaSubscriber$$anonfun$brokerFound$1.applyOrElse(KafkaSubscriber.scala:163)
    at akka.actor.Actor.aroundReceive(Actor.scala:535)
    at akka.actor.Actor.aroundReceive$(Actor.scala:533)
    at com.X.common.kafka.KafkaSubscriber.aroundReceive(KafkaSubscriber.scala:29)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
    at akka.actor.ActorCell.invoke(ActorCell.scala:547)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: com.X.backend.DashboardExternalEntities$ExtMessage
Serialization trace:
entity (com.X.backend.QueueMessageProtocol$ExternalEntityUpdated)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:160)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
    at com.twitter.chill.SerDeState.readClassAndObject(SerDeState.java:61)
    at com.twitter.chill.KryoPool.fromBytes(KryoPool.java:94)
    at com.X.backend.serialization.CustomKafkaKryoDeserializer.$anonfun$deserialize$1(KafkaKryoSerialization.scala:38)
    at com.twitter.bijection.Inversion$.$anonfun$attempt$1(Inversion.scala:32)
    at scala.util.Try$.apply(Try.scala:213)
    ... 25 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.X.backend.DashboardExternalEntities$ExtMessage
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:154)
    ... 35 common frames omitted

Based on this: https://github.com/EsotericSoftware/kryo/releases/tag/kryo-parent-4.0.0 I implemented a custom ScalaKryoInstantiator and the corresponding classes in order to add:

setOptimizedGenerics(true)
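For readers following along, here is a minimal sketch of what such an instantiator might look like. It assumes the Chill 0.9.x and Kryo 4.0.x APIs; the class name OptimizedGenericsInstantiator and the pool size are hypothetical, and this is not the code actually used in the application.

```scala
import com.twitter.chill.{KryoBase, KryoPool, ScalaKryoInstantiator}

// Hypothetical name: extends Chill's default Scala instantiator and turns on
// the optimized-generics setting described in the Kryo 4.0.0 release notes.
class OptimizedGenericsInstantiator extends ScalaKryoInstantiator {
  override def newKryo: KryoBase = {
    val kryo = super.newKryo
    kryo.getFieldSerializerConfig.setOptimizedGenerics(true)
    kryo
  }
}

object OptimizedGenericsPool {
  // The pool size of 10 is an arbitrary assumption for this sketch.
  val pool: KryoPool =
    KryoPool.withByteArrayOutputStream(10, new OptimizedGenericsInstantiator)

  // Mirrors the KryoPool.fromBytes call visible in the stack trace.
  def read(bytes: Array[Byte]): AnyRef = pool.fromBytes(bytes)
}
```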

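But I still get the same error. Is there a way to use Kryo 4.0.2 to read messages that were serialized by Kryo 2.21? The message classes themselves have not changed.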

Answer by shong29:

It turned out that the package containing the messages had been renamed, so Kryo could not find the right class.
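A quick way to check for this kind of mismatch is to test whether the class name recorded in the old messages still resolves on the new classpath. The sketch below assumes Kryo wrote the fully qualified class name into the message, which is what the DefaultClassResolver.readName and Class.forName frames in the stack trace suggest; the names ClassNameCheck and classIsLoadable are hypothetical.

```scala
import scala.util.Try

object ClassNameCheck {
  // True if the fully qualified name resolves with the current class loader.
  // The class is looked up without being initialized.
  def classIsLoadable(fqcn: String): Boolean =
    Try(Class.forName(fqcn, false, getClass.getClassLoader)).isSuccess

  def main(args: Array[String]): Unit = {
    // Class name taken from the KryoException in the question.
    val nameFromOldMessages = "com.X.backend.DashboardExternalEntities$ExtMessage"
    println(s"$nameFromOldMessages loadable: ${classIsLoadable(nameFromOldMessages)}")
  }
}
```

If this reports false in the new build, the ClassNotFoundException is a naming problem and not yet evidence of a Kryo wire-format incompatibility.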

Even so, after reverting the package rename, neither Kryo 4.0.2 nor Kryo 3.0.3 could deserialize messages that had been serialized with Kryo 2.21.

All in all, we decided to replace Kryo with Protobuf and to write a converter that translates Kafka messages from Chill-bijection 0.6.0 (Kryo 2.21) to Protobuf.

