作者:Macey Neff,2024年2月21日,发布于 Amazon EMR、AWS Outposts、AWS Secrets Manager、最佳实践、计算、客户解决方案。
在本文中,您将学习如何在 AWS Outposts 上部署亚马逊 EMR 集群,并利用它处理来自本地数据库的数据。许多组织有规范、合同或企业政策要求在特定地理位置处理和存储数据。这样严格的要求使组织在寻求合规性与云服务灵活性之间找到平衡方案时面临挑战。借助 AWS Outposts 上的 Amazon EMR,您可以在本地环境中无缝处理数据,而无需将其迁移到云端。
在此架构中,AWS Outposts 子网中创建了一个 Amazon EMR 集群。该集群从本地 PostgreSQL 数据库中提取数据,使用 PySpark 进行数据处理,然后将结果存储在同一数据库的新表中。以下图展示了该架构。
本地网络流量: EMR 集群与本地 PostgreSQL 数据库之间的通信通过 Local Gateway 进行。EMR 集群的核心 Amazon Elastic Compute Cloud (Amazon EC2) 实例与 客户拥有的 IP 地址 (CoIP) 相关联,每个实例拥有两个 IP 地址:内部 IP 和 CoIP IP。内部 IP 用于与子网中的其他实例进行本地通信,而 CoIP IP 则用于与本地网络通信。
Amazon VPC 端点: Amazon EMR 通过一个接口 VPC 端点与 VPC 进行通信。这种通信是私有的,完全在 AWS 网络内部,而不需通过互联网。在此架构中,VPC 端点是在 AWS 区域 的子网中创建的。
用于创建 EMR 集群的支持文件存储在一个 Amazon Simple Storage Service (Amazon S3) 桶中。VPC 与 Amazon S3 之间的通信保持在 AWS 网络内。以下文件存储在此 S3 桶中:
文件名描述getpostgresqldriversh这是一个启动脚本,用于下载 PostgreSQL 驱动以允许 Spark 步骤通过 JDBC 与 PostgreSQL 数据库进行通信。您可以通过 此 GitHub 仓库 下载它。postgresql4260jarPostgreSQL 二进制 JAR 文件,用于 JDBC 驱动。sparkstepexamplepyPySpark 中的步骤应用示例,用以模拟与 PostgreSQL 数据库的连接。配置 AWS Systems Manager 来管理属于 EMR 集群的 EC2 实例,使用一个接口 VPC 端点允许 VPC 与 Systems Manager 之间进行私密通信。
连接 PostgreSQL 数据库的数据库凭证存储在 AWS Secrets Manager 中。Amazon EMR 与 Secrets Manager 集成,可以通过其 ARN 在集群配置中使用该密钥。在 EMR 集群创建期间,可以通过接口 VPC 端点私有访问密钥,并将其存储在 EMR 集群的变量 DBCONNECTION 中。
在本解决方案中,我们创建一个小型 EMR 集群,包含一个主节点和一个核心节点。如需正确计算您的集群大小,请参阅 评估 Amazon EMR 集群容量。
对于使用 AWS Control Tower 登陆区 和 AWS 组织 的组织,提供了额外的信息来增强安全性。文章 在 AWS Outposts rack 和登陆区保护下进行数据驻留架构设计 是一个很好的起点。
在 AWS Outposts 上部署 EMR 集群之前,您需要确保在您的 AWS 账户中创建并配置以下资源:
Outposts rack 已安装并正常运行。创建 Amazon EC2 密钥对。有关创建过程,请参考 使用 Amazon EC2 创建密钥对 中的说明。您可以使用 此 AWS CloudFormation 模板 来创建 EMR 集群的基础设施。有关创建堆栈的说明,请参阅 在 AWS CloudFormation 控制台上创建堆栈 的说明。
2 创建 EMR 集群要通过控制台启动一个已安装 Spark 的集群:
步骤 1:配置名称和应用程序
登录到 AWS 管理控制台,然后打开 Amazon EMR 控制台。在 EMR on EC2 下,在左侧导航窗格中选择 集群,然后选择 创建集群。在 创建集群 页面中,输入一个唯一的集群名称。对于 Amazon EMR 发行版,选择 emr6130。在 应用程序包 字段中,选择 Spark 341 和 Zeppelin 0101,并取消选择其他所有选项。对于 操作系统选项,选择 Amazon Linux 版本。步骤 2:选择集群配置方法
在 集群配置 中,选择 均匀实例组。对于 主节点 和 核心节点,选择 Outposts rack 中可用的且 由 EMR 集群支持的 EC2 实例类型。移除实例组 任务 1 的 1。步骤 3:设置集群缩放和配置、网络、集群终止
在 集群缩放和配置选项 中,选择 手动设置集群大小 并在 核心节点 字段中输入 1。在 网络 中,选择 VPC 和 Outposts 子网。对于 集群终止,选择 手动终止集群。步骤 4:配置引导操作
A 在 引导操作 中,添加一个操作,填写以下信息:
名称:copypostgresqldriversh脚本位置:s3// /copypostgresqldriversh。修改 ltbucketnamegt 变量为您在步骤 1 中指定的桶名称。步骤 5:配置集群日志和标签
a 在 集群日志 中,选择 将集群特定日志发布到 Amazon S3,并输入 s3// /logs 作为 Amazon S3 位置 字段。修改 ltbucketnamegt 变量为您在步骤 1 中指定的桶名称。
b 在 标签 中,添加新标签。您需要为 Key 字段输入 forusewithamazonemrmanagedpolicies,并在 Value 字段中输入 true。
步骤 6:设置软件设置和安全配置及 EC2 密钥对
a 在 软件设置 中,输入以下配置,替换步骤 1 中创建的 Secret ARN:
json[ { Classification sparkdefaults Properties { sparkdriverextraClassPath /opt/spark/postgresql/driver/postgresql4260jar sparkexecutorextraClassPath /opt/spark/postgresql/driver/postgresql4260jar EMRsecret@sparkyarnappMasterEnvDBCONNECTION arnawssecretsmanagerltregiongtltaccountidgtsecretltsecretnamegt } }]
这是假设替换了 Secret ARN 的示例:
b 对于 安全配置和 EC2 密钥对,选择 SSH 密钥对。
步骤 7:选择身份和访问管理 (IAM) 角色
a 在 身份和访问管理 (IAM) 角色 中:
在 Amazon EMR 服务角色下:选择 AmazonEMRoutpostsclusterrole 作为服务角色。在 EC2 实例配置文件 下:选择 AmazonEMRoutpostsEC2role。步骤 8:创建集群
选择 创建集群 启动集群,并打开集群详情页面。
此时 EMR 集群正在启动。当集群准备好处理任务时,其状态将更改为 等待中。这意味着集群已启动并随时准备接受工作。
3 为 EMR 核心节点添加 CoIP您需要从 CoIP 池中分配一个弹性 IP 并将其关联到 EMR 核心节点的 EC2 实例上。这是为了允许核心节点访问本地环境。分配弹性 IP 的说明请参阅 分配弹性 IP 地址 中的内容。在步骤 5 中,选择 客户拥有的 IPV4 地址池。
一旦成功分配 CoIP IP,将其与每个 EMR 核心节点的 EC2 实例关联。请遵循 将弹性 IP 地址与实例或网络接口关联 中的说明。
使用 Systems Manager 连接到核心节点 EC2 实例并 ping PostgreSQL 数据库的 IP 地址。
确保 EMR 集群的状态为 等待中。苹果加速器免费版您可以使用以下 Spark 应用程序来模拟从 PostgreSQL 数据库的数据处理。
sparkstepexamplepy:
pythonimport osfrom pysparksql import SparkSession
if name == main
# # 步骤 1:从 EMR 集群配置中获取数据库连接信息dbconnection = osenvironget(DBCONNECTION)# 去掉括号dbconnectioninfo = (dbconnection[11])split()# 初始化变量dbusername = dbpassword = dbhost = dbport = dbname = dburl = # 解析数据库连接信息for dbconnectionattribute in dbconnectioninfo (keydata keyvalue) = dbconnectionattributesplit( 1) if keydata == username dbusername = keyvalue elif keydata == password dbpassword = keyvalue elif keydata == host dbhost = keyvalue elif keydata == port dbport = keyvalue elif keydata == dbname dbname = keyvaluedburl = jdbcpostgresql// dbhost dbport / dbname# # 步骤 2:连接 PostgreSQL 数据库并选择数据sparkdb = SparkSessionbuilderconfig(sparkdriverextraClassPath /opt/spark/postgresql/driver/postgresql4260jar) appName(连接到 PostgreSQL) getOrCreate()# 连接到数据库datadb = sparkdbreadformat(jdbc) option(url dburl) option(driver orgpostgresqlDriver) option(query select count() from pgcatalogpgtables) option(user dbusername) option(password dbpassword) load()# # 步骤 3:进行数据处理## 待处理内容# # 步骤 4:将数据保存到 PostgreSQL 数据库中的新表datadbwrite format(jdbc) option(url dburl) option(dbtable resultsproc) option(user dbusername) option(password dbpassword) save()# # 步骤 5:关闭 Spark 会话sparkdbstop()#
您必须在提交 Spark 应用程序到 EMR 集群之前,将文件 sparkstepexamplepy 上传至步骤 1 中创建的桶。您可以在 此 GitHub 仓库 中获取该文件。
要将 Spark 应用程序提交到 EMR 集群,请按照 使用控制台提交 Spark 步骤的说明 中的内容操作。在 [这篇 Amazon EMR 指南](https//docsawsamazoncom/emr/latest/ReleaseGuide/emrsparksubmitstep