Airflow 101: 隐藏小技巧帮你快速上手!

Airflow 101: 隐藏小技巧帮你快速上手!

 Airflow 是一个开源的workflow automation框架,能让开发人员高效的创作、调度和监控数据管道(Data Pipeline)。Airflow相关经验是数据工程和数据科学家最需要的技术技能之一,在许多数据职位招聘信息中都被列为要求的技能。如果你想了解更多数据分析相关内容,可以阅读以下这些文章:
数据工程必备技能:什么是Apache Airflow?
四个数据科学求职者的常见失误
跟数据科学家相比,数据工程师更需要哪些技能?
工程师行业危机:35岁就该退休了吗?

1.目标受众

本文的目标受众是那些对 Airflow 有少量经验、或没有经验,并希望快速上手的人。

2. 背景

Airflow

  • Airflow 是一个开源的工作流管理平台,用于管理复杂的管道
  • 它始于 Airbnb,可用于通过DAG(有向无环图)管理和调度 ETL 管道
  • 其中, Airflow 管道是定义 DAG 的 Python 脚本构成的
  • 许多人把Airflow作为调度数据管道的Cron的替代品

Crom不再用于 ETL 调度

  • Cron 可用于调度 ETL 作业,但它不提供管理 ETL 工作组件之间的相关性或故障的机制
  • 为了说明这一点,下面是Airflow DAG简单实例图
  • 本图的DAG 包含 了7 个 Airflow 任务,用于展示披萨烘焙中的逻辑。
图源:https://tech.curama.jp/entry/2018/03/23/130000

关键特性:任务相关性

  • 这个例子可能有点无聊,但上面强调的重点是相关性逻辑(dependency logic)
  • 可以看到,只有在 add_cheese、add_ham、add_pineapple 和 add_mushroom 全部完成后,才能执行执行bake_pizza

数据操作友好型:监控并提醒你的数据管道

  • Airflow 的一大卖点是,它可以用来创建自动化的、受监控的数据管道
  • 例如,请参考REA 公司博客文章中的 Airflow DAG 图例
  • DAG 中的最后 2 个任务的触发条件是根据任务的输出结果 ,即data_is_fresh :
图源:https://www.rea-group.com/blog/watching-the-watcher/

如果数据不是最新的,则触发 data_freshness_failure_alert,会让一条消息发布到slack平台:

使用 PostToSlack 操作符可以通过 Airflow 将消息发布至 Slack

我的动力/动机

在我过去参与的五六个项目中,同样的问题一次又一次地浮现在我的脑海中——“如果数据管道出现故障,我们是否可以设置某种监控和提醒来通知我们?比如,通过 slack 或电子邮件?”

Airflow 通常被人们推荐为候选解决方案,但讨论也到此为止了。但是,想法是永远不会自己实现的。

所以我决定自己动手……

3. 入门

3.1. 托管程序(Managed Provider)VS.本地安装(Local Install):本地安装可快速帮助你进行实际操作

  • 有多个托管提供程序可以用于运行 Airflow,例如 Astronomer, Cloud Composer, AWS MWAA等
  • 但根据我的经验,我强烈建议只在本地设置 Airflow ,这样能更方便了解概念和常见设计

要避免白忙一场

  • 由于每个托管服务解决方案都存在着细微的差别,我发现自己在评估每个托管服务时经常偏离主题,从而陷入困境。在AWS MWAA中尤其如此。
  • 我讲这些事情的目的是,在开始使用该工具之前,没有人想在设置/配置上浪费太多时间。

3.2. 本地安装步骤

  • Airflow 的网站文档中说明了如何在本地运行 Airflow。本地安装包括以下 5 个步骤:
安装 Airflow l 的步骤

接下来:我将介绍一些核心的Airflow概念/理念。

4. Airflow概念

Airflow 中的主要概念要么会被描述为核心理念,要么被描述为附加功能。

4.1. 核心理念

  • 从根本上说,Airflow 可用于构建和运行工作流
  • 这些工作流表示为 DAG(有向无环图),其中包含各个工作组件,我们称之为任务(Tasks)

DAG(有向无环图)

