Spark executor sends results to a random port even though all ports are explicitly set

I am trying to run a Spark job with PySpark from a Jupyter notebook running in Docker. The workers are on different machines in the same network. I perform a take action on an RDD:

data.take(number_of_elements)

When number_of_elements is 2000, everything works fine. When it is 20000, an exception occurs. From my point of view, it breaks when the size of the result exceeds 2 GB (or so it seems to me). The 2 GB idea comes from the fact that Spark can send results smaller than 2 GB within a single block, while for results larger than 2 GB a different mechanism kicks in, and that is where it breaks (see here). Here is the exception from the executor log:

19/11/05 10:27:14 INFO CodeGenerator: Code generated in 205.7623 ms
19/11/05 10:27:40 INFO PythonRunner: Times: total = 25421, boot = 3, init = 1751, finish = 23667
19/11/05 10:27:42 INFO MemoryStore: Block taskresult_4 stored as bytes in memory (estimated size 927.7 MB, free 6.4 GB)
19/11/05 10:27:42 INFO Executor: Finished task 0.0 in stage 3.0 (TID 4). 972788748 bytes result sent via BlockManager)
19/11/05 10:27:49 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1585998572000,chunkIndex=0},buffer=org.apache.spark.storage.BlockManagerManagedBuffer@4399ad49} to /10.0.0.9:56222; closing connection
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
    at org.apache.spark.util.io.ChunkedByteBufferFileRegion.transferTo(ChunkedByteBufferFileRegion.scala:64)
    at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:121)
    at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
    at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
    at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
    at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
    at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
    at io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)

As the executor log shows, it tries to send the result to 10.0.0.9:56222 and fails because that port is not published in docker-compose. 10.0.0.9 is the IP address of the master node, but port 56222 is random, even though I explicitly set all the ports I could find in the documentation in order to disable random port selection:

spark = SparkSession.builder \
    .master('spark://spark.cyber.com:7077') \
    .appName('My App') \
    .config('spark.task.maxFailures', '16') \
    .config('spark.driver.port', '20002') \
    .config('spark.driver.host', 'spark.cyber.com') \
    .config('spark.driver.bindAddress', '0.0.0.0') \
    .config('spark.blockManager.port', '6060') \
    .config('spark.driver.blockManager.port', '6060') \
    .config('spark.shuffle.service.port', '7070') \
    .config('spark.driver.maxResultSize', '14g') \
    .getOrCreate()
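
To double-check which of these settings actually take effect at runtime, the resolved configuration can be dumped from the live session (a minimal sketch using the standard SparkConf API; the filter on 'port' is only for convenience):

# Print every resolved Spark setting whose key mentions a port,
# using the `spark` session created above.
for key, value in sorted(spark.sparkContext.getConf().getAll()):
    if 'port' in key.lower():
        print(key, '=', value)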

I mapped these ports with docker-compose:

version: "3"
services:
  jupyter:
    image: jupyter/pyspark-notebook:latest
    ports:
      - "4040-4050:4040-4050"
      - "6060:6060"
      - "7070:7070"
      - "8888:8888"
      - "20000-20010:20000-20010"
liaotengfei2008 answered:

You should probably configure the Spark driver memory to match the memory settings of your Docker container.


I added

.config('spark.driver.memory', '14g')

as suggested by @ML_TN, and now everything works.
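
For completeness, a sketch of the full builder with the driver memory setting added (the same values as in the question above; only the last config line is new):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('spark://spark.cyber.com:7077') \
    .appName('My App') \
    .config('spark.task.maxFailures', '16') \
    .config('spark.driver.port', '20002') \
    .config('spark.driver.host', 'spark.cyber.com') \
    .config('spark.driver.bindAddress', '0.0.0.0') \
    .config('spark.blockManager.port', '6060') \
    .config('spark.driver.blockManager.port', '6060') \
    .config('spark.shuffle.service.port', '7070') \
    .config('spark.driver.maxResultSize', '14g') \
    .config('spark.driver.memory', '14g') \
    .getOrCreate()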

What seems strange to me is that a memory setting affects which ports Spark uses.
