Unable to connect Apache Spark to MongoDB over SSL

I have successfully installed Apache Spark on Ubuntu 18.04 and added the mongo-spark-connector to my Spark installation. I am now trying to connect to a MongoDB cluster that is hosted externally. I can reach this cluster from various other SSL-enabled clients (the MongoDB database servers require SSL), but when I try to connect through Spark, the connection times out. To connect over SSL I normally use a private key (pk.pem) and a CA certificate (ca.crt).

I have done the following setup:

  • Converted my private key file from PEM format to PKCS12 format
  • Converted the CA certificate to PEM format
  • Created a new keystore and added the newly formatted PKCS12 file to it (using keytool)
  • Created a new truststore and added the CA certificate in PEM format to it (using keytool)
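For reference, the conversion steps above might look roughly like this with openssl and keytool. The alias, output file names, and passwords are placeholders (matching the paths used in the spark-submit command below), and client.crt is an assumed client certificate, since a PKCS12 bundle normally holds the certificate together with the private key:

```shell
# Placeholder names/passwords; client.crt is an assumed client certificate
# paired with pk.pem (a PKCS12 bundle carries both cert and key).

# 1. PEM private key + certificate -> PKCS12 bundle
openssl pkcs12 -export -inkey pk.pem -in client.crt \
    -name mongo-client -out client.p12 -passout pass:kspassword

# 2. PKCS12 bundle -> Java keystore
keytool -importkeystore -noprompt \
    -srckeystore client.p12 -srcstoretype PKCS12 -srcstorepass kspassword \
    -destkeystore keystore.ks -deststorepass kspassword

# 3. CA certificate -> Java truststore
keytool -importcert -noprompt -alias mongo-ca -file ca.crt \
    -keystore truststore.ks -storepass tspassword
```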

I currently launch the script with the following command:

spark-submit \
--driver-java-options -Djavax.net.ssl.trustStore=/path/to/truststore.ks \
--driver-java-options -Djavax.net.ssl.trustStorePassword=tspassword \
--driver-java-options -Djavax.net.ssl.keyStore=/path/to/keystore.ks \
--driver-java-options -Djavax.net.ssl.keyStorePassword=kspassword \
--conf spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=/path/to/truststore.ks \
--conf spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStorePassword=tspassword \
--conf spark.executor.extraJavaOptions=-Djavax.net.ssl.keyStore=/path/to/keystore.ks \
--conf spark.executor.extraJavaOptions=-Djavax.net.ssl.keyStorePassword=kspassword \
python script.py
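One thing worth flagging about the command above, independent of the certificate setup: spark-submit does not accumulate repeated `--driver-java-options` flags or repeated `--conf` entries for the same key — the last occurrence wins, so only the `keyStorePassword` option would actually take effect. A merged form, with all four JVM options passed as a single quoted string on each side (same placeholder paths and passwords), would look like this:

```shell
# Each side gets ONE space-separated option string, because repeated
# --driver-java-options / --conf spark.executor.extraJavaOptions entries
# overwrite each other (last one wins).
spark-submit \
  --driver-java-options "-Djavax.net.ssl.trustStore=/path/to/truststore.ks -Djavax.net.ssl.trustStorePassword=tspassword -Djavax.net.ssl.keyStore=/path/to/keystore.ks -Djavax.net.ssl.keyStorePassword=kspassword" \
  --conf "spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=/path/to/truststore.ks -Djavax.net.ssl.trustStorePassword=tspassword -Djavax.net.ssl.keyStore=/path/to/keystore.ks -Djavax.net.ssl.keyStorePassword=kspassword" \
  script.py
```

Note also that spark-submit takes the Python script directly as its application argument, without a `python` prefix.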

Here is my PySpark code:

mongo_url = 'mongodb://<user>:<pass>@<host>:<port>/db.collection?replicaSet=replica-set-name&ssl=true&authSource=test&readPreference=nearest'
mongo_df = sqlContext.read.format('com.mongodb.spark.sql.DefaultSource').option('uri', mongo_url).load()
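As an aside, unrelated to the timeout: a small helper like the following (hypothetical, not part of the code above) makes the URI's SSL-related query parameters easier to audit, and percent-encodes the credentials, which matters whenever the password contains characters such as `@` or `:`:

```python
from urllib.parse import quote_plus, urlencode

def mongo_uri(user, password, host, port, db, coll, **params):
    """Assemble a mongodb:// URI with percent-encoded credentials."""
    query = urlencode(params)  # e.g. replicaSet=...&ssl=true&...
    return (f"mongodb://{quote_plus(user)}:{quote_plus(password)}"
            f"@{host}:{port}/{db}.{coll}?{query}")

# Placeholder credentials/host; the query parameters match the URI above.
uri = mongo_uri("user", "p@ss:w", "<host>", 27017, "db", "collection",
                replicaSet="replica-set-name", ssl="true",
                authSource="test", readPreference="nearest")
# '@' and ':' in the password are escaped as %40 and %3A in the result.
```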

When I execute the script, the connection times out with the following output:

19/11/14 15:36:47 INFO SparkContext: Created broadcast 0 from broadcast at MongoSpark.scala:542
19/11/14 15:36:47 INFO cluster: Cluster created with settings {hosts=[<host>:<port>], mode=MULTIPLE, requiredClusterType=REPLICA_SET, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500, requiredReplicaSetName='haip-replica-set'}
19/11/14 15:36:47 INFO cluster: Adding discovered server <host>:<port> to client view of cluster
19/11/14 15:36:47 INFO MongoClientCache: Creating MongoClient: [<host>:<port>]
19/11/14 15:36:47 INFO cluster: No server chosen by com.mongodb.client.internal.MongoClientDelegate$1@140f4273 from cluster description ClusterDescription{type=REPLICA_SET, connectionMode=MULTIPLE, serverDescriptions=[ServerDescription{address=<host>:<port>, type=UNKNOWN, state=CONNECTING}]}. Waiting for 30000 ms before timing out
19/11/14 15:36:47 INFO cluster: Exception in monitor thread while connecting to server mongodb-data-1.haip.io:27017
com.mongodb.MongoSocketReadException: Prematurely reached end of stream
    at com.mongodb.internal.connection.SocketStream.read(SocketStream.java:112)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:580)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveMessage(InternalStreamConnection.java:445)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:299)
    at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:259)
    at com.mongodb.internal.connection.CommandHelper.sendAndReceive(CommandHelper.java:83)
    at com.mongodb.internal.connection.CommandHelper.executeCommand(CommandHelper.java:33)
    at com.mongodb.internal.connection.InternalStreamConnectionInitializer.initializeConnectionDescription(InternalStreamConnectionInitializer.java:105)
    at com.mongodb.internal.connection.InternalStreamConnectionInitializer.initialize(InternalStreamConnectionInitializer.java:62)
    at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:129)
    at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:117)
    at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
  File "/home/user/script.py", line 16, in <module>
    mongo_df = sqlContext.read.format('com.mongodb.spark.sql.DefaultSource').option('uri', mongo_url).load()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 172, in load
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o31.load.
: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches com.mongodb.client.internal.MongoClientDelegate$1@140f4273. Client view of cluster state is {type=REPLICA_SET, servers=[{address=<host>:<port>, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}]
    at com.mongodb.internal.connection.BaseCluster.createTimeoutException(BaseCluster.java:408)
    at com.mongodb.internal.connection.BaseCluster.selectServer(BaseCluster.java:123)
    at com.mongodb.internal.connection.AbstractMultiServerCluster.selectServer(AbstractMultiServerCluster.java:54)
    at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:147)
    at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:100)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:277)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:181)
    at com.mongodb.client.internal.MongoDatabaseImpl.executeCommand(MongoDatabaseImpl.java:186)
    at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:155)
    at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:150)
    at com.mongodb.spark.MongoConnector$$anonfun$1.apply(MongoConnector.scala:237)
    at com.mongodb.spark.MongoConnector$$anonfun$1.apply(MongoConnector.scala:237)
    at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:174)
    at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:174)
    at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:157)
    at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:174)
    at com.mongodb.spark.MongoConnector.hasSampleAggregateOperator(MongoConnector.scala:237)
    at com.mongodb.spark.rdd.MongoRDD.hasSampleAggregateOperator$lzycompute(MongoRDD.scala:221)
    at com.mongodb.spark.rdd.MongoRDD.hasSampleAggregateOperator(MongoRDD.scala:221)
    at com.mongodb.spark.sql.MongoInferSchema$.apply(MongoInferSchema.scala:68)
    at com.mongodb.spark.sql.DefaultSource.constructRelation(DefaultSource.scala:97)
    at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:50)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

19/11/14 15:37:17 INFO SparkContext: Invoking stop() from shutdown hook
19/11/14 15:37:17 INFO MongoClientCache: Closing MongoClient: [<host>:<port>]

Spark version: 2.4.4, Scala version: 2.11, MongoDB Java driver version: 3.11.2, MongoDB Spark connector version: 2.11-2.4.1

a277158272 answered: Unable to connect Apache Spark to MongoDB over SSL

The keystore and truststore were in the wrong format (.ks). Once I changed the keys to the correct format (.jks), it worked.
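One quick way to check what keytool actually produced — the extension alone doesn't guarantee the store type, and depending on the JDK, keytool may default to PKCS12 rather than JKS (file name and password here are the placeholders used in the question):

```shell
# The banner of keytool -list reports the actual store type.
keytool -list -keystore keystore.jks -storepass kspassword | head -n 3
# Expect a "Keystore type:" line reading JKS; a PKCS12 store can also be
# used if javax.net.ssl.keyStoreType is set to match.
```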