DAG 是 Airflow 的核心概念,它将任务组合在一起,通过相关性和关系组织起来,用于说明如何运行各项任务。这是从 Airflow 的文档中提取的一个基本示例图:

这个 DAG示例图定义了四项任务(A、B、C 和 D),并规定了运行的顺序,并描述了任务相关性。同时,还需要定义DAG的运行频率,例如“从明天开始每 5 分钟一次”。

运算符

运算符定义了针对某项任务而执行的操作。Airflow 中两个捆绑运算符包括:

  • BashOperator:执行shell命令/脚本
  • PythonOperator:执行Python代码

如果你需要的操作符没有默认安装在 Airflow里面,你可以在庞大的社区 Provider Package中找到。有一些运算符非常受人们欢迎,包括:

  • 支持 DBMS 相关的操作:MySqlOperator / PostgresOperator / MsSqlOperator / OracleOperator
  • JdbcOperator
  • SlackAPIOperator
  • S3FileTransformOperator

4.2. 附加功能

除了Airflow 的核心对象之外,还有许多相应的功能用于支持某些操作。

连接(Connections)和挂钩(Hooks)

连接(Connections)

  • 连接本质上是一组参数(例如:用户名、密码、主机名),以及它连接到的系统类型和唯一名称(称为 conn_id)
  • 最终,这些参数会用于存储某外部平台的预设凭据(predefined credentials)
  • 例如,你可以创建一个连接,用来捕获用于连接至特点 AWS 账户的相关参数

挂钩(Hooks)

  • 挂钩(Hooks)是一个外部平台的高级接口,可让你无需编写访问API /使用特殊库的代码,就能快速轻松地与这些平台沟通
  • 挂钩(Hooks)与连接(Connections)集成,共同收集凭证
  • 你可以在 Airflow 的 API 文档中查看Airflow挂钩的完整列表

XComs

  • XComs 是一种让任务相互交流的机制,因为默认情况下,任务是相互隔离开的
  • 当试图在某个DAG中获取之前的任务输出时,XComs会非常有用。
  • 更多详细信息,请参阅:https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html

变量(Varibles)

  • 变量(Varibles)是一个通用的键/值存储(Key/value store),可从你的任务中查询,并可通过 Airflow 的用户界面或 CLI 轻松设置
  • 你只需在变量模型上导入并调用它,即可使用:
from airflow.models import Variable
# Normal call style
foo = Variable.get("foo")
# Auto-deserializes a JSON value
bar = Variable.get("bar", deserialize_json=True)
# Returns the value of default_var (None) if the variable is not set
baz = Variable.get("baz", default_var=None)

接下来,我将展示一些DAG示例 ,例如让 Airflow 和Slack 、MSSQL 交互。

5. DAG 示例

在你设置了 Airflow 环境(本地安装或托管提供程序都可)之后,你就可以继续创建你的Airflow 工作流程了。

在开始之前,我们可以查看一些 Airflow DAG示例,了解每个 DAG 所涉及的内容。在这篇文章中,我将简单介绍一下一下我整理的DAGs:

  • 1.样板(模板)DAG
  • 2.MSSQL 查询 DAG
  • 3.“发布至 Slack” DAG
  • 4.任务失败时的“发布至 Slack”DAG

现在,让我们从样板 DAG 开始。

5.1. 样板 DAG

下面显示的是我开发 Airflow DAG 的初步操作:

#!/usr/bin/env python3
import os
import logging
from time import time
from airflow import DAG
from airflow.utils.dates import days_ago
# from airflow.operators.python_operator import PythonOperator
# from airflow.models import Variable
# Set up a specific logger with our desired output level
logging.basicConfig(format='%(message)s')
logger = logging.getLogger('airflow.task')
logger.setLevel(logging.INFO)
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'start_date': days_ago(1)
}
def eg_python_function(**kwargs):
    # function logic here
    return
with DAG(
        dag_id=os.path.basename(__file__).replace(".py", ""),
        default_args=default_args,
        schedule_interval=None,
        tags=['template']
    ) as dag:
    # operators here, e.g.:
    """
    example_task = PythonOperator(
        task_id="eg_task", python_callable=eg_python_function, provide_context=True
    )
    """

