我需要从HDFS文件夹中读取二进制文件,并将其转换为Pyspark中的DataFrame。
我希望DataFrame列之一将是源文件名。
我的二进制文件包含标头,并且所有内容都是double类型的数字
这是我所做的:
def getBinFilesSchema():
schema = StructType([StructFiled('C1',DoubleType(),False),...
StructFiled('C15',StructFiled('srcfileName',StringType(),False)])
def readBinaryfilesContent(rdd):
fname = rdd[0]
bdata = bytes(rdd[1])
array = np.frombuffer(bdata[headerSize:],dtype=np.dtype([('C1',np.double),('C2',...('C15',np.double)]))
array=array.newbyteorder().byteswap()
#add file name
array = np.array(array.tolist())
colFileName = np.array([fname]*len(array))
arayWithFname = np.hstack((array,np.atleast_2d(colFileName).T))
return arrarayWithFname.tolist()
sc = getSparkContext()
sparkSession = getSparkSession()
fileNameRdd = sc.binaryfiles(path_to_folder_on_hdfs + "/*")
filesContentAsList = fileNameRdd.flatMap(readBinaryfilesContent)
#This leads to error because the schema is incorrect,adding the fileName column change all dtype from Double to 'unicode'
df = sparkSession.createDataFrame(filesContentAsList,getBinFilesSchema())
df.show()
如果不将所有其他列从double更改为unicode,我将无法成功将文件名添加到DF。
如果我不将文件名添加到RDD,则getBinFilesSchema_withoutFname()可以正常工作,并且所有数据均保持Double类型,
那么,如何在不更改数据原始类型的情况下将二进制文件读入DataFrame,并将源文件名添加为新列?