kafka ssl用于python和java配置

我想为python java构建ssl配置

我把生成证书的步骤合并到了一个 shell 脚本中:

#!/bin/bash
#
# Generates a self-signed CA and the SSL material for a Kafka broker and its
# clients, all in the current working directory:
#
#   ca-cert / ca-key               CA certificate and (encrypted) private key
#   kafka.server.keystore.jks      broker key pair + CA-signed certificate
#   kafka.server.truststore.jks    broker truststore holding the CA cert
#   kafka.client.keystore.jks      JKS client key pair + CA-signed certificate
#   kafka.client.truststore.jks    JKS client truststore holding the CA cert
#   kafka.client.key / client.pem  PEM client key and CA-signed certificate
#
# keytool and openssl still prompt interactively (distinguished name,
# "Trust this certificate?"), exactly as the individual commands do.

# Abort on the first failing step: every later step consumes the output of
# an earlier one, so continuing after an error only produces broken stores.
set -euo pipefail

# NOTE: the password is passed on the command line and is therefore visible
# in the process list; acceptable for test certificates only.
readonly PASSWORD="asdfasdf"
readonly KEY_PASSWORD="${PASSWORD}"
readonly STORE_PASSWORD="${PASSWORD}"
readonly VALIDITY=365000

# server
# Broker key pair, then the CA that signs both broker and client certificates.
keytool -keystore kafka.server.keystore.jks -alias SERVER_ALIAS \
  -validity "${VALIDITY}" -genkey \
  -keypass "${KEY_PASSWORD}" -storepass "${STORE_PASSWORD}" -keyalg RSA
openssl req -new -x509 -keyout ca-key -out ca-cert -days "${VALIDITY}" \
  -passin "pass:${PASSWORD}" -passout "pass:${PASSWORD}"

# Both truststores only need the CA certificate.
keytool -keystore kafka.server.truststore.jks -alias CARoot \
  -import -file ca-cert -storepass "${STORE_PASSWORD}"
keytool -keystore kafka.client.truststore.jks -alias CARoot \
  -import -file ca-cert -storepass "${STORE_PASSWORD}"

# Export a signing request for the broker key and sign it with the CA.
# -CAcreateserial creates ca-cert.srl, which the PEM step below reuses.
keytool -keystore kafka.server.keystore.jks -alias SERVER_ALIAS \
  -certreq -file cert-file -storepass "${STORE_PASSWORD}"
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed \
  -days "${VALIDITY}" -CAcreateserial -passin "pass:${PASSWORD}"

# The CA certificate must be imported before the signed certificate so that
# keytool can build the chain for SERVER_ALIAS.
keytool -keystore kafka.server.keystore.jks -alias CARoot \
  -import -file ca-cert -storepass "${STORE_PASSWORD}"
keytool -keystore kafka.server.keystore.jks -alias SERVER_ALIAS \
  -import -file cert-signed -storepass "${STORE_PASSWORD}"


# client
# Same sequence as for the broker, applied to the JKS client keystore.
keytool -keystore kafka.client.keystore.jks -alias CLIENT_ALIAS \
  -validity "${VALIDITY}" -genkey \
  -keypass "${KEY_PASSWORD}" -storepass "${STORE_PASSWORD}" -keyalg RSA
keytool -keystore kafka.client.keystore.jks -alias CLIENT_ALIAS \
  -certreq -file client-cert-file -storepass "${STORE_PASSWORD}"
openssl x509 -req -CA ca-cert -CAkey ca-key -in client-cert-file \
  -out client-cert-signed -days "${VALIDITY}" -CAcreateserial \
  -passin "pass:${PASSWORD}"
keytool -keystore kafka.client.keystore.jks -alias CARoot \
  -import -file ca-cert -storepass "${STORE_PASSWORD}"
keytool -keystore kafka.client.keystore.jks -alias CLIENT_ALIAS \
  -import -file client-cert-signed -storepass "${STORE_PASSWORD}"


# gen pem
# Separate RSA key and CA-signed certificate in PEM form. This key is
# independent of the one stored in kafka.client.keystore.jks.
openssl genrsa -des3 -passout "pass:${PASSWORD}" -out kafka.client.key 2048
openssl req -passin "pass:${PASSWORD}" -passout "pass:${PASSWORD}" \
  -key kafka.client.key -new -out kafka.client.req
openssl x509 -req -CA ca-cert -CAkey ca-key -in kafka.client.req \
  -CAserial ca-cert.srl -out client.pem -days "${VALIDITY}" \
  -passin "pass:${PASSWORD}"

当我使用 -keyalg RSA 生成 JKS 时,该配置适用于 Python,但不适用于 Java。

当我不使用 -keyalg RSA 生成 JKS 时,该配置适用于 Java,但不适用于 Python。

如何生成一套 Python 和 Java 都能使用的配置?

上述 SSL 配置(证书和密钥库)是在 test-certs 目录中生成的。

测试代码