这里需要为新手指出一些关键的部分:

  • 1. Python Airflow模块
  • 2. default_args(默认参数)
  • 3. Airflow任务(例如,example_task)

1. Python Airflow模块

你通常至少会使用到以下 Python Airflow 模块:

from airflow import DAG
from airflow.utils.dates import days_ago
# from airflow.operators.python_operator import PythonOperator
# from airflow.models import Variable

请注意,我把导入 PythonOperator 和 Airflow 变量模块的调用comment掉了,因为虽然这两者非常常见,但并非总是这么用(具体取决于你的用法)

2.default_args(默认参数)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'start_date': days_ago(1)
}
  • 为 Airflow DAG 指定默认参数是常见的做法,这里的关键是实际参数本身
  • 我经常使用上面这 5 个参数

3.Airflow任务(例如,example_task)

def eg_python_function(**kwargs):
    # function logic here
    return
...
    
    # operators here, e.g.:
    """
    example_task = PythonOperator(
        task_id="eg_task", python_callable=eg_python_function, provide_context=True
    )
    """
  • 在 Airflow DAG 的底部,你可以列出 你的Airflow 任务
  • Airflow 任务要使用某种操作符,例如:在这个例子中,操作符是PythonOperator
  • 该任务是调用 Python 函数,eg_python_function
  • 同样,值得指出的是参数 provide_context

5.2. 样板 MSSQL 查询 DAG

下面显示的是 Airflow DAG 执行 MSSQL DB 查询所需的骨架结构:

import os
import sys
import logging
import pymssql
from time import time
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
# Set up a specific logger with our desired output level
logging.basicConfig(format='%(message)s')
logger = logging.getLogger('airflow.task')
logger.setLevel(logging.INFO)
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'start_date': days_ago(1)
}
def mssql_query(**kwargs):
    """ generic mssql query function """
    START_TIME = time()
    logger.debug(f"Function called: mssql_query()")
    db_name, db_schema, ip_tbl_list, db_host, username, password = get_user_ips()
    # initialise the var
    query_result_to_return = ''
    sql_query = f"SELECT * FROM {db_name}.{db_schema}.{ip_tbl_list}"
    try:
        conn = pymssql.connect(
            server=f'{db_host}',
            user=f'{username}',
            password=f'{password}',
            database=f'{db_name}'
        )
        
        # Create a cursor from the connection
        with conn.cursor() as cursor:
            logger.info(f"running query: {sql_query}",)
            cursor.execute(sql_query)
            query_result = cursor.fetchall()
            for item in query_result:
                query_result_to_return += f"{item[1]}\n"
        logger.info("finished SQL query")
    except:
        logger.error(f"Error when creating pymssql database connection: {sys.exc_info()[0]}", )
    logger.debug(f"Function finished: main() finished in {round(time() - START_TIME, 2)} seconds")
    return query_result_to_return
def get_user_ips():
    """ fetch user inputs """
    START_TIME = time()
    logger.debug(f"Function called: get_user_ips()")
    db_name = Variable.get("db_name")
    db_schema = Variable.get("db_schema")
    ip_tbl_list = Variable.get("ip_tbl_list")
    db_host = Variable.get("host")
    username = Variable.get("username")
    password = Variable.get("password")
    logger.debug(f"Function finished: get_user_ips() finished in {round(time() - START_TIME, 2)} seconds")
    return db_name, db_schema, ip_tbl_list, db_host, username, password
with DAG(
        dag_id=os.path.basename(__file__).replace(".py", ""),
        default_args=default_args,
        schedule_interval=None,
        tags=['python','template']
) as dag:
    mssql_select_all_query = PythonOperator(
        task_id='mssql_select_all_query',
        python_callable=mssql_query
    )

其中需要注意的重要事项为:

1. 任务:mssql_select_all_query

上述 DAG 的关键是在任务 mssql_select_all_query 中执行(显示在脚本底部,如下所示),其中,脚本使用了 Python 模块 pymssql 执行 MSSQL 查询。

mssql_select_all_query = PythonOperator(
    task_id='mssql_select_all_query',
    python_callable=mssql_query
)

