pyspark rdd在一个rdd中合并多个json文件数据

我正在尝试将两个文件数据合并到一个rdd中。可以说我有两个文件file1.txt是大json格式的文件,file2.txt是小型csv格式的文件。

file1.txt(json格式)如下:

{

{"a":1,"b":{"ba":1,"bb":"babc","bc":"babc2","d":"babc3"},"c":"abc2"},{"a":2,"b":{"ba":2,"bb":"babc2","bc":"babc22","d":"babc32"},"c":"abc22"}

}

file2.txt(csv格式)如下:

key   value

xyz   xyz1

pqr   pqr1

现在,我希望这样输出rdd:

{

{"a":1,"d":"babc3","e":{{"key": "xyz","value":"xyz1"},{"key": "pqr","value":"pqr1"}},"d":"babc32","c":"abc22"}

}

我尝试过的是将file2.txt转换为json格式,然后执行如下操作:

output_rdd = file1_rdd.map(lambda x: joinfunc(x,file2_rdd))

然后尝试写output_rdd

但这给了我

这样的错误
  

cPickle.PicklingError:无法序列化对象:Py4JError:错误   发生在调用o72时。 getnewargs 。跟踪:py4j.Py4JException:   方法 getnewargs ([])不存在

有没有建议将这两个文件合并为一个输出rdd?任何建议都会有所帮助。

PS:我是新手。

编辑:

我的joinfunc看起来像这样:

def joinfunc(file1_json,file2_json):

    file1_detail = json.loads(file1_json)
    file2_detail = json.loads(file2_json)

    b_file1_detail_list = file1_detail["b"]
    append_detail =[]

    if b_file1_detail_list is not None:
        for b_file1_detail in b_file1_detail_list:
            append_detail = {"e" : file2_detail}
            b_file1_detail.append(append_detail)

    return json.dumps(dict(file1_detail))
laozi8huai 回答:pyspark rdd在一个rdd中合并多个json文件数据

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/3148088.html

大家都在问