开放表格格式OTFs如 Apache Iceberg 正在日益广泛使用,例如改善数据湖的交易一致性,或将批处理和串流数据管道整合到单一文件格式中以降低复杂性。然而,架构师需要将所选格式与现代数据平台的不同层级集成,但不同 OTF 的支持程度在各常见分析服务之间差异颇大。
商业供应商和开源社群对此局面表示关注,并致力于提高表格格式之间的互操作性。一个解决方案是通过翻译元数据,使单一物理数据集以不同格式可读,并避免对实际数据文件的重处理。Apache XTable 正是一个遵循此方法的开源解决方案,提供了开放表格格式元数据翻译的抽象和工具。
在本篇文章中,我们将展示如何在 AWS 上开始使用 Apache XTable,以及如何在由 Amazon Managed Workflows for Apache Airflow (Amazon MWAA) 编排的批处理管道中利用它。为了理解 XTable 和类似解决方案的工作原理,我们将先介绍 OTF 的元数据管理的高层次背景,然后深入研究 XTable 及其用法。
开放表格格式克服了传统数据湖存储格式如 Apache Hive 表的不足,提供了来自关联数据库的抽象和功能,例如交易一致性以及创建、更新或删除单个记录的能力。此外,它们还帮助管理模式演变。
为了理解 XTable 的元数据翻译方法如何运作,首先须了解 OTF 的元数据是如何在存储层上表示的。
OTF 包括数据层和元数据层,这两者都作为文件在存储中表示。数据层包含数据文件,而元数据层则包含跟踪数据文件及其变更的交易一致序列的元数据文件。下图显示了这一配置:
加速器安卓免费检查存储中的 Iceberg 表文件时,我们可以通过文件夹 metadata 辨识元数据层。与此相邻的是数据文件此示例中,数据文件为压缩的 Parquet 格式:

