I am trying to merge the data from two files into a single RDD. Let's say I have two files: file1.txt, which is a large JSON file, and file2.txt, which is a small CSV file.
file1.txt (JSON format) looks like this:
{
{"a":1,"b":{"ba":1,"bb":"babc","bc":"babc2","d":"babc3"},"c":"abc2"},{"a":2,"b":{"ba":2,"bb":"babc2","bc":"babc22","d":"babc32"},"c":"abc22"}
}
file2.txt (CSV format) looks like this:
key value
xyz xyz1
pqr pqr1
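To make the layout of file2.txt concrete, here is a minimal sketch of how those rows could be read into plain key/value records. The helper name parse_key_value_lines is hypothetical, and it assumes the columns are whitespace-separated with a header row, as in the sample above.

```python
def parse_key_value_lines(lines):
    """Turn whitespace-separated lines (first line is the header) into dicts."""
    rows = [line.split() for line in lines if line.strip()]
    if not rows:
        return []
    header, data = rows[0], rows[1:]
    # Pair every data row with the column names taken from the header.
    return [dict(zip(header, fields)) for fields in data]


file2_lines = ["key value", "xyz xyz1", "pqr pqr1"]
file2_rows = parse_key_value_lines(file2_lines)
print(file2_rows)

# With Spark (assumption: sc is an existing SparkContext), the small file could
# be collected to the driver first, since it is small enough to fit in memory:
#   file2_rows = parse_key_value_lines(sc.textFile("file2.txt").collect())
```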
Now, I want the output RDD to look like this:
{
{"a":1,"d":"babc3","e":{{"key": "xyz","value":"xyz1"},{"key": "pqr","value":"pqr1"}},"d":"babc32","c":"abc22"}
}
What I tried was converting file2.txt to JSON format and then doing the following:
output_rdd = file1_rdd.map(lambda x: joinfunc(x,file2_rdd))
and then trying to write output_rdd.
But this gives me an error like this:
cPickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o72.__getnewargs__. Trace: py4j.Py4JException: Method __getnewargs__([]) does not exist
Is there a recommended way to merge these two files into one output RDD? Any suggestions would be helpful.
PS: I am a newbie.
Edit:
My joinfunc looks like this:
def joinfunc(file1_json,file2_json):
    file1_detail = json.loads(file1_json)
    file2_detail = json.loads(file2_json)
    b_file1_detail_list = file1_detail["b"]
    append_detail =[]
    if b_file1_detail_list is not None:
        for b_file1_detail in b_file1_detail_list:
            append_detail = {"e" : file2_detail}
            b_file1_detail.append(append_detail)
    return json.dumps(dict(file1_detail))
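
For comparison, here is a minimal sketch of a join function that only touches plain Python data. A PicklingError that mentions __getnewargs__ typically appears when the function passed to map captures an RDD (here file2_rdd), because Spark has to pickle that function and an RDD wraps a JVM object that cannot be pickled. The name join_record is hypothetical, and placing "e" at the top level of each record is an assumption, since the desired output above does not make the nesting clear.

```python
import json


def join_record(file1_json, file2_rows):
    """Attach the rows of the small file to one JSON record under the key "e"."""
    record = json.loads(file1_json)
    record["e"] = file2_rows  # assumption: "e" belongs at the top level
    return json.dumps(record)


# Plain Python data standing in for the contents of file2.txt.
file2_rows = [{"key": "xyz", "value": "xyz1"}, {"key": "pqr", "value": "pqr1"}]
line = '{"a":1,"b":{"ba":1,"bb":"babc","bc":"babc2","d":"babc3"},"c":"abc2"}'
merged = json.loads(join_record(line, file2_rows))
print(merged["e"])

# With Spark (assumptions: sc is an existing SparkContext and file1_rdd holds one
# JSON object per element), the list can be broadcast and read inside map:
#   rows_bc = sc.broadcast(file2_rows)
#   output_rdd = file1_rdd.map(lambda x: join_record(x, rows_bc.value))
```

Because the lambda now captures only a broadcast variable holding an ordinary list, nothing JVM-backed needs to be pickled. This relies on file2.txt being small enough to fit in memory on the driver and on each executor.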