该任务简单使用了 PythonOperator 去调用 Python 函数 mssql_query(代码如下所示)

def mssql_query(**kwargs):
    """ generic mssql query function """
    START_TIME = time()
    ...

2. DAG 脚本与用于 Python mssql_query() 函数的代码非常相似

  • Python 函数 mssql_query() 使用的语法和在普通 Python 脚本中使用 Python 模块 pymssql 时使用的语法相同
  • 我展示这一点的目的,只是为了强调在 Airflow 中是否使用现有的 Python 脚本根本不重要

3. Python Airflow模块

请注意脚本顶部使用的 2 个 Airflow 模块:

from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable

正如在第一个样板 dag 的注释中所提到的,这两个模块很常用,但在该脚本中的注释为占位符。而在这里,确实使用了这两个模块!

特别值得指出的是,变量模块已用于检索数据库凭据,这些凭据可用于查询。

5.2. “发布到 Slack” DAG

这是一个非常简单/精简的脚本,用于将所需内容发布到 slack。例如:

import os
import logging
from airflow.models.dag import DAG
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
# Set up a specific logger with our desired output level
logging.basicConfig(format='%(message)s')
logger = logging.getLogger('airflow.task')
logger.setLevel(logging.INFO)
slack_token = Variable.get("slack_token")
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'start_date': days_ago(1)
}
with DAG(
        dag_id=os.path.basename(__file__).replace(".py", ""),
        default_args=default_args,
        schedule_interval=None,
        tags=['python','template']
    ) as dag:
    slack_test = SlackWebhookOperator(
        task_id='slack_message',
        http_conn_id='slack_connection',
        webhook_token=f'{slack_token}',
        message='Hello, World!',
        channel='#airflow-integration'
    )

1. SlackWebhookOperator

要在Slack上发布消息,你需要使用 Slack Webhook。此外,你还需要使用 Python Airflow 运算符—SlackWebhookOperator,在脚本顶部导入:

from airflow.contrib.operators.slack_webhook_operator 
import SlackWebhookOperator

2. slack_token

  • 重要的是,要在Slack上发布消息,你需要一个 slack token。其中有很多步骤/细微差别,例如,你需要在 Slack 中为你的公司创建一个“应用程序”,然后创建一个 webhook 等。
Host: https://hooks.slack.com/services/
Conn Type: HTTP
Password: /T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX

3. 用于Airflow连接的密码字符串

  • 你的 Slack webhook URL 会类似于://hooks.slack.com/services//T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX
  • 你的密码将跟在 https://hooks.slack.com/services 之后
  • 重要提示:这个值就是你的 slack token 。因此,你会在 DAG 脚本中用到这个token。我会把值存在一个变量中:slack_token = Variable.get(“slack_token”)

5.3. 任务失败时的“发布到 Slack”DAG

之前的 DAG 只是演示了如何从将Airflow 的消息发送至 Slack,而下面的 DAG 则包含Airflow 任务失败发送消息至 Slack 的功能。

下面的 DAG 是一个简单的例子,如果函数 read_op() 中变量 i 的值为 1,则调用任务失败。

当任务失败时,会向 Slack 发布一条消息:

import os
import logging
from time import time
from airflow import DAG
from airflow import AirflowException
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.models import Variable
slack_token = Variable.get("slack_token")
def gen_op():
    """ generate sample op """
    START_TIME = time()
    logger.debug(f"Function called: get_user_ips()")
    # initialise the list
    eg_op = []
    for i in range(0,4):
        eg_op.append(i)
    logger.info(f"eg_op = {eg_op}")
    logger.debug(f"Function finished: gen_op() finished in {round(time() - START_TIME, 2)} seconds")
    return eg_op
def read_op(**kwargs):
    """ read in sample op from subsequent step """
    START_TIME = time()
    logger.debug(f"Function called: get_user_ips()")
    ti = kwargs['ti']
    ip = ti.xcom_pull(task_ids='gen_op_eg')
    for i in ip:
        logger.info(f"i = {i}")
        if i == 1:
            raise AirflowException(f"Example task error. i = 1")
    logger.debug(f"Function finished: read_op() finished in {round(time() - START_TIME, 2)} seconds")
    return
