如何为数据流安装python依赖项

我有一个非常小的python数据流软件包,该软件包的结构如下所示

.
├── __pycache__
├── pubsubtobigq.py
├── requirements.txt
└── venv

requirements.txt的内容是

protobuf==3.11.2
protobuf3-to-dict==0.1.5

我使用此代码运行了pipline

python -m pubsubtobigq \
  --input_topic "projects/project_name/topics/topic_name" \
  --job_name "job_name" \
  --output "gs://mybucket/wordcount/outputs" \
  --runner DataflowRunner \
  --project "project_name"  \
  --region "us-central1" \
  --temp_location "gs://mybucket/tmp/" \
  --staging_location "gs://mybucket/staging" \
  --requirements_file requirements.txt \
  --streaming True

使用该库的代码就像

from protobuf_to_dict import protobuf_to_dict

def parse_proto(message):
    dictinoary = protobuf_to_dict(message)

但是此行无法说明protobuf_to_dict是未知符号。即使我尝试使用MessageToDict中的内置方法google.protobuf.json_format的google,我也会遇到相同的错误。

我该如何解决?我需要安装这些库中的任何一个

编辑

当我使用MessageToDict中的google.protobuf.json_format时出现错误消息

Error processing instruction -31. Original traceback is Traceback (most recent call last): File 
"apache_beam/runners/common.py",line 813,in apache_beam.runners.common.DoFnRunner.process File 
"apache_beam/runners/common.py",line 449,in 
apache_beam.runners.common.SimpleInvoker.invoke_process File "/Users/username/repos/
dataflow-pipeline/venv/lib/python3.7/site-packages/apache_beam/transforms/core.py",line 1415,in 
wrapper = lambda x: [fn(x)] File "/Users/username/repos/dataflow-pipeline/pubsubtobigq.py",line 
16,in parse_proto NameError: name 'MessageToDict' is not defined During handling of the above 
exception,another exception occurred: Traceback (most recent call last): File "/usr/local/lib/
python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",line 143,in _execute response 
= task() File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",line 193,in lambda: self.create_worker().do_instruction(request),request) File "/usr/local/lib/
python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",line 291,in do_instruction 
request.instruction_id) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/
sdk_worker.py",line 317,in process_bundle bundle_processor.process_bundle(instruction_id)) File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",line 675,in process_bundle data.transform_id].process_encoded(data.data) File "/usr/local/lib/python3.7/
site-packages/apache_beam/runners/worker/bundle_processor.py",line 146,in process_encoded 
self.output(decoded_value) File "apache_beam/runners/worker/operations.py",line 258,in 
apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/
operations.py",line 259,in apache_beam.runners.worker.operations.Operation.output File 
"apache_beam/runners/worker/operations.py",in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/
worker/operations.py",line 596,in apache_beam.runners.worker.operations.DoOperation.process File 
"apache_beam/runners/worker/operations.py",line 597,in 
apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py",line 809,in apache_beam.runners.common.DoFnRunner.receive File "apache_beam/runners/common.py",line 815,in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py",line 882,in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/usr/local/lib/
python3.7/site-packages/future/utils/init.py",line 421,in raise_with_traceback raise 
exc.with_traceback(traceback) File "apache_beam/runners/common.py",in 
apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py",in 
wrapper = lambda x: [fn(x)] File 
"/Users/username/repos/dataflow-pipeline/pubsubtobigq.py",line 16,in parse_proto NameError: 
name 'MessageToDict' is not defined [while running 'generatedPtransform-23']

我使用protobuf_to_dict

时出现错误消息
Error processing instruction -32. Original traceback is Traceback (most recent call last): File 
"apache_beam/runners/common.py",line 
21,in parse_proto NameError: name 'protobuf_to_dict' is not defined During handling of the above 
exception,in parse_proto NameError: name 'protobuf_to_dict' is not defined [while running 
'generatedPtransform-22']
xiongming321 回答:如何为数据流安装python依赖项

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/2681545.html

大家都在问