SerializationException when connecting to the Avro schema registry

I have 4 consumers. Three are on version 0.10.0.0 of the Kafka client, but one has been moved to version 2.0.0.

When I call RestService.getId to fetch the version of my Avro schema, it succeeds on the three consumers with the older stack version, but on the 2.0.0 consumer it fails with this stack trace.

org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 18
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:210)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
    at sun.security.ssl.InputRecord.read(InputRecord.java:503)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
    at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)
    at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)
    at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)
    at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
    at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1564)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
    at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
    at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:347)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:185)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:409)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:402)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:119)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:192)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:168)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:104)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:62)
    at com.ciscospark.retention.kafkapurgelibrary.avro.deserialize.CompatibleAvroDeserializer.deserialize(CompatibleAvroDeserializer.java:48)
    at com.ciscospark.retention.kafkapurgelibrary.avro.deserialize.CompatibleAvroDeserializer.deserialize(CompatibleAvroDeserializer.java:19)
    at com.cisco.wx2.kafka.serialization.SparkKafkaDeserializer.deserialize(SparkKafkaDeserializer.java:34)
    at com.ciscospark.retention.kafkapurgelibrary.PurgeEventConsumerFactory.lambda$new$1(PurgeEventConsumerFactory.java:80)
    at com.cisco.wx2.kafka.serialization.SimpleKafkaDeserializer.deserialize(SimpleKafkaDeserializer.java:22)
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1009)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:96)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1186)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1500(Fetcher.java:1035)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:544)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:505)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
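Note that the `Connection reset` is thrown inside `SSLSocketImpl.startHandshake`, so the server is dropping the connection during the TLS handshake, before any schema-registry payload is exchanged. One quick first check is whether the JVM running the 2.0.0 consumer enables a different set of TLS protocol versions than the JVMs running the 0.10.0.0 consumers. A minimal sketch to print what the default `SSLContext` offers (run it on each host and compare):

```java
import javax.net.ssl.SSLContext;
import java.util.Arrays;

// Prints the TLS protocol versions the JVM's default SSLContext enables.
// A "Connection reset" during startHandshake often means the client offered
// only protocols or cipher suites the server rejects; diffing this output
// between the working and failing consumers' JVMs narrows that down fast.
public class TlsDefaults {
    public static void main(String[] args) throws Exception {
        SSLContext ctx = SSLContext.getDefault();
        String[] enabled = ctx.getDefaultSSLParameters().getProtocols();
        System.out.println("Enabled protocols: " + Arrays.toString(enabled));
    }
}
```

If the lists differ (for example, one JVM has disabled older TLS versions via `jdk.tls.disabledAlgorithms`), that mismatch would explain why only the upgraded consumer fails.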

Here is my code for creating the connection to the schema registry. It uses a URL that starts with https.

    private RestService getSchemaServiceRestService() {
        String avroSchemaRegistryUrl = this.getAvroSchemaRegistryUrl();
        RestService restService = new RestService(avroSchemaRegistryUrl);
        log.info("Avro schema registry URL {}", avroSchemaRegistryUrl);

        if (avroSchemaRegistryUrl.startsWith("https")) {
            SSLContext sslContext = null;
            try {
                // Key material: client certificate; trust material: registry server certificate
                sslContext = SSLContextBuilder.create()
                        .loadKeyMaterial(AvroSerializer.class.getClassLoader().getResource("avroSchemaRegistryClient.jks"),
                                avroSchemaRegistryKeyPass.toCharArray(), avroSchemaRegistryKeyPass.toCharArray())
                        .loadTrustMaterial(AvroSerializer.class.getClassLoader().getResource("avroSchemaRegistryServer.jks"),
                                avroSchemaRegistryKeyPass.toCharArray())
                        .build();
            } catch (Exception e) {
                log.error("Exception when creating sslContext for schema registry client");
                throw new RuntimeException("Exception when creating sslContext for schema registry client.", e);
            }

            SSLSocketFactory factory = sslContext.getSocketFactory();
            restService.setSslSocketFactory(factory);
            log.info("Configured SSL for schema registry client");
        }

        return restService;
    }
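One hypothesis worth testing is that the handshake fails because of protocol negotiation rather than the key/trust material itself, since the same keystores work from the older consumers. The Apache `SSLContextBuilder` used above builds a context with the JVM's default protocol settings; a sketch that builds the equivalent context with plain JSSE and pins it to TLSv1.2 is below. It assumes the same two JKS resources and the same password variable as the question; the `loadJks` helper returning null (so the factories fall back to JVM defaults) is my addition for running it outside the real deployment.

```java
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.InputStream;
import java.security.KeyStore;

public class PinnedTlsContext {

    // Load a JKS keystore from the classpath; returns null if the resource is
    // absent so the factories fall back to JVM defaults (handy when probing
    // outside the production classpath).
    static KeyStore loadJks(String resource, char[] pass) throws Exception {
        try (InputStream in = PinnedTlsContext.class.getClassLoader().getResourceAsStream(resource)) {
            if (in == null) return null;
            KeyStore ks = KeyStore.getInstance("JKS");
            ks.load(in, pass);
            return ks;
        }
    }

    static SSLContext build(char[] keyPass) throws Exception {
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(loadJks("avroSchemaRegistryClient.jks", keyPass), keyPass);

        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(loadJks("avroSchemaRegistryServer.jks", keyPass)); // null -> JVM default cacerts

        SSLContext ctx = SSLContext.getInstance("TLSv1.2"); // pin the protocol explicitly
        ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
        return ctx;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Pinned protocol: " + build("changeit".toCharArray()).getProtocol());
    }
}
```

The resulting context's `getSocketFactory()` can be handed to `restService.setSslSocketFactory(...)` exactly as in the method above. If the pinned context handshakes successfully where the default one resets, the problem is protocol negotiation on the 2.0.0 consumer's JVM, not the certificates.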

This function succeeds, but the first time I call RestService.getId I get that exception.

Does anyone know how I can get my 2.0.0 consumer working?
