使用PySpark将数据从HDFS索引到Elastic Search

我正在尝试使用Pyspark从HDFS将数据索引到Elastic搜索。当数据大于50 mb小于1​​00 mb时,作业将失败。

有人可以指导我优化流程吗?我有四个节点ES集群,并且与Spark相同,每个集群都有4g堆。

以下是例外情况:

Caused by: org.apache.spark.util.TaskCompletionListenerException: Could not write all entries for bulk operation [1/1]. Error sample (first [5] error messages):
        org.elasticsearch.hadoop.rest.EsHadoopRemoteException: mapper_parsing_exception: failed to parse;org.elasticsearch.hadoop.rest.EsHadoopRemoteException: not_x_content_exception: not_x_content_exception: Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes
        {"index":{}}

我使用的脚本如下:

df = spark.read.csv("hdfs://xxxxxx:9000/SOMEPATH",mode="DROPMALFORMED",header=True)
df_1.write.format("org.elasticsearch.spark.sql").option('es.nodes','xxxxxx').option('es.port',9200).option("es.node","lb")\
    .option("es.net.http.auth.user","xx").option("es.net.http.auth.pass","xx").option("es.input.json","true")\
    .option('es.batch.write.retry.wait','60s').option('es.batch.size.entries','100').option('es.batch.write.retry.count','6').option('es.resource','xx/xy').save()

注意:在我的情况下,我已经经历了可能的解决方案而没有任何效果。

谢谢

xiaoxianchao 回答:使用PySpark将数据从HDFS索引到Elastic Search

问题出在Pyspark ES罐子上。 如果我们使用Scala罐子就可以了。因此,将我的管道代码更改为scala,它可以完美运行。

,

它在我删除代码时起作用:.option("es.input.json",true)

本文链接:https://www.f2er.com/3108264.html

大家都在问