# Python producer
#
# Connects to the broker over SSL with the PEM files generated in
# ``test-certs`` and publishes one message every 0.5 s to topic ``test``.

import time
from pathlib import Path
from pprint import pprint

from pykafka import KafkaClient,SslConfig
from pykafka.common import OffsetType
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

broker_list = ['127.0.0.1:9092']
cert_path = 'test-certs'
cert_dir = Path(cert_path)
# Paths are handed over as plain strings so the script does not depend on
# pykafka accepting path-like objects.
ssl_config = SslConfig(
    cafile=str(cert_dir.joinpath('ca-cert')),
    certfile=str(cert_dir.joinpath('client.pem')),
    keyfile=str(cert_dir.joinpath('kafka.client.key')),
    password='asdfasdf',
)

client = KafkaClient(hosts=','.join(broker_list),ssl_config=ssl_config)

pprint(client.topics)
topic = client.topics['test']
# The context manager stops the producer when the loop is interrupted
# (e.g. Ctrl-C) instead of abandoning it mid-flight.
with topic.get_producer() as producer:
    i = 0
    while True:
        producer.produce(f'msg{i}'.encode())
        i += 1
        time.sleep(0.5)

Java客户代码

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
 * Spark Streaming demo that subscribes to Kafka topics over SSL and prints
 * every micro-batch to stdout.
 */
public class Demo {
    /**
     * Builds a local streaming context with a 1000 ms batch duration, attaches
     * a direct Kafka stream and blocks until the context terminates.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("Kafka").setMaster("local[2]");
        JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf,new Duration(1000));

        // Must be the password the keystore, its private key and the
        // truststore were created with; a mismatch on any of the three makes
        // the client fail while loading the store.
        final String sslPassword = "asdfasdf";

        Map<String,Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers","127.0.0.1:9092");
        kafkaParams.put("security.protocol","SSL");
        kafkaParams.put("ssl.keystore.location","test-certs/kafka.client.keystore.jks");
        kafkaParams.put("ssl.truststore.location","test-certs/kafka.client.truststore.jks");
        kafkaParams.put("ssl.keystore.password",sslPassword);
        kafkaParams.put("ssl.truststore.password",sslPassword);
        kafkaParams.put("ssl.key.password",sslPassword);

        kafkaParams.put("key.deserializer",StringDeserializer.class);
        kafkaParams.put("value.deserializer",StringDeserializer.class);
        kafkaParams.put("group.id","big_data");
        kafkaParams.put("auto.offset.reset","latest");
        kafkaParams.put("enable.auto.commit",false);

        Collection<String> topics = Arrays.asList("traffic","traffic4","traffic6","test");

        JavaInputDStream<ConsumerRecord<String,String>> stream = KafkaUtils.createDirectStream(
                streamingContext,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topics,kafkaParams)
        );

//        stream.mapToPair(record -> new Tuple2<>(record.key(),record.value()));

        stream.print();


        streamingContext.start();
        try {
            streamingContext.awaitTermination();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

Java 依赖项(JDK 1.8,Maven 3.3.9):

    <properties>
        <spark.version>2.4.4</spark.version>
        <scala.version>2.11.12</scala.version>
        <!-- Every Scala-suffixed artifact below must use this one binary
             version; mixing _2.11 and _2.12 jars on the same classpath
             fails at runtime. It has to match scala.version. -->
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Official Kafka 0.10+ integration providing the
             org.apache.spark.streaming.kafka010 package. It replaces the
             third-party snappy-spark-streaming-kafka-0.10 fork and the legacy
             spark-streaming-kafka 1.6.3 artifact, which put conflicting
             Spark/Kafka classes on the classpath. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>2.3.1</version>
        </dependency>

    </dependencies>

server.properties 中有关 SSL 的 Kafka 配置:

# SSL-related broker settings.
# Single SSL listener on port 9092; no host is given in the listener URI.
listeners=SSL://:9092

# Broker-to-broker traffic also uses SSL.
security.inter.broker.protocol=SSL
ssl.protocol = TLS
# NOTE(review): TLSv1 and TLSv1.1 are enabled alongside TLSv1.2 - confirm
# that legacy protocol versions are really required.
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
# Broker keystore; the location is a relative path.
# NOTE(review): presumably resolved against the broker's working directory - confirm.
ssl.keystore.type = JKS
ssl.keystore.location = test-certs/kafka.server.keystore.jks
ssl.keystore.password = asdfasdf
ssl.key.password = asdfasdf
# Broker truststore; same relative-path caveat as the keystore.
ssl.truststore.type = JKS
ssl.truststore.location = test-certs/kafka.server.truststore.jks
ssl.truststore.password = asdfasdf
# Left empty on purpose. NOTE(review): per the Kafka documentation an empty
# value disables hostname verification - confirm this is intended.
ssl.endpoint.identification.algorithm=

lcc297298883 回答:kafka ssl用于python和java配置

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/3105544.html

大家都在问