DataJob入门 — 在 AWS 上构建和部署无服务器数据管道

DataJob入门 — 在 AWS 上构建和部署无服务器数据管道

数据工程师的核心工作包括了构建、部署、运行和监控数据管道。

但在数据和 ML 工程领域工作时,缺少一种工具,用于简化在 AWS 服务(如 Glue 和 Sagemaker)上部署数据管道过程,以及轻松通过 Step Functions 编排数据管道步骤。所以, DataJob诞生了!

在本文中,我将向你演示如何安装 DataJob,通过简单示例给予指导,展示 DataJob 的某些功能。如果你想了解更多数据分析相关内容,可以阅读以下这些文章:
使用Amazon SageMaker和AWS Lambda进行无服务器管道自动化模型训练和部署
DS vs DE:数据科学家与数据工程师的薪资对比
Airflow 101: 隐藏小技巧帮你快速上手!
Amazon商业分析师面试指南

安装DataJob

你可以在 PyPI 中安装 DataJob。DataJob 通过 AWS CDK 配置 AWS 服务,所以一定要安装此服务。如果你想动手实践本文的这个示例,你需要一个 AWS 账户 。

pip install --upgrade pip
pip install datajob
# take latest of v1, there is no support for v2 yet
npm install -g aws-cdk@1.134.0

一个简单的示例

该工具包含一个简单的数据管道,由 2 个打印“Hello World”的任务组成,这些任务需要按顺序进行编排。DataJob将这些任务部署到 Glue,并通过阶梯函数(Step Functions)进行编排。

from aws_cdk import core

from datajob.datajob_stack import DataJobStack
from datajob.glue.glue_job import GlueJob
from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow

app = core.App()

# The datajob_stack is the instance that will result in a cloudformation stack.
# We inject the datajob_stack object through all the resources that we want to add.
with DataJobStack(scope=app, id="data-pipeline-simple") as datajob_stack:
    # We define 2 glue jobs with the relative path to the source code.
    task1 = GlueJob(
        datajob_stack=datajob_stack, name="task1", job_path="glue_jobs/task.py"
    )
    task2 = GlueJob(
        datajob_stack=datajob_stack, name="task2", job_path="glue_jobs/task2.py"
    )

    # We instantiate a step functions workflow and orchestrate the glue jobs.
    with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn:
        task1 >> task2

app.synth()

我们将上述代码添加至项目根目录下名为 datajob_stack.py 的文件中。此文件包含配置 AWS 服务、部署代码和运行数据管道所需内容。

接着,克隆(Clone)存储库,并回到本例中。

git clone https://github.com/vincentclaes/datajob.git
cd datajob/examples/data_pipeline_simple

配置 CDK

要配置 CDK,你需要 AWS 的凭据。如果你不知道如何配置 AWS 凭据,请按照以下步骤操作。

export AWS_PROFILE=default
# use the aws cli to get your account number
export AWS_ACCOUNT=$(aws sts get-caller-identity --query Account --output text --profile $AWS_PROFILE)
export AWS_DEFAULT_REGION=eu-west-1
# bootstrap aws account for your region
cdk bootstrap aws://$AWS_ACCOUNT/$AWS_DEFAULT_REGION
    ⏳  Bootstrapping environment aws://01234567890/eu-west-1...
    CDKToolkit: creating CloudFormation changeset...
    ✅  Environment aws://01234567890/eu-west-1 bootstrapped.

部署

使用包含代码的 Glue 任务,将编排 Glue 任务的 Step Functions 状态机创建 DataJob 堆栈。

cdk deploy --app  "python datajob_stack.py" --require-approval never
     data-pipeline-simple: deploying...
     [0%] start: Publishing     
     [50%] success: Published 
     [100%] success: Published 
     data-pipeline-simple: creating CloudFormation changeset...
     ✅  data-pipeline-simple

当 cdk deploy 完成时,准备执行已配置好的服务。

执行

datajob execute --state-machine data-pipeline-simple-workflow
    executing: data-pipeline-simple-workflow
    status: RUNNING
    view the execution on the AWS console:
       <here will be a link to see the step functions workflow>

