我有一个数据流作业,
尽管我的工作“成功”,但在ReadAllFromParquet步骤之后,基本上没有输出集合。
我已成功读取列表中的文件,例如:['gs://my_bucket/my_file1.snappy.parquet','gs://my_bucket/my_file2.snappy.parquet','gs://my_bucket/my_file3.snappy.parquet']
我还确认此列表是正确的,并且在ReadAllFromParquet之前的步骤中使用记录器,这些文件的GCS路径正确。
这就是我的管道的样子(为简洁起见,省略了完整的代码,但我相信它通常可以正常工作,因为我使用ReadAllFromText具有与.csv完全相同的管道,并且可以正常工作):
with beam.Pipeline(options=pipeline_options_batch) as pipeline_2:
try:
final_data = (
pipeline_2
|'Create empty PCollection' >> beam.Create([None])
|'Get accepted batch file: {}'.format(runtime_options.complete_batch) >> beam.ParDo(OutputvalueProviderFn(runtime_options.complete_batch))
|'Read all filenames into a list'>> beam.ParDo(FileIterator(runtime_options.files_bucket))
|'Read all files' >> beam.io.ReadAllFromParquet(columns=['locationItemId','deviceId','timestamp'])
|'Process all files' >> beam.ParDo(ProcessSch2())
|'Transform to rows' >> beam.ParDo(BlisDictSch2())
|'Write to BigQuery' >> beam.io.WriteToBigQuery(
table = runtime_options.comp_table,schema = SCHEMA_2,project = pipeline_options_batch.view_as(GooglecloudOptions).project,#options.display_data()['project'],create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED,#'CREATE_IF_NEEDED',#create if does not exist.
write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND #'WRITE_APPEND' #add to existing rows,partitoning
)
)
except Exception as exception:
logging.error(exception)
pass
这就是我的工作图的样子:
有人知道这里可能出什么问题了,什么是调试的最佳方法? 目前我的想法:
-
存储桶权限问题。我注意到我正在读取的存储桶很奇怪,因为尽管我是项目所有者,但我仍然无法下载文件。项目的所有者只有“存储旧版存储桶所有者”。我添加了“ Storage Admin”,然后当使用我自己的帐户手动下载文件时,它运行良好。根据数据流文档,我已确保默认计算服务帐户以及数据流帐户在此存储桶上均具有“存储管理员”。但是,也许所有这一切都是一团糟,因为最终如果出现权限问题,我应该在日志中看到它,并且作业将失败?
-
ReadAllFromParquet是否期望文件格式为其他格式?我已经显示了列表的示例(在上面的图表中,我可以看到输入集合正确显示了列表中48个文件的添加元素= 48)。我知道这种格式适用于ReadAllFromText,所以我假设它们是等效的,应该可以使用。
=========
编辑: 注意到其他可能导致的后果。与我的其他使用ReadAllFromText且工作正常的工作相比,我注意到令人担忧的命名略有不匹配。
请特别注意
Read all files/ReadAllFiles/ReadRange.out0
vs
Read all files/Read all files/ReadRange.out0
路径的第一部分是这两个作业的步骤名称。 但是我相信第二个是apache_beam.io.filebasedsource(https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filebasedsource.py)中的ReadAllFiles类,该类同时调用ReadAllFromText和ReadAllFromParquet。
看起来像是潜在的错误,但似乎无法在源代码中对其进行跟踪。
============== 编辑2
经过更多的挖掘后,似乎ReadAllFromParquet仍然无法正常工作。 ReadFromParquet调用apache_beam.io.parquetio._Parquetsource,而ReadAllFromParquet只是调用
apache_beam.io.filebasedsource._ReadRange。
我想知道是否有一种方法可以将其打开?