我想为python java构建ssl配置
我将它们合并到一个外壳中
#!/bin/bash
PASSWORD="asdfasdf"
KEY_PASSWORD=${PASSWORD}
STORE_PASSWORD=${PASSWORD}
VALIDITY=365000
# server
keytool -keystore kafka.server.keystore.jks -alias SERVER_ALIAS -validity ${VALIDITY} -genkey -keypass ${KEY_PASSWORD} -storepass ${STORE_PASSWORD} -keyalg RSA
openssl req -new -x509 -keyout ca-key -out ca-cert -days ${VALIDITY} -passin pass:"$PASSWORD" -passout pass:"$PASSWORD"
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert -storepass ${STORE_PASSWORD}
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert -storepass ${STORE_PASSWORD}
keytool -keystore kafka.server.keystore.jks -alias SERVER_ALIAS -certreq -file cert-file -storepass ${STORE_PASSWORD}
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days ${VALIDITY} -CAcreateserial -passin pass:"$PASSWORD"
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert -storepass ${STORE_PASSWORD}
keytool -keystore kafka.server.keystore.jks -alias SERVER_ALIAS -import -file cert-signed -storepass ${STORE_PASSWORD}
# client
keytool -keystore kafka.client.keystore.jks -alias CLIENT_ALIAS -validity ${VALIDITY} -genkey -keypass ${KEY_PASSWORD} -storepass ${STORE_PASSWORD} -keyalg RSA
keytool -keystore kafka.client.keystore.jks -alias CLIENT_ALIAS -certreq -file client-cert-file -storepass ${STORE_PASSWORD}
openssl x509 -req -CA ca-cert -CAkey ca-key -in client-cert-file -out client-cert-signed -days ${VALIDITY} -CAcreateserial -passin pass:"$PASSWORD"
keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file ca-cert -storepass ${STORE_PASSWORD}
keytool -keystore kafka.client.keystore.jks -alias CLIENT_ALIAS -import -file client-cert-signed -storepass ${STORE_PASSWORD}
# gen pem
openssl genrsa -des3 -passout pass:${PASSWORD} -out kafka.client.key 2048
openssl req -passin pass:${PASSWORD} -passout pass:${PASSWORD} -key kafka.client.key -new -out kafka.client.req
openssl x509 -req -CA ca-cert -CAkey ca-key -in kafka.client.req -CAserial ca-cert.srl -out client.pem -days ${VALIDITY} -passin pass:${PASSWORD}
当我使用-keyalg RSA
生成jks时。该配置适用于 python ,但不适用于 Java 。
当我不使用 -keyalg RSA
来生成jks时。该配置适用于 java ,但不适用于 Py 。
如何生成config,python和java都可以?
test-certs 目录中的gen ssl配置
测试代码
# python product
from pykafka import KafkaClient,SslConfig
from pykafka.common import OffsetType
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
broker_list = ['127.0.0.1:9092']
cert_path = 'test-certs'
ssl_config = SslConfig(cafile=Path(cert_path).joinpath('ca-cert'),certfile=Path(cert_path).joinpath('client.pem'),keyfile=Path(cert_path).joinpath('kafka.client.key'),password='asdfasdf')
client = KafkaClient(hosts=','.join(broker_list),ssl_config=ssl_config)
pprint(client.topics)
topic = client.topics['test']
producer = topic.get_producer()
i = 0
while True:
producer.produce(f'msg{i}'.encode())
i += 1
time.sleep(0.5)
Java客户代码
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
public class Demo {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setappName("Kafka").setMaster("local[2]");
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf,new Duration(1000));
Map<String,Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers","127.0.0.1:9092");
kafkaParams.put("security.protocol","SSL");
kafkaParams.put("ssl.keystore.location","test-certs/kafka.client.keystore.jks");
kafkaParams.put("ssl.truststore.location","test-certs/kafka.client.truststore.jks");
kafkaParams.put("ssl.keystore.password","asdfasdf");
kafkaParams.put("ssl.truststore.password","asdf");
kafkaParams.put("ssl.key.password","asdf");
kafkaParams.put("key.deserializer",StringDeserializer.class);
kafkaParams.put("value.deserializer",StringDeserializer.class);
kafkaParams.put("group.id","big_data");
kafkaParams.put("auto.offset.reset","latest");
kafkaParams.put("enable.auto.commit",false);
Collection<String> topics = Arrays.asList("traffic","traffic4","traffic6","test");
JavaInputDStream<ConsumerRecord<String,String>> stream = KafkaUtils.createDirectStream(
streamingContext,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topics,kafkaParams)
);
// stream.mapToPair(record -> new Tuple2<>(record.key(),record.value()));
stream.print();
streamingContext.start();
try {
streamingContext.awaitTermination();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
java依赖项jdk 1.8 maven3.3.9
<properties>
<spark.version>2.4.4</spark.version>
<scala.version>2.11.12</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>io.snappydata</groupId>
<artifactId>snappy-spark-streaming-kafka-0.10_2.11</artifactId>
<version>2.1.1.7</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.3.1</version>
</dependency>
</dependencies>
server.properties
中有关ssl的kafka配置listeners=SSL://:9092
security.inter.broker.protocol=SSL
ssl.protocol = TLS
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type = JKS
ssl.keystore.location = test-certs/kafka.server.keystore.jks
ssl.keystore.password = asdfasdf
ssl.key.password = asdfasdf
ssl.truststore.type = JKS
ssl.truststore.location = test-certs/kafka.server.truststore.jks
ssl.truststore.password = asdfasdf
ssl.endpoint.identification.algorithm=