作者:Hernan Garcia 和 Parnab Basak,2024年7月9日 发布于 Amazon 管理工作流 Apache AirflowAmazon MWAA 公告 中级 (200) 永久链接 评论
Amazon 管理工作流 Apache AirflowAmazon MWAA是一个为 Apache Airflow 提供的托管编排服务,它显著提高了安全性和可用性,减少了在云中设置和操作端到端数据管道时的基础设施管理开销。
今天,我们宣布 Apache Airflow 292 环境在 Amazon MWAA 上可用。 Apache Airflow 292 引入了几个显著的增强功能,例如改进数据集管理的新 API 端点、包括数据集依赖关系的条件表达式的高级调度选项、组合数据集与时间表调度的能力以及动态任务映射中的自定义名称,以提高 DAG 的可读性。
在这篇文章中,我们将带您了解其中的一些新功能和特性,您可以如何使用它们,以及如何设置或升级您的 Amazon MWAA 环境到 Airflow 292。
随着每个新版本的发布,Apache Airflow 社区不断创新,使 Airflow 更加关注数据,这使得您可以构建反应式、事件驱动的工作流,以应对数据集的变化,无论是在 Airflow 环境之间,还是在外部系统中。下面我们来深入了解这些新功能。
在这项功能引入之前,用户在处理涉及多个数据集的复杂调度场景时面临着显著的限制。Airflow 的调度能力仅限于数据集的逻辑与组合,意味着只有在所有指定数据集自上次运行以来均已更新后,才会创建一个 DAG 运行。这种严格的方法对需要更细致触发条件的工作流造成了挑战,例如,当任一多个数据集更新时,或在特定的数据集更新组合发生时运行 DAG。
通过 Airflow 292 的发布,您现在可以使用逻辑运算符与和或和条件表达式来定义基于数据集更新的复杂调度条件。此功能允许对工作流触发器进行细粒度控制,使 DAG 在特定数据集或多个数据集组合更新时进行调度。
例如,在金融服务行业,当区域市场的交易数据刷新时,可能需要运行风险管理流程,或者在交易和监管更新都可用时也需要运行。Amazon MWAA 中的新调度能力使您能够使用简单的表达式来表达这样的复杂逻辑。以下是我们需要建立的依赖关系的图示。
下面的 DAG 代码包含实现这些依赖关系的逻辑操作:
pythonfrom airflowdecorators import dag taskfrom airflowdatasets import Datasetfrom pendulum import datetime
tradingdataasia = Dataset(s3//trading/asia/dataparquet)tradingdataeurope = Dataset(s3//trading/europe/dataparquet)tradingdataamericas = Dataset(s3//trading/americas/dataparquet)regulatoryupdates = Dataset(s3//regulators/updatesjson)
@dag( dagid=riskmanagementtradingdata startdate=datetime(2023 5 1) schedule=((tradingdataasia tradingdataeurope tradingdataamericas) amp regulatoryupdates) catchup=False)def riskmanagementpipeline() @task def riskanalysis() # 风险分析任务
@taskdef reporting() # 报告任务 @taskdef notifications() # 通知任务 analysis = riskanalysis()report = reporting()notify = notifications()
riskmanagementpipeline()
要了解有关此功能的更多信息,请参阅 数据集的逻辑运算符 在 Airflow 文档中。
使用 Airflow 292 环境,Amazon MWAA 现在提供了一种更全面的调度机制,将数据驱动执行的灵活性与基于时间的调度的一致性相结合。
考虑一个场景,您的团队负责管理一个生成每日销售报告的数据管道。该管道依赖于来自多个来源的数据。虽然每天生成这些销售报告以便为业务利益相关者提供及时的见解至关重要,但您同样需要确保报告是最新的,并尽可能快地反映出重要数据变化。例如,在促销活动期间,如果订单激增或库存水平发生意外变化,报告应包含这些更新以保持相关性。
仅依赖于基于时间的调度可能导致潜在的问题,例如信息陈旧和基础设施资源浪费。
在 Airflow 29 中引入的 DatasetOrTimeSchedule 特性,增加了结合条件数据集表达式与基于时间的调度的能力。这意味着您的工作流不仅可以在预定的时间间隔内被调用,只要指定数据集有更新,并且可以在它们之间建立特定的依赖关系。以下图示说明了如何利用这一能力来适应此类场景。
以下是实现示例的 DAG 代码:
pythonfrom airflowdecorators import dag taskfrom airflowtimetablesdatasets import DatasetOrTimeSchedulefrom airflowtimetablestrigger import CronTriggerTimetablefrom airflowdatasets import Datasetfrom datetime import datetime
ordersdataset = Dataset(s3//path/to/orders/data)inventorydataset = Dataset(s3//path/to/inventory/data)customerdataset = Dataset(s3//path/to/customer/data)
combineddataset = (ordersdataset amp inventorydataset) customerdataset
@dag( dagid=datasettimescheduling startdate=datetime(2024 1 1) schedule=DatasetOrTimeSchedule( timetable=CronTriggerTimetable(0 0 timezone=UTC) # 每天午夜 datasets=combineddataset ) catchup=False)def datasettimeschedulingpipeline() @task def processorders() # 处理订单的任务逻辑 pass
@taskdef updateinventory() # 更新库存的任务逻辑 pass@taskdef updatecustomerdata() # 更新客户数据的任务逻辑 passorderstask = processorders()inventorytask = updateinventory()customertask = updatecustomerdata()
datasettimeschedulingpipeline()
在此示例中,DAG 将在以下两种情况下运行:
当基于时间的调度条件满足时每天午夜 UTC当组合数据集条件满足时,即在更新了订单和库存数据时,或在更新客户数据时,无论其他数据集状态如何这种灵活性使您能够创建复杂的调度规则,以满足数据管道的独特要求,从而在必要时运行并包含来自多个来源的最新数据更新。
有关数据感知调度的更多详细信息,请参阅 数据感知调度 在 Airflow 文档中。
在引入此功能之前,让 Airflow 环境感知外部系统中数据集的变化是一项挑战没有选项可将数据集标记为外部更新。有了新的数据集事件端点功能,您可以通过编程方式启动与数据集相关的事件。REST API 具有创建、列出和删除数据集事件的端点。
此功能使外部系统和应用程序能够无缝集成并与您的 Amazon MWAA 环境交互。它大大提高了您扩展数据管道的能力,以进行动态数据管理。
例如,从外部系统运行以下代码,可以在目标 Amazon MWAA 环境中触发数据集事件。该事件可以由下游流程或工作流处理,从而增强数据驱动工作流的连接性和响应能力,这些工作流依赖于及时的数据更新和交互。
bashcurl X POST lthttps//{webserverhostname}gt/api/v1/datasets/events H ContentType application/json d {dataseturi s3//bucketname/bucketkey extra { }}
以下图示说明了场景中的不同组件之间如何相互作用。
有关如何在 Amazon MWAA 中使用 Airflow REST API 的更多详细信息,请参见 引入 Amazon MWAA 对 Airflow REST API 和 Web 服务器自动扩展的支持。要了解有关数据集事件 REST API 端点的更多信息,请参考 Airflow 文档中的 数据集 UI 增强。
Airflow 292 还包括简化环境操作和监控的功能。让我们探索一些这些新能力。
客户正利用 Amazon MWAA 构建复杂的数据管道,包含多个相互连接的任务和依赖关系。当这些管道中的一个遇到问题或失败时,可能会导致一系列不必要和冗余的任务执行,从而导致资源浪费。在高频率如每小时或每天运行的场景中,此问题尤其突出。一个常见的场景是,关键管道在晚上启动失败,由于失败,它继续运行并重复失败,直到有人在第二天早上手动干预。这可能导致许多不必要的任务,消耗宝贵的计算资源,并可能导致数据损坏或不一致。

DAG 自动暂停功能旨在通过引入两个新的配置参数来解决此挑战:
maxconsecutivefaileddagrunsperdag 这是一个全局的 Airflow 配置设置。它允许您指定在 DAG 自动暂停之前,最大允许的连续失败 DAG 运行次数。maxconsecutivefaileddagruns 这是一个 DAG 级别的参数。它覆盖之前的全局配置,允许您为每个 DAG 设置自定义阈值。在以下代码示例中,我们定义了一个包含单个 PythonOperator 的 DAG。failingtask 的设计目的是通过引发 ValueError 来失败。DAG 自动暂停的关键配置是 DAG 对象中设置的 maxconsecutivefaileddagruns 参数。通过将 maxconsecutivefaileddagruns=3,我们指示 Airflow 在连续失败三次后自动暂停该 DAG。
pythonfrom airflowdecorators import dag taskfrom datetime import datetime timedelta
@taskdef failingtask() raise ValueError(此任务设计为失败)
@dag( dagid=autopause startdate=datetime(2023 1 1) scheduleinterval=timedelta(minutes=1) # 每分钟运行一次 catchup=False maxconsecutivefaileddagruns=3 # 设置允许的最大连续失败 DAG 运行次数)def exampledagwithautopause() failingtaskinstance = failingtask()
exampledagwithautopause()
通过这个参数,您现在可以配置您的 Airflow DAG 在达到指定数量的连续失败后自动暂停。
要了解更多信息,请参阅 Airflow 文档中的 DAG 自动暂停。
加速器安卓免费随着您环境中 DAG 数量的增加,管理它们变得越来越具有挑战性。无论是为了升级或迁移环境,还是其他操作活动,您可能需要暂停或恢复多个 DAG。这一过程可能变得无比繁重,因为您需要通过 Airflow UI 逐个手动暂停或恢复 DAG。这些手动操作耗时且增加了人为错误的风险,这可能导致失误,并导致数据不一致或管道中断。之前的 CLI 命令仅能处理一个 DAG,使其效率低下。
Airflow 292 改进了这些 CLI 命令,增加了将 DAG ID 视为正则表达式的能力,从而允许用户通过单个命令暂停或恢复多个 DAG。此新特性消除了重复的手动干预或单个 DAG 操作的需要,显著降低了人为错误的风险,并在数据管道中提供了可靠性和一致性。
例如,您可以使用以下 CLI 命令暂停所有生成每日流动性报告的 DAG,使用 Amazon Redshift 作为数据源:
bashairflow dags pause treatdagidasregex y (redshiftdailyliquidityreporting)
动态任务映射在 Airflow 23 中添加。这一强大功能允许工作流在运行时根据数据动态创建任务。不必依赖 DAG 编写者提前预测所需任务的数量,调度器可以基于先前任务的输出生成适当数量的任务副本。当然,强大的功能需要强大的责任。默认情况下,动态映射的任务被分配数字索引作为名称。在涉及大量映射任务的复杂工作流中,定位需要关注的特定任务变得越来越具有挑战性,因此可能导致延误和数据工作流的管理效率降低。
Airflow 29 引入了mapindextemplate 参数,这是一项受到高度请求的特性,解决了在动态任务映射中任务识别的挑战。通过此能力,您现在可以为动态映射的任务提供自定义名称,提高 Airflow UI 中的可见性和可管理性。
以下是示例代码:
pythonfrom airflowdecorators import dagfrom airflowoperatorspython import PythonOperatorfrom datetime import datetime timedelta
def processdata(data) # 执行数据处理逻辑 print(f正在处理数据 {data})
@dag( dagid=customtaskmappingexample startdate=datetime(2023 1 1) scheduleinterval=None catchup=False)def customtaskmappingexample() mappedprocesses = PythonOperatorpartial( taskid=processdatasource pythoncallable=processdata mapindextemplate=处理源={{ taskopargs[0] }} )expand(opargs=[[sourcea] [sourceb] [sourcec]])
customtaskmappingexample()
代码中的关键是 PythonOperatorpartial 调用中指定的 mapindextemplate 参数。这个 Jinja 模板 指示 Airflow 使用 opsargs 环境变量的值作为每个动态映射任务实例的映射索引。在 Airflow UI 中,您将看到三个任务实例,索引分别