plaintext
metadata # 包含元数据文件 000006c64b16daffa4f0e8635a996ec13a7fametadatajson 23ba9e9478194661b698f512f5b51054m0avro snap5514083086862319381123ba9e9478194661b698f512f5b51054avro part00011587322f110074500a5cf8022f6e7fa3cc000snappyparquet # 数据文件与 Iceberg 类似,Delta Lake 中的元数据层通过文件夹 deltalog 表示:plaintext deltalog # 包含元数据文件 00000000000000000000json part00011587322f110074500a5cf8022f6e7fa3cc000snappyparquet # 数据文件虽然元数据层在不同 OTF 之间的结构和功能各异,但最终它们仅仅是存储中的文件。一般来说,元数据层位于表的基层目录中,与数据文件相邻。现在,问题出现了:如果对于同一表格,在不同格式中平行存储多个元数据文件会怎样呢?当前的互操作性方法正是这样做的,下一节将进一步探讨。## Apache XTableXTable 目前作为独立的 Java 可执行文件提供。它能够在 Apache Hudi、Apache Iceberg 或 Delta Lake 之间翻译元数据层,而无需重写数据文件,并与 Iceberg 兼容的目录集成,如 [AWS Glue Data Catalog](https//awsamazoncom/glue)。在实践中,XTable 读取输入表的最新快照并为可配置的目标格式创建额外的元数据。它将这些额外的元数据添加到存储层上的表格中,并保持现有元数据不变。通过这一方式,你可以选择任何格式源或目标,读取相应的元数据,从而获得表格数据的一致视图。以下图示显示了元数据翻译的过程。假设已有一个 Delta Lake 表,想将其转换为 Iceberg 表。要运行 XTable,你需要调用其 Java 可执行文件并提供一个配置文件,该文件指定源和目标格式以及源表路径:bashjava jar utilities010SNAPSHOTbundledjar datasetConfig datasetConfigyaml一个最小的 datasetConfigyaml 配置文件如下,假设表存储在 [Amazon Simple Storage Service (Amazon S3)](http//awsamazoncom/s3) 中:yamlsourceFormat DELTAtargetFormats ICEBERGdatasets tableBasePath s3//lt表的基层目录 URIgt tableName lt表名gt正如以下列所示,XTable 在表的基层目录中添加了 Iceberg 特定的 metadata 文件夹,以及现有的 deltalog 文件夹。现在,客户端可以以 Delta Lake 或 Iceberg 格式读取该表。plaintext deltalog # 先前存在的 Delta Lake 元数据 metadata # XTable 添加的:Apache Iceberg 元数据 part00011587322f110074500a5cf8022f6e7fa3cc000snappyparquet # 数据文件为了在数据目录中注册 Iceberg 表格,需将另一个配置文件传递给 XTable,该文件负责 Iceberg 目录:bashjava jar utilities010SNAPSHOTbundledjar datasetConfig datasetConfigyaml icebergCatalogConfig glueDataCatalogyamlglueDataCatalogyaml 的最小内容如下。它配置 XTable 使用由 icebergaws 模块提供的特定于数据目录的 IcebergCatalog 实现,该模块是 Apache Iceberg 核心项目的组成部分:yamlcatalogImpl orgapacheicebergawsglueGlueCatalogcatalogName gluecatalogOptions warehouse s3// catalogimpl orgapacheicebergawsglueGlueCatalog ioimpl orgapacheicebergawss3S3FileIO## 将 Apache XTable 作为 Airflow 操作运行你可以在批数据管道中使用 XTable,这些管道会在数据湖中写入表格,并确保这些表格可以用不同的文件格式进行读取。例如,在 Delta Lake 生态系统中,数据管道可能会创建 Delta 表,这些表也需能够以 Iceberg 表的形式访问。AWS 上的一个工具是 Amazon MWAA,这是 Apache Airflow 的一项托管服务。在接下来的几节中,我们将探讨如何在 Amazon MWAA 上自定义 Airflow 操作来运行 XTable。我们会更详细地说明这种操作的初步设计,并演示它在 Amazon MWAA 上的部署。为什么要自定义操作?虽然 XTable 也可以直接从 BashOperator 调用,但我们选择将这一步包装在一个自定义操作中,以便通过原生 Airflow 程式语言Python和操作参数进行配置。欲了解如何编写自定义操作的背景,请参见 [Creating a custom operator](https//airflowapacheorg/docs/apacheairflow/stable/howto/customoperatorhtml)。以下图示显示了操作与 XTable 的二进制文件之间的依赖关系。### 操作的输入参数XTable 的主要输入是基于 YAML 的配置文件: 数据集配置 包含源格式、目标格式和源表 Iceberg 目录配置可选 包含指向外部 Iceberg 目录的引用,以在目标格式中注册该表我们选择在操作的输入参数中反映 YAML 文件的数据结构,如下表所示。 参数 类型 值 datasetconfig dict 数据集配置的内容作为字典文字 icebergcatalogconfig dict Iceberg 目录配置的内容作为字典文字 随著操作的运行,YAML 文件将基于输入参数生成。参考 [XTable 的 GitHub 仓库](https//githubcom/apache/incubatorxtabletab=readmeovfile#runningthebundledjar) 获取完整的字典键参考。### 参数化示例以下示例显示了从 Delta Lake 转换表格到 Iceberg 和 Hudi 的配置。datasetconfig 属性通过 Python 字典文字反映数据集配置文件的结构:pythonfrom mwaapluginplugin import XtableOperatoroperator = XtableOperator( taskid=xtableTask datasetconfig={ sourceFormat DELTA targetFormats [ICEBERG HUDI] datasets [ { tableBasePath s3//datalake/sales tableName sales namespace table } ] })范例代码:完整的示例 XtableOperator 源码以及本文使用的其余代码可在此 [GitHub 仓库](https//githubcom/awssamples/apachextableonawssamples) 中找到。## 解决方案概述为了将自定义操作部署到 Amazon MWAA,我们需要将其与 DAG 一起上传到配置的 DAG 目录中。除了操作本身,我们还需要上传 XTable 的可执行 JAR。截至撰写本文时,该 JAR 需要用户从源码编译。为了简化这一过程,我们提供了一个基于容器的构建脚本。## 前提条件我们假设你至少拥有一个环境,该环境由 Amazon MWAA 本身、一个 S3 桶,以及一个对 Amazon MWAA 有读取该桶的权限的 [AWS 身份与访问管理](https//awsamazoncom/iam/)IAM角色组成,并且该角色可选择性地对 AWS Glue 数据目录具有写入访问权限。此外,你需要下列任一容器运行环境来运行提供的 XTable 构建脚本: Finch Docker### 构建和部署 XTableOperator要编译 XTable,你可以使用提供的构建脚本并按照以下步骤完成:1 从 GitHub 克隆示例代码: bash git clone https//githubcom/awssamples/apachextableonawssamplesgit cd apachextableonawssamples 2 运行构建脚本: bash /buildairflowoperatorsh 3 由于 Airflow 操作使用了函式库 [JPype](https//githubcom/jpypeproject/jpype) 来调用 XTable 的 JAR,因此请在 Amazon MWAA 的 requirementtxt 文件中添加依赖: plaintext JPype1==150 有关在 Amazon MWAA 上安装其他 Python 库的背景,请参见 [Installing Python dependencies](https//docsawsamazoncom/mwaa/latest/userguide/workingdagsdependencieshtml)。 因为 XTable 是基于 Java 的,所以在 Amazon MWAA 上需要 Java 11 执行时环境JRE。你可以使用 Amazon MWAA 启动脚本来安装 JRE。4 将以下行添加到现有启动脚本中,或创建一个新脚本,如本文的示例代码中提供的那样: bash if [[ {MWAAAIRFLOWCOMPONENT} != webserver ]] then sudo yum install y java11amazoncorrettoheadless fi 有关此机制的更多信息,请参见 [Using a startup script with Amazon MWAA](https//docsawsamazoncom/mwaa/latest/userguide/usingstartupscripthtml)。5 上传 xtableoperator/、requirementstxt、startupsh 及 airflowignore 至 S3 桶及各自的路径,Amazon MWAA 将从中读取文件。 确保 Amazon MWAA 的 IAM 角色拥有适当的读取权限。 针对自定义操作,需确保将本地文件夹 xtableoperator/ 和 [airflowignore](https//airflowapacheorg/docs/apacheairflow/stable/coreconcepts/dagshtml#airflowignore) 上传到配置的 DAG 目录中。  6 更新你的 Amazon MWAA 环境配置,如下所示并开始更新过程: 1 通过 Requirements file configuration option 添加或更新 requirementstxt 文件的 S3 URI。 2 通过 Startup script configuration option 添加或更新 startupsh 脚本的 S3 URI。 7 可选地,你可以将 AWS Glue 数据目录作为 Iceberg 目录使用。如果你创建了 Iceberg 元数据并希望在 AWS Glue 数据目录