-
如何使用Google Dataflow Python按数据字段中的字段执行分区gsink(镶木地板)
我正在尝试从GS存储桶中读取数据,并将其输出到另一个GS存储桶(按自定义列,arrival_date划分)到另一 -
数据流:流式传输流式的Windmill RPC错误
我的波束数据流尝试从GCS读取数据并将数据写入Pub / Sub。 但是,管道挂起并出现以下错误 <pre> -
带DirectRunner的Apache Beam(SUBPROCESS_SDK)仅使用一个工作程序,如何强制它使用所有可用工作程序?
以下代码: <pre><code>def get_pipeline(workers): pipeline_options = PipelineOptions(['--direct_num_workers', str(wo -
如何使用BigQuery和Apache Beam将SQL表转换为行序列列表?
我有一个很大的表,其中每一行代表一个称为Trip的抽象。行程由数字列组成,例如车辆ID,行程ID,开始 -
运行光束管道时,“ PBegin”对象没有属性“ windowing”
在运行数据流作业时,我得到“ PBegin”对象没有“ windowing”属性。 我在pardo函数中调用connectclass类。</p -
如何在我在Google Dataflow上写Beam管道的地方运行.jar文件?
我用Apache Beam管道(用Java编写)编写了一个.jar文件,该文件要运行Google Dataflow。我把它装进了水桶。当 -
从DynamoDB迁移到Spanner / BigTable
我有一个用例,需要将70 TB的数据从DynamoDB迁移到BigTable和Spanner。具有单个索引的表将进入BigTable,否则 -
成功的数据流管道通过Airflow中的PythonVirtualenvOperator运行了多次
我正在运行一个Apache Beam管道(与Google Dataflow一起部署),该管道正在与Apache Airflow协调。 DAG文 -
什么环境_config用于Beam启动flink
我希望在运行Beam wordcount.py演示时获得有关如何设置<code>--environment_config</code>的指导。 在DirectRunner -
有没有一种方法可以在Beam的ParDo转换中创建用于写入Parquet文件的SpecificRecord列表?
我试图用Beam / Java编写Dataflow作业,以处理来自Pub / Sub并写入Parquet的一系列事件。 Pub / Sub中的事件采用JS -
几个小时后,Google Cloud DataFlow作业会发出警报
使用2.11.0版本运行DataFlow流作业。 几个小时后,我收到以下身份验证错误: <pre class="lang-py prettyprint- -
Apache Beam:将密钥的值,值对根据密钥写入文件
我想在Apache Beam(使用Java)中使用<code>FileIO</code>和<code>writeDynamic()</code>通过密钥将密钥,值对中的值写 -
当第一个包含某个事件的时间范围和第二个时间戳时,如何联接两个PCollection?
我有一个用Dataflow编写的批处理管道。我想在数据上实现以下联接。 我有两个PCollection。首先是代 -
读取或写入主题时记录错误消息
在读取或写入主题时如何记录错误消息。我们将使用Apache Beam API读取或写入主题。因此,我生成了任何 -
Apache Beam管道未使用Python并行运行任务
我刚刚开始使用Python使用Apache Beam。我有一个希望并行运行的任务,但由于某种原因,它可以串行运行。 -
apache光束火花流转JobService端点永远不会开始卡在98%
在尝试执行此步骤时,我正在使用python sparkRunner: <code>./gradlew :runners:spark:job-server:runShadow</code> </ -
从AINotebook / Jupyter运行的Apache Beam / GCP数据流
我们最近将基础结构迁移到了GCP,我们热衷于将<em> DataProc </em>(Spark)和<em> DataFlow </em>(Apache Beam)用 -
如何在csv文件内容的多列上进行聚合(例如Sum,Avg等)?
我有一个CSV文件,其中包含多列(标识符和数字值)。我想对数字变量进行一些基础统计(我是Apache Beam -
如何从GCP存储桶读取Apache Beam中的多个文件
我正在尝试使用Apache Beam在GCP中的多个文件上读取并应用一些子设置。我准备了两个仅对一个文件有效的 -
Apache Beam Python SDK版本上的Wait.On()
我在Python上使用Apache Beam,想问一下python SDK上的Apache Beam Java <code>Wait.on()</code>等同于什么? 目前 -
Apache Beam流处理事件时间
我正在尝试使用apache beam创建事件处理流。 我的信息流中发生的步骤: <ol> <li>以avro格式阅读ka -
使用模式自动检测将数据流作业写入BigQuery
当前,我们正在寻找将原始数据转换为通用结构以进行进一步分析的最佳方法。我们的数据是JSON文件, -
从Java中的Eclipse执行Apache Beam WordCount示例时,参数不以'-'错误开头
我正在尝试按照Eclipse中的说明执行示例WordCount(Java代码) <a href="https://cloud.google.com/dataflow/docs/quickstarts -
什么是管道|和>>在python中?
最近我正在学习apache Beam,并找到一些类似这样的python代码: <pre><code>lines = p | 'read' >> Read -
Google数据流作业在writeToBiqquery步骤上失败:“列表”对象和“ str”对象没有属性“ items”
我有一个使用数据流运行器运行的光束管道。它接受XML并输出一个JSON,然后必须将其存储在Bigquery表中 -
如何对基于KinesisRecord的DoFn进行单元测试?
我要开始从AWS Kinesis读取的Beam项目,因此我有一个简单的DoFn,它接受KinesisRecord并记录内容。我想编写一 -
数据流:无工作人员活动
我在从AI Platform Notebook运行相对原始的Dataflow作业时遇到一些问题(该作业旨在从BigQuery中获取数据>清理 -
是否可以从Apache Beam中的Java ParDo调用Python DoFn?
假设我正在用Java构建Beam管道,该管道将在DataflowRunner上运行。是否可以从Java ParDo调用Python DoFn? (或在 -
读取大量文件时,withHintMatchesManyFiles是否可以真正提高TextIO的性能?
在此<a href="https://stackoverflow.com/questions/45362108/how-can-i-improve-performance-of-textio-or-avroio-when-reading-a-very-large-n -
如何将我的腌制ML模型从GCS加载到Dataflow / Apache Beam
我已经在本地开发了一个Apache光束管道,在这里我可以对样本文件进行预测。 在本地计算机上,