触发编排数据管道的阶梯函数状态机(Step Functions state machine)

终端将显示阶梯函数网页链接,以跟进管道运行情况。如果单击此链接,你应该会看到如下内容:

删除

数据管道完成后,将其从 AWS 中删除。这样,你将拥有一个干净的 AWS 账户。

cdk destroy --app  "python datajob_stack.py"
    data-pipeline-simple: destroying...
    ✅  data-pipeline-simple: destroyed

DataJob 的一些功能

  • 1. 使用 Glue Pyspark 任务处理大数据
import pathlib

from aws_cdk import core

from datajob.datajob_stack import DataJobStack
from datajob.glue.glue_job import GlueJob
from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow

current_dir = str(pathlib.Path(__file__).parent.absolute())

app = core.App()

with DataJobStack(
    scope=app, id="datajob-python-pyspark", project_root=current_dir
) as datajob_stack:

    pyspark_job = GlueJob(
        datajob_stack=datajob_stack,
        name="pyspark-job",
        job_path="glue_job/glue_pyspark_example.py",
        job_type="glueetl",
        glue_version="2.0",  # we only support glue 2.0
        python_version="3",
        worker_type="Standard",  # options are Standard / G.1X / G.2X
        number_of_workers=1,
        arguments={
            "--source": f"s3://{datajob_stack.context.data_bucket_name}/raw/iris_dataset.csv",
            "--destination": f"s3://{datajob_stack.context.data_bucket_name}/target/pyspark_job/iris_dataset.parquet",
        },
    )

    with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn:
        pyspark_job >> ...
  • 2. 部署独立管道

在CDK中指定一个阶段作为上下文参数,部署一个独立管道。经常用到的有dev, prod,…

cdk deploy --app "python datajob_stack.py" --context stage=dev
  • 3. 并行编排 Step Function 任务

为了加速数据管道,您可能希望并行运行任务。DataJob可以帮助你执行此操作!我借用了 Airflow 的概念,您可以使用运算符 >> 协调不同的任务。

with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn:
    task1 >> task2
    task3 >> task4
    task2 >> task5
    task4 >> task5

DataJob 计算哪些任务可以并行运行,从而加快执行速度。

部署并触发后,你将在步骤函数执行中看到并行任务。

  • 4. 失败/成功时通知

在 StepfunctionsWorkflow 对象的构造函数中,提供带有电子邮件地址的参数notification。这会创建一个 SNS 主题,该主题将在失败或成功时触发,同时收件箱会收到通知。

with StepfunctionsWorkflow(datajob_stack=datajob_stack,
                           name="workflow",
                           notification="email@domain.com") as sfn:
    task1 >> task2
  • 5. 将项目打包为Wheel文件,并将其发送到 AWS

将项目及其所有依赖项发送到 Glue。通过在 DataJobStack 的构造函数中指定 project_root,DataJob 将在你的 project_root 的 dist/ 文件夹中查找Wheel文件(.whl 文件)

current_dir = str(pathlib.Path(__file__).parent.absolute())
with DataJobStack(
    scope=app, id="data-pipeline-pkg", project_root=current_dir
) as datajob_stack:
  • 6. 添加 Sagemaker,创建 ML Pipeline

通过 Glue、Sagemaker 和 Step 函数,可以查看 GitHub 存储库上端到端机器学习管道的新示例。

感谢你的阅读!你会尝试这个新工具吗?请在评论中告诉我您的想法。你还可以订阅我们的YouTube频道,观看大量数据科学相关公开课:https://www.youtube.com/channel/UCa8NLpvi70mHVsW4J_x9OeQ;在LinkedIn上关注我们,扩展你的人际网络!https://www.linkedin.com/company/dataapplab/

原文作者:Vicent Claes
翻译作者:Lia
美工编辑:过儿
校对审稿:Jiawei Tong
原文链接:https://towardsdatascience.com/datajob-build-and-deploy-a-serverless-data-pipeline-on-aws-18bcaddb6676