Beam / Dataflow ReadAllFromParquet什么都没读,但是我的工作仍然成功?

我有一个数据流作业,

  1. 从GCS读取文本文件,其中包含其他文件名
  2. 将文件名传递给ReadAllFromParquet以读取.parquet文件
  3. 写入BigQuery

尽管我的工作“成功”,但在ReadAllFromParquet步骤之后,基本上没有输出集合。 我已成功读取列表中的文件,例如:['gs://my_bucket/my_file1.snappy.parquet','gs://my_bucket/my_file2.snappy.parquet','gs://my_bucket/my_file3.snappy.parquet'] 我还确认此列表是正确的,并且在ReadAllFromParquet之前的步骤中使用记录器,这些文件的GCS路径正确。

这就是我的管道的样子(为简洁起见,省略了完整的代码,但我相信它通常可以正常工作,因为我使用ReadAllFromText具有与.csv完全相同的管道,并且可以正常工作):

 with beam.Pipeline(options=pipeline_options_batch) as pipeline_2:

    try:

        final_data = (
        pipeline_2
        |'Create empty PCollection' >> beam.Create([None])
        |'Get accepted batch file: {}'.format(runtime_options.complete_batch) >> beam.ParDo(OutputvalueProviderFn(runtime_options.complete_batch))
        |'Read all filenames into a list'>> beam.ParDo(FileIterator(runtime_options.files_bucket))
        |'Read all files' >> beam.io.ReadAllFromParquet(columns=['locationItemId','deviceId','timestamp'])
        |'Process all files' >> beam.ParDo(ProcessSch2())
        |'Transform to rows' >> beam.ParDo(BlisDictSch2())
        |'Write to BigQuery' >> beam.io.WriteToBigQuery(
                table = runtime_options.comp_table,schema = SCHEMA_2,project = pipeline_options_batch.view_as(GooglecloudOptions).project,#options.display_data()['project'],create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED,#'CREATE_IF_NEEDED',#create if does not exist.
                write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND    #'WRITE_APPEND' #add to existing rows,partitoning
                )
        )
    except Exception as exception:
        logging.error(exception)
        pass

这就是我的工作图的样子:

Beam / Dataflow ReadAllFromParquet什么都没读,但是我的工作仍然成功?

有人知道这里可能出什么问题了,什么是调试的最佳方法? 目前我的想法:

  1. 存储桶权限问题。我注意到我正在读取的存储桶很奇怪,因为尽管我是项目所有者,但我仍然无法下载文件。项目的所有者只有“存储旧版存储桶所有者”。我添加了“ Storage Admin”,然后当使用我自己的帐户手动下载文件时,它运行良好。根据数据流文档,我已确保默认计算服务帐户以及数据流帐户在此存储桶上均具有“存储管理员”。但是,也许所有这一切都是一团糟,因为最终如果出现权限问题,我应该在日志中看到它,并且作业将失败?

  2. ReadAllFromParquet是否期望文件格式为其他格式?我已经显示了列表的示例(在上面的图表中,我可以看到输入集合正确显示了列表中48个文件的添加元素= 48)。我知道这种格式适用于ReadAllFromText,所以我假设它们是等效的,应该可以使用。

=========

编辑: 注意到其他可能导致的后果。与我的其他使用ReadAllFromText且工作正常的工作相比,我注意到令人担忧的命名略有不匹配。

这是我的工作的输出集合的名称:

Beam / Dataflow ReadAllFromParquet什么都没读,但是我的工作仍然成功?

那是我的实木复合地板工作上的名字,实际上什么都没读:

Beam / Dataflow ReadAllFromParquet什么都没读,但是我的工作仍然成功?

请特别注意

Read all files/ReadAllFiles/ReadRange.out0

vs

Read all files/Read all files/ReadRange.out0

路径的第一部分是这两个作业的步骤名称。 但是我相信第二个是apache_beam.io.filebasedsource(https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filebasedsource.py)中的ReadAllFiles类,该类同时调用ReadAllFromText和ReadAllFromParquet。

看起来像是潜在的错误,但似乎无法在源代码中对其进行跟踪。

============== 编辑2

经过更多的挖掘后,似乎ReadAllFromParquet仍然无法正常工作。 ReadFromParquet调用apache_beam.io.parquetio._Parquetsource,而ReadAllFromParquet只是调用
apache_beam.io.filebasedsource._ReadRange。

我想知道是否有一种方法可以将其打开?

lengci 回答:Beam / Dataflow ReadAllFromParquet什么都没读,但是我的工作仍然成功?

您没有提到是否使用的是最新的Beam SDK,请尝试使用SDK 2.16测试最后的更改。

该文档指出ReadAllFromParquet和ReadFromParquet一样都是实验功能;尽管如此,据报告ReadFromParquet正在此线程Apache-Beam: Read parquet files from nested HDFS directories中工作,您可能想尝试使用此功能。

本文链接:https://www.f2er.com/2904294.html

大家都在问