def slack_failure_msg(context):
    """ post message to slack on failure """
    failure_alert = SlackWebhookOperator(
        task_id='slack_failure_msg',
        http_conn_id='slack_connection',
        webhook_token=f'{slack_token}',
        channel='#airflow-integration',
        message = """
        :red_circle: Airflow Task Failed.
        *Dag*:\t\t\t\t\t\t {dag}
        *Task*:\t\t\t\t\t\t{task}
        *Execution Time*:\t {exec_date}
        *Log Url*:\t\t\t\t\t{log_url}
        """.format(
            task=context.get('task_instance').task_id,
            dag=context.get('task_instance').dag_id,
            ti=context.get('task_instance'),
            exec_date=context.get('execution_date').astimezone().strftime('%Y-%m-%d %H:%M:%S %p'),
            log_url=context.get('task_instance').log_url,
        )
    )
    return failure_alert.execute(context=context)
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'start_date': days_ago(1),
    'on_failure_callback': slack_failure_msg
}
with DAG(
        dag_id=os.path.basename(__file__).replace(".py", ""),
        default_args=default_args,
        schedule_interval=None,
        tags=['template', 'python']
    ) as dag:
    gen_op_tsk = PythonOperator(
        task_id="gen_op_eg", python_callable=gen_op, provide_context=True
    )
    read_op_tsk = PythonOperator(
        task_id="read_op_in_sub_task", python_callable=read_op, provide_context=True
    )
gen_op_tsk >> read_op_tsk

上面有一些关键事项:

1. on_failure_callback

这个脚本中要指出的关键是一个任务选项,on_failure_callback

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'start_date': days_ago(1),
    'on_failure_callback': slack_failure_msg
}

该默认选项表示,“如果 Airflow 任务失败,则调用 Python 函数 slack_failure_msg”(如下所示)

def slack_failure_msg(context):
    """ post message to slack on failure """
    failure_alert = SlackWebhookOperator(
        task_id='slack_failure_msg',
        http_conn_id='slack_connection',
        webhook_token=f'{slack_token}',
        channel='#airflow-integration',
        message = """
        :red_circle: Airflow Task Failed.
        *Dag*:\t\t\t\t\t\t {dag}
        *Task*:\t\t\t\t\t\t{task}
        *Execution Time*:\t {exec_date}
        *Log Url*:\t\t\t\t\t{log_url}
        """.format(
            task=context.get('task_instance').task_id,
            dag=context.get('task_instance').dag_id,
            ti=context.get('task_instance'),
            exec_date=context.get('execution_date').astimezone().strftime('%Y-%m-%d %H:%M:%S %p'),
            log_url=context.get('task_instance').log_url,
        )
    )
    return failure_alert.execute(context=context)

2. 返回 failure_alert.execute(context=context)

这与最初的“post to DAG”脚本之间的另一个区别在于返回的对象。可以看到,在调用 Python 函数时,就会执行 Airflow failure_alert。最终发布类似于以下内容的帖子:

6. Airflow CLI

Airflow 的 CLI 可以让操作在UI上通过终端执行。这对于编写脚本/自动化活动非常有用。下表描述了一些我觉得很有用的 CLI 命令示例:

7. 注意事项、建议和一些提示

1. Airrflow 2.0 的变化——Airflow 核心和提供程序

在 Airflow 版本 1 中,有一个“统一”包,用于构建外部服务的导入模块,例如AWS、GCP 等。

但是,在版本 2 更改中,Airflow 的程序包结构(package structure)重新成为设计的核心,目前为止,有61个不同的程序包。每个程序包能用于某个外部服务(Google、Amazon、Microsoft、Snowflake)、数据库(Postgres、MySQL)或协议(HTTP/FTP)

因此,我们在利用外部服务库提出的在线解决方案需要特别注意,并注意它们所针对的 Airflow 版本。

