I am trying to index data from HDFS into Elasticsearch using PySpark. The job fails whenever the data is larger than 50 MB but smaller than 100 MB.
Can someone guide me on optimizing this process? I have a four-node ES cluster and a Spark cluster of the same size, each node with a 4 GB heap.
Here is the exception:
Caused by: org.apache.spark.util.TaskCompletionListenerException: Could not write all entries for bulk operation [1/1]. Error sample (first [5] error messages):
org.elasticsearch.hadoop.rest.EsHadoopRemoteException: mapper_parsing_exception: failed to parse;org.elasticsearch.hadoop.rest.EsHadoopRemoteException: not_x_content_exception: not_x_content_exception: Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes
{"index":{}}
The script I am using is as follows:
df = spark.read.csv("hdfs://xxxxxx:9000/SOMEPATH", mode="DROPMALFORMED", header=True)
df.write.format("org.elasticsearch.spark.sql").option('es.nodes', 'xxxxxx').option('es.port', '9200').option("es.node", "lb")\
.option("es.net.http.auth.user", "xx").option("es.net.http.auth.pass", "xx").option("es.input.json", "true")\
.option('es.batch.write.retry.wait', '60s').option('es.batch.size.entries', '100').option('es.batch.write.retry.count', '6').option('es.resource', 'xx/xy').save()
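For context on the error sample above: my understanding (an assumption, not a confirmed diagnosis) is that `not_x_content_exception` means Elasticsearch received bytes that are not a JSON document. With `es.input.json=true`, the ES-Hadoop connector expects each record to already be a serialized JSON string, whereas a DataFrame read from CSV carries structured columns. A minimal plain-Python illustration of the difference, using hypothetical field names and no Spark dependency:

```python
import json

# A CSV-derived record as structured columns (hypothetical fields),
# which is what a DataFrame read from CSV contains.
row = {"id": "1", "name": "alice"}

# With es.input.json=true, the connector expects each record to already
# be a single serialized JSON string like this:
doc = json.dumps(row)
print(doc)
```

If this assumption holds, either the `es.input.json` option would need to be dropped so the connector serializes the columns itself, or each row would need to be pre-serialized to a JSON string before writing.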
Note: I have already gone through the possible solutions for my case, without any success.
Thanks