在Hive上使用Parquet增加写并行性

tl; dr-我正在Hive上将大量数据写入新的Parquet格式表中,但是该作业使用的reducer比指定的要少得多,因此写入花费的时间比我想要的长得多。

我正在构建旨在通过Spark创建快速读取的数据湖表,但是我正在使用hive写入数据,因此a)hive可以读取存储分区的表,b)因此我可以将统计信息写入hive metastore 。

我像这样从python创建表:

hivecur.execute("set hive.cbo.enable=true")
hivecur.execute("set hive.compute.query.using.stats=true")
hivecur.execute("set hive.stats.fetch.column.stats=true")
hivecur.execute("set hive.stats.fetch.partition.stats=true")

hivecur.execute("set hive.vectorized.execution.enabled=true")
hivecur.execute("set hive.vectorized.execution.reduce.enabled=true")

hivecur.execute('set mapred.reduce.tasks=100')

hivecur.execute(f"set dfs.block.size={1024*1024*128}")
hivecur.execute(f"set parquet.block.size={1024*1024*128}")

hivecur.execute(f"drop table if exists {TABLE_NAME}")

table_create_qry = f"""
create table {TABLE_NAME} (
    {schema.dice}
)
partitioned by (process_date_z int,dataset string)
clustered by (id) sorted by (source_id,type,id) into 200 buckets
stored as parquet
TBLPROPERTIES ("comment" = "{git_hash()}","parquet.compress" = "snappy")

然后当我插入时:

qry = f"""
        insert overwrite table {TABLE_NAME} partition (process_date_z,dataset)
        select ...
            source_id,process_date_z,'{dataset}' as dataset
        from {source_table}
        where process_date_z = {d}
        and pmod(hash(id),100) in ({",".join([str(x) for x in id_filters])})"""

通过设置mapred.reduce.tasks=100,我希望可以强制每个分区包含100个文件。相反,尽管创建了100个任务,但92个任务很快完成,而八个reduce任务的运行时间更长,只写了几十个(但不是100个)大小大致相等的文件。

这样做的问题是减少写入过程中的重大瓶颈。我可以设置什么参数来提高性能?

zqf123456zqf 回答:在Hive上使用Parquet增加写并行性

我认为我的问题来自对哈希函数的愚蠢选择。

我怀疑按ID进行存储分区的算法与我对ID进行子集处理时使用的哈希相同,因此它为所有可能的输入ID创建了存储分区,但是pmod WHERE只允许填充其中的几个。 >

为解决这个问题,我用Brickhouse的Murmurhash3 UDF切换了pmod中的哈希。

https://github.com/klout/brickhouse

本文链接:https://www.f2er.com/3127807.html

大家都在问