2. 在线提出解决方案时,注意所用的Airflow版本

  • 刚开始开发 DAG 时,由于旧版 Airflow API 的一些功能已弃用,我遇到了许多问题
  • 特别是,许多在线解决方案的案例都引用了不具备旧版本兼容性的功能
  • 但Airflow DAG有一点很好,那就是基本上不再使用 Airflow 连接—S3,而是使用“AWS”连接

从 S3 → AWS 连接类型的转变

尽管现在普遍改用 AWS 连接,但很多 Airflow Hooks 还在用 S3 连接。有大量挂钩尚未更新,提供断回兼容性(break-back compatibility),例如 MSSQL 插件操作符 mssql_to_s3_operator

同样,这会导致 redshift_to_s3_operator 出现问题

所以我的建议是,注意在线解决方案的日期/最新更新日期。

2. XComs

最初研究 Airflow 时,我们并没有真正关注 XComs 的功能。

但结果是,我经常将XComs用于任务之间的传递输出,并且经常这样做。

下面显示的是上面列出的 DAG 的片段,“在任务失败时发布到 Slack DAG”,这个过程就用到了 XComs:

def gen_op():
    ...
    eg_op = []
    for i in range(0,4):
        eg_op.append(i)
    return eg_op
def read_op(**kwargs):
    ...
    ti = kwargs['ti']
    ip = ti.xcom_pull(task_ids='gen_op_eg')
    for i in ip:
        logger.info(f"i = {i}")
        if i == 1:
            raise AirflowException(f"Example task error. i = 1")

作为初步操作和示例,Python 函数返回的输出(即上面代码中的 return eg_op)将用作 XCom 值示例,在这个过程中,Airflow 任务中将调用该函数。

在后续的 Python 函数中,需要执行以下操作,读取 Airflow 任务的输出:

  • 在函数定义中传入 **kwargs,将未指定数量的参数传入函数
  • 创建任务变量(命名规则通常为,ti)
  • 读取特定任务的输出,使用以下命令:ip = ti.xcom_pull(task_ids='[name of task]’)

3. 常用Airflow配置选项

你可以用 airflow.cfg 文件配置你的 Airflow 环境,该文件位于 ${AIRFLOW_HOME}/airflow.cfg 中。下面这些参数很有用:

通用Airflow配置选项

4. 时区感知 DAG

你可以在Airflow配置中设置默认时区。但是,如需在 DAG 代码中使用本地时区,你需要了解 DAG 时区。

Airflow 是一个调度程序,由于“下一个执行日期”和“开始日期”这些选项很重要,DAG 中的代码会使用 UTC 时区运行。要使你的DAG感知时区,你需要使用 Python 钟摆模块(pendulum module),它能提供start_date的时区感知。请参阅以下示例:

import pendulum
local_tz = pendulum.timezone("Europe/Amsterdam")
default_args=dict(
    start_date=datetime(2016, 1, 1, tzinfo=local_tz),
    owner='airflow'
)
dag = DAG('my_tz_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
print(dag.timezone) # <Timezone [Europe/Amsterdam]>

5. 一些提示

1. Airflow:非常冷门的提示、技巧和最佳实践

* 使用默认参数避免重复

* 使用 PythonOperator 时的,在连接中存储敏感数据以及 “context”字典

2. Airflow注意事项

* “不要更改 start_date + interval”

* “开发过程中刷新 DAG”,“不要忘记启动调度程序”

不更改 start_date 和 interval ,对于新手来说尤其重要。

3. Airflow提示、技巧和注意事项

我建议你在构建第一个 DAG 并掌握主要概念以后阅读本文。本文提供了一些有用的概念,如任务状态回调(Tasks States Callbacks),并提供了关于 UTC 日期问题的描述,都非常有用。

感谢阅读!你还可以订阅我们的YouTube频道,观看大量数据科学相关公开课:https://www.youtube.com/channel/UCa8NLpvi70mHVsW4J_x9OeQ;在LinkedIn上关注我们,扩展你的人际网络!https://www.linkedin.com/company/dataapplab/

原文作者:Paul Fry
翻译作者:Lia
美工编辑:过儿
校对审稿:Jiawei Tong
原文链接:https://paulfry999.medium.com/airflow-101-hints-and-tips-to-quickly-get-started-b48fe8948602