-
使用xcom pull检索从其他dag推送的变量
我对气流非常陌生,并使用“ xcom_push”和“ xcom_pull”功能。 我有两个任务d1和任务t2,第二个d2和 -
基于事件触发将文件放入S3存储桶中并运行气流任务
是否仅在发生特定事件(例如将文件放入特定S3存储桶的事件)时才运行气流任务。类似于AWS Lambda事件</ -
使用hiveOperator在气流中使用SELECT配置单元查询获取数据
在气流中,如果我想使用hiveOperator进行蜂巢查询,并且该查询的结果想要转移到另一个任务。 使用hiveOpe -
如何在气流中使用PythonVirtualenvOperator?
基本上,我正在使用气流,并制定了一个任务,可以从外部来源下载文件。 <pre><code>t1 = PythonOperator( -
气流中的全局变量
我正在尝试使用Airflow来实现基本的ETL工作,但停留在一点: 我有3个功能。我想为每个变量定义全 -
气流+气流为何返回不清楚的错误
我正在Linux服务器上使用气流应用程序 注意-Airflow是一个以编程方式编写,安排和监视工作流的平 -
气流测试模式xcom拉/推不起作用
我尝试通过气流cli <code>test</code>命令测试2个任务。 第一个任务运行,将最后一个控制台自动推送 -
如何将值从Xcom传递给另一个运算符?
<code>Input</code>具有参数<code>DockerOperator</code>,该参数设置后会将Docker容器的输出推送到Xcom: <pre><cod -
在气流UI中运行dag时遇到问题
从气流ui提交dag时,我遇到一个奇怪的问题。当我提交dag时,它显示为正在运行,而不显示日志,并且dag -
气流中的全局变量
我正在使用XCom在任务之间进行通信,但是我的代码无法正常工作。 根据教程,我的代码是: < -
Airflow XCom中的元组索引超出范围
我正在使用Airflow和XComs,并且想从该函数返回多个值,但是,我遇到了问题。以下是我的代码: <pre> -
如何将xcom变量推送到现有的dag id?
我目前在Airflow中有一个DAG,它带有Python运算符和可调用的相关python,例如: <pre><code>def push_xcom(**kwar -
推送输出后如何删除Docker容器?
我正在使用<code>xcom_push</code>设置为<code>True</code>的DockerOperator,以便可以获取Docker容器的控制台输出并通 -
将变量从Spark推送到气流
我有一个变量,希望将其值推送到Airflow,以便将其用作下一个任务的输入。我知道我必须使用xcoms,但 -
通过xcom push无法获得返回值
在sh运算符的帮助下,我正在通过xcom推送一个值,并尝试拉取并将其分配给get_master_ip变量。但是它没有 -
自定义操作员XCom在Airflow运行期间
我了解<code>PythonOperator/BashOperator</code>,我们可以使用Xcom进行通信。 例如 <pre><code>def func(**conte -
气流-如何从ECS运营商推送XCOM?
在气流控制中,我有一个ecs_operator任务,其后是python运算符任务。我想使用xcom将一些消息从ECS任务推送 -
用气流清洁过去的AI平台型号
我正在使用气流来安排在gcloud AI平台中训练模型Verison的时间 我设法安排了模型的培训,版本的创建,然 -
如何确保Airflow在回填期间仍运行预定的dagrun(并且不会将其放到队列的末尾)?
我们构建了一个气流回填插件,该插件可以在开始日期和结束日期之间清除给定任务的任务实例。 <p -
气流SimpleHttpOperator
嗨,我正在经历来自SimpleHttpOperator的奇怪行为。 我已经像这样扩展了该运算符: <pre><code>class EPOHttpO -
如何在启动气流时跳过dag中的api调用
我有一个dag,它在任务内部进行api回调。像 <pre><code>events_sensor = DataCloudEtlManifestFileSensor( task_i -
如何从以前的运行中拉xcom变量
如何从以前的气流中提取xcom变量?有可能吗? 我想将先前run_id中相同task_id的值用作SimpeHttpOperator -
如何使用Airflow存储SQL查询结果并在其他情况下使用结果?
我试图使用XCOM,但它没有显示任何值 <pre><code>class DWPostgresReturn(DWPostgresOperator): def execute(self, conte -
尝试从XCom传递值时与PostgresOperator一起使用时,气流模板未呈现
我编写了一个非常简单的DAG,首先执行一些python操作,然后再使用返回的值更新Database-Table。这是DAG代码 -
气流操作员从外部Rest API提取数据
我正在尝试从外部API提取数据并将其转储到S3上。我正在考虑编写和Airflow Operator rest-to-s3.py,它将从外部 -
Apache Airflow:有关动态任务和并行性的问题
我被要求为ETL脚本编写一个更“专业”的版本。我所说的专业是:监督(日志和电子邮件警报),并行 -
如何通过Airflow中的task_id获取上游任务的任务实例?
是否可以从传递给<code>/home/crash/1234: user.exported: No such attribut ./crash_remove.sh: Line 20: rm -rfv /home/crash/1234: Fil -
如何使用Airflow处理批处理新数据?
我们要使用Airflow处理批量新数据,首先,我们的dag运行命令,每15分钟检查一次CRM系统中是否有新数据 -
将OracleOperator的输出发送到气流中的另一个任务
我需要在另一个任务中使用oracleOperator的输出来进一步执行。我遇到的麻烦是,当我将数据拉入另一个任 -
如何在PostgresOperator中提取XCOM值
我在这里推XCOM值: <pre><code>task_get_username_bash = BashOperator( task_id='execute_bash',