基于事件触发将文件放入S3存储桶中并运行气流任务

是否仅在发生特定事件(例如将文件放入特定S3存储桶的事件)时才运行气流任务。类似于AWS Lambda事件

S3KeySensor,但我不知道它是否满足我的要求(仅在事件发生时才运行Task)

以下是使问题更清楚的示例:

我有一个如下的传感器对象

sensor = S3KeySensor(
    task_id='run_on_every_file_drop',bucket_key='file-to-watch-*',wildcard_match=True,bucket_name='my-sensor-bucket',timeout=18*60*60,poke_interval=120,dag=dag
)

使用上述传感器对象,传感器任务的气流行为如下:

  • 如果已经存在与之匹配的对象名称,则运行任务 S3存储桶my-sensor-bucket中的通配符,甚至早于DAG 在气流管理用户界面中切换了ON(我不想运行该任务,因为 到过去的s3对象)
  • 运行一次后,传感器任务将不会再次运行 是一个新的S3文件对象放置(我想每次在存储桶my-sensor-bucket中放置一个新的S3文件对象时,都在DAG中运行传感器任务和后续任务)
  • 如果配置计划程序,则任务将根据计划运行 但不是基于事件。所以调度程序似乎不是这个的选择 情况

我试图了解气流中的任务是否只能基于调度(例如cron作业)或传感器(仅基于传感标准一次)运行,还是不能像基于事件的管道一样进行设置(类似于AWS) Lambda

lililili521 回答:基于事件触发将文件放入S3存储桶中并运行气流任务

气流从根本上围绕基于时间的调度进行组织。

您可以通过以下几种方法来获得所需的东西:

  1. 假设您在S3上有一个SQS事件,它会触发一个AWS Lambda,该Lambda会调用气流API来触发dag运行。
  2. 您可以使DAG从SQS传感器开始,当它收到s3更改事件时,它将与其他DAG一起继续(请参阅3_1和3_2进行重新安排)。
  3. 您可以使DAG从传感器开始(就像您显示的那样),它不选择要运行的任务,它只是传递到下一个相关任务或超时。您必须删除使传感器匹配的密钥。
    1. 您通过使最终任务重新触发DAG来重新运行。
    2. 或将调度间隔设置为每分钟一次,不追赶,最大活动DAG运行设置为1。这样一来,将运行一次,传感器将保持运行直到超时。如果完成或超时,则下一分钟将在一分钟内开始。

如果使用路线3,则将删除在下次运行DAG及其传感器之前通过传感器的键。请注意,由于S3最终的一致性,路由1和2更可靠。

本文链接:https://www.f2er.com/3166240.html

大家都在问