PySpark-如何读取二进制文件并添加源文件名

我需要从HDFS文件夹中读取二进制文件,并将其转换为Pyspark中的DataFrame。

我希望DataFrame列之一将是源文件名。

我的二进制文件包含标头,并且所有内容都是double类型的数字

这是我所做的:

def getBinFilesSchema():
  schema = StructType([StructFiled('C1',DoubleType(),False),...
                       StructFiled('C15',StructFiled('srcfileName',StringType(),False)])

def readBinaryfilesContent(rdd):
  fname = rdd[0]
  bdata = bytes(rdd[1])
  array = np.frombuffer(bdata[headerSize:],dtype=np.dtype([('C1',np.double),('C2',...('C15',np.double)]))
  array=array.newbyteorder().byteswap()
  #add file name
  array = np.array(array.tolist())
  colFileName = np.array([fname]*len(array))
  arayWithFname = np.hstack((array,np.atleast_2d(colFileName).T))
  return arrarayWithFname.tolist()  

sc = getSparkContext()
sparkSession = getSparkSession()
fileNameRdd = sc.binaryfiles(path_to_folder_on_hdfs + "/*")

filesContentAsList = fileNameRdd.flatMap(readBinaryfilesContent)
#This leads to error because the schema is incorrect,adding the fileName column change all dtype from Double to 'unicode' 
df = sparkSession.createDataFrame(filesContentAsList,getBinFilesSchema())
df.show()

如果不将所有其他列从double更改为unicode,我将无法成功将文件名添加到DF。

如果我不将文件名添加到RDD,则getBinFilesSchema_withoutFname()可以正常工作,并且所有数据均保持Double类型,

那么,如何在不更改数据原始类型的情况下将二进制文件读入DataFrame,并将源文件名添加为新列?

djdz123 回答:PySpark-如何读取二进制文件并添加源文件名

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/3104143.html

大家都在问