我已经在Ubuntu 18.04中成功安装了Apache Spark。我还向我的Spark安装中添加了mongo-spark-connector。我目前正在尝试连接到已在外部设置的MongoDB集群。我可以通过启用SSL的各种不同来源连接到我的MongoDB集群(MongoDB数据库服务器需要SSL)。当我尝试通过spark连接时,连接超时。要通过SSL连接,我通常使用私钥(pk.pem)和CA证书(ca.crt)。
我已完成以下设置:
- 我已将私钥文件从PEM格式转换为pkcs12格式
- 我已将CA证书转换为PEM格式
- 我创建了一个新的密钥库,并添加了我新格式化的pkcs12文件(使用keytool)
- 我创建了一个新的信任库,并以PEM格式(使用keytool)添加了我的CA证书
我目前使用以下命令启动脚本:
spark-submit \
--driver-java-options -Djavax.net.ssl.trustStore=/path/to/truststore.ks \
--driver-java-options -Djavax.net.ssl.trustStorePassword=tspassword \
--driver-java-options -Djavax.net.ssl.keyStore=/path/to/keystore.ks \
--driver-java-options -Djavax.net.ssl.keyStorePassword=kspassword \
--conf spark.executor.extraJavaOptions=--Djavax.net.ssl.trustStore=/path/to/truststore.ks \
--conf spark.executor.extraJavaOptions=--Djavax.net.ssl.trustStorePassword=tspassword \
--conf spark.executor.extraJavaOptions=--Djavax.net.ssl.keyStore=/path/to/keystore.ks \
--conf spark.executor.extraJavaOptions=--Djavax.net.ssl.keyStorePassword=kspassword \
python script.py
以下是我的PySpark代码:
mongo_url = 'mongodb://<user>:<pass>@<host>:<port>/db.collection?replicaSet=replica-set-name&ssl=true&authSource=test&readPreference=nearest'
mongo_df = sqlContext.read.format('com.mongodb.spark.sql.Defaultsource').option('uri',mongo_url).load()
执行脚本时,连接超时以及以下输出:
19/11/14 15:36:47 INFO SparkContext: Created broadcast 0 from broadcast at MongoSpark.scala:542
19/11/14 15:36:47 INFO cluster: Cluster created with settings {hosts=[<host>:<port>],mode=MULTIPLE,requiredClusterType=REPLICA_SET,serverSelectionTimeout='30000 ms',maxWaitQueueSize=500,requiredReplicaSetName='haip-replica-set'}
19/11/14 15:36:47 INFO cluster: Adding discovered server <host>:<port> to client view of cluster
19/11/14 15:36:47 INFO MongoClientCache: Creating MongoClient: [<host>:<port>]
19/11/14 15:36:47 INFO cluster: No server chosen by com.mongodb.client.internal.MongoClientDelegate$1@140f4273 from cluster description ClusterDescription{type=REPLICA_SET,connectionmode=MULTIPLE,serverDescriptions=[ServerDescription{address=<host>:<port>,type=UNKNOWN,state=CONNECTING}]}. Waiting for 30000 ms before timing out
19/11/14 15:36:47 INFO cluster: Exception in monitor thread while connecting to server mongodb-data-1.haip.io:27017
com.mongodb.MongoSocketReadException: Prematurely reached end of stream
at com.mongodb.internal.connection.SocketStream.read(SocketStream.java:112)
at com.mongodb.internal.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:580)
at com.mongodb.internal.connection.InternalStreamConnection.receiveMessage(InternalStreamConnection.java:445)
at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:299)
at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:259)
at com.mongodb.internal.connection.CommandHelper.sendAndReceive(CommandHelper.java:83)
at com.mongodb.internal.connection.CommandHelper.executeCommand(CommandHelper.java:33)
at com.mongodb.internal.connection.InternalStreamConnectionInitializer.initializeConnectionDescription(InternalStreamConnectionInitializer.java:105)
at com.mongodb.internal.connection.InternalStreamConnectionInitializer.initialize(InternalStreamConnectionInitializer.java:62)
at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:129)
at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:117)
at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
File "/home/user/script.py",line 16,in <module>
mongo_df = sqlContext.read.format('com.mongodb.spark.sql.Defaultsource').option('uri',mongo_url).load()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py",line 172,in load
File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",line 1257,in __call__
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",line 63,in deco
File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",line 328,in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o31.load.
: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches com.mongodb.client.internal.MongoClientDelegate$1@140f4273. Client view of cluster state is {type=REPLICA_SET,servers=[{address=<host>:<port>,state=CONNECTING,exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}]
at com.mongodb.internal.connection.Basecluster.createTimeoutException(Basecluster.java:408)
at com.mongodb.internal.connection.Basecluster.selectServer(Basecluster.java:123)
at com.mongodb.internal.connection.AbstractmultiserverCluster.selectServer(AbstractmultiserverCluster.java:54)
at com.mongodb.client.internal.MongoClientDelegate.getconnectedClusterDescription(MongoClientDelegate.java:147)
at com.mongodb.client.internal.MongoClientDelegate.createclientSession(MongoClientDelegate.java:100)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getclientSession(MongoClientDelegate.java:277)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:181)
at com.mongodb.client.internal.MongoDatabaseImpl.executeCommand(MongoDatabaseImpl.java:186)
at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:155)
at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:150)
at com.mongodb.spark.MongoConnector$$anonfun$1.apply(MongoConnector.scala:237)
at com.mongodb.spark.MongoConnector$$anonfun$1.apply(MongoConnector.scala:237)
at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:174)
at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:174)
at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:157)
at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:174)
at com.mongodb.spark.MongoConnector.hasSampleAggregateOperator(MongoConnector.scala:237)
at com.mongodb.spark.rdd.MongoRDD.hasSampleAggregateOperator$lzycompute(MongoRDD.scala:221)
at com.mongodb.spark.rdd.MongoRDD.hasSampleAggregateOperator(MongoRDD.scala:221)
at com.mongodb.spark.sql.MongoInferSchema$.apply(MongoInferSchema.scala:68)
at com.mongodb.spark.sql.Defaultsource.constructRelation(Defaultsource.scala:97)
at com.mongodb.spark.sql.Defaultsource.createRelation(Defaultsource.scala:50)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
at sun.reflect.NativeMethodaccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodaccessorImpl.invoke(NativeMethodaccessorImpl.java:62)
at sun.reflect.DelegatingMethodaccessorImpl.invoke(DelegatingMethodaccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
19/11/14 15:37:17 INFO SparkContext: Invoking stop() from shutdown hook
19/11/14 15:37:17 INFO MongoClientCache: Closing MongoClient: [<host>:<port>]
火花版本:2.4.4 Scala版本:2.11 Mongo Java驱动程序版本:3.11.2 Mongo Spark连接器版本:2.11-2.4.1