使用Python的schedule轻松管理定时任务

使用Python的schedule轻松管理定时任务

计划任务不仅是编程中的常见特征,而且是高效企业的基石和自动化的基本组成部分。以业务的月度报告生成为例。每个月依靠某人手工撰写这份报告不仅容易出错,而且是一种不必要的负担。

通过设置在每个月末自动执行的任务,你可以简化流程并降低人为错误的风险。现在,你不用再为最后期限而汗流浃背,你可以一边喝着咖啡,一边放心地知道你的自动化系统正在处理繁重的工作。

生成报告只是一个例子,还有许多其他的操作通常是定期执行的。例如:

  • 备份数据库
  • 执行健康检查
  • 重新训练机器学习模型
  • 将数据从一个服务移动到另一个服务,例如从热存储移动到冷存储
  • 重新抓取网站以获取新信息
  • 发送电子邮件

在本文中,我将深入探讨Python中schedule包的使用,并解释它如何简化你的任务调度需求。如果你想了解更多关于Python的相关内容,可以阅读以下这些文章:
Meta正在做上帝的工作:向世界发布令人震惊的优秀编程模型!
畅销编程书籍中的10个编码秘密
Mojo:比Python快35000倍的AI编程语言
作为一个数据科学家/分析师,不要重复这5个编程错误

为什么不只使用time.sleep呢?

我们已经讨论了许多不同的任务,这些任务通常是定期执行的。现在,我们如何编写一个程序来定期执行一个任务呢?

最简单的方法是在while循环中使用内置的time.sleep方法。以下是一个示例:

import time

def perform_task():
    print("Performing task...")
    ...
    print("Task completed!")

while True:
    perform_task()
    print("Sleeping...")
    time.sleep(60)

输出:

Performing task...
Task completed!
Sleeping...
Performing task...
Task completed!
Sleeping...
...

程序立即执行任务,休眠60秒,然后重复该过程,简单。

有时这就是你所需要的,在这些情况下,我们不需要schedule或任何其他包。原因是time.sleep在这里的作用是,我们只需要控制任务相对于前一次运行的时间。在更复杂的场景中,我们将不得不添加更多的代码。以下是一些需要更多代码才能实现的功能示例:

  • 把任务安排在特定的时间、特定的日子、特定的工作日等等。
  • 并行安排多个任务
  • 取消任务
  • 以随机间隔运行任务
  • 列出计划任务
  • 在限定时间内运行任务

调试、测试和实现这些特性可能非常耗时,特别是如果你打算在将来扩展软件功能和复杂性。相反,使用经过实战测试且可读的库通常是一种更简单、更可靠的方法。

Schedule——用于人类的Python作业调度库

Schedule是一个Python包,用于简化在Python中描述和运行计划的任务。该软件包的核心是可读性和易用性。我们可以使用pip来安装它:

pip install schedule

为了说明使用起来有多简单,让我们以创建一个程序为例,该程序将在每周一的00:00执行,并在那时写入hello world:

import schedule
import time

def task():
    print("hello world!")

job = schedule.every().monday.at("00:00").do(task)

while True:
    print("checking")
    schedule.run_pending()
    time.sleep(1)

这将导致程序输出:

checking
checking
checking
...
hello world! <- at monday 00:00
...

描述schedule的关键表达式是:

schedule.every().monday.at("00:00").do(task)

正如你所看到的,即使你不懂Python或编程,也可以立即清楚任务将在何时运行,因为它读起来就像简单的英语。

你可能想知道为什么会有一个time.sleep在while循环中休眠。原因如下:调用schedule.run_pending()将检查一个作业是否应该运行,如果应该,则执行它。但如果没有任务,它会立即返回。为了防止while循环一遍又一遍地快速执行,从而占用大量CPU资源和时间。在两次循环判断之间用time.sleep引入1秒的延迟。你可以选择另一个睡眠时段,这将是进程可能偏离预定时间的最大时间错误。

更多的例子

让我们看几个例子,看看如何用schedule包来设置定期运行的任务。我假设每个例子都有一个while循环,就像上面那样。

每60秒运行一个脚本:

schedule.every(60).seconds.do(job)

每天13:37运行:

schedule.every().day.at("13:37").do(job)

每隔一周的周二下午13:00运行一次。

from datetime import datetime

def job():
    # get current week number
    current_week_number = datetime.now().isocalendar()[1]
    
    # do not run on odd numbered weeks
    # alternatively change to == 0
    if current_week_number % 2 == 1:
        return

    # perform job here...

schedule.every().tuesday.at("13:00").do(job)

每三小时运行一次:

schedule.every(3).hour.do(job)

每30 ~ 60秒运行一次(每次随机选择):

schedule.every(30).to(60).seconds.do(my_job)

每小时第五分钟运行一次;

schedule.every().hour.at(":5").do(job)

你可以使用这个包做更多的事情,我将为你提供一个使用schedule的有趣示例。有关其所有功能的更详细信息,请参考此处提供的文档。

快速捕获异常

想象一下,你有一个传感器,正在从某种现象中收集数据。该传感器具有很高的采样率,但存储单个值既昂贵又不必要。但有时,当发生一些奇怪的事情时,你希望更频繁地保存数据以调查该事件。

计划是这样的,数据以3.33 Hz的常规频率保存(或每隔0.3秒保存一次),但当检测到异常时,将采样频率增加到20hz(或相当于每0.05秒保存一次),然后恢复正常,我们在Python中使用schedule进行编码。

首先,让我们导入包:

import schedule
import matplotlib.pyplot as plt
from datetime import datetime
import time
import random
import pandas as pd
import logging
import sys

值得注意的是,除了schedule之外,我还使用了pandas和matplotlib。然后定义常数。它们用于决定我们采样的频率(根据采样之间的时间),采样的时间(总的和“快速采样”),以及如何定义异常。在这种情况下,异常被定义为偏离预期平均值0.15的平均值。

# how often to sample
NORMAL_SAMPLING_INTERVAL = 0.3
FAST_SAMPLING_INTERVAL = 0.05
ANOMALY_CHECK_SAMPLING_INTERVAL = 3

# how long to sample
FAST_SAMPLING_TIME = 10
STOP_IN_MINUTES = 1

# how to detect anomaly
DEVIATION_THRESHOLD = 0.15
LAST_N_VALUES = 10

然后我定义了一个基本的日志记录器,它将使所有日志看起来像这样:

[hh:mm] <message>

并将它们打印到stdout。

# logger "[00:00]"
logging.basicConfig(
level=logging.INFO,
stream=sys.stdout,
format="[%(asctime)s]: %(message)s",
datefmt="%M:%S" # minute:second
)

现在,让我们定义任务。首先,有一种方法可以生成带有时间戳的模拟数据,然后有一种处理方法可以简单地保存数据并记录数据。当它在“快速模式”下进行采样时,它将更改日志信息。

# method to generate from a "hypothetical" data stream
def get_latest_value():
return (datetime.now(), random.random())

def process(store, fast=False):
time, value = get_latest_value()
logging.info(show_sample_message(value, fast))
store.append(dict(time=time, value=value))

异常检查器是一个单独的任务,运行频率较低:

def check_for_anomaly(store):
    # take the last n values
    last_n_values = pd.DataFrame(store[-LAST_N_VALUES:])

    # no mean when zero length
    if last_n_values.shape[0] == 0:
        return

    mean = last_n_values["value"].mean()
    # minimum for check
    if len(last_n_values) < LAST_N_VALUES:
        deviation = None
    else:
        # deviation from mean
        deviation = abs(mean - 0.5)

    # log the check nicely
    logging.info(show_check_anomaly_message(mean, deviation, DEVIATION_THRESHOLD))
    if deviation is not None and deviation > DEVIATION_THRESHOLD:
        # if there is a deviation we:
        # 1. clear normal data processing task
        # 2. clear the current anomaly checker
        # 3. start a faster data processing task
        # 4. schedule a switch back to normal anomaly checking and sampling
    
        schedule.clear("data_processing")
        schedule.clear("anomaly_check")
        logging.info("Anomalous mean detected!")
        logging.info("Switching to faster sampling")
        (schedule
            .every(FAST_SAMPLING_INTERVAL)
            .seconds
            .do(process, store=store, fast=True)
            .tag("faster_data_processing"))
        # clear normal process
        # Schedule to switch back to normal processing after FAST_SAMPLING_TIME seconds
        schedule.every(FAST_SAMPLING_TIME).seconds.do(switch_to_normal, data=data)

这里的重要部分是,当发现异常时,将停止旧任务并启动新任务。为此,使用.tag()标记每个任务。这使我们能够稍后使用.clear()停止它。此外,为了切换回来,即停止我们添加的新任务并重新启动旧任务,我们有一个名为switch_to_normal的“运行一次”计划任务。请注意,尽管此任务运行一次,但它仍然使用.every()进行调度。这是因为要停止一个任务,你需要从任务中返回值schedule.CancelJob。也就是说:

 def my_task():
  ...
  return schedule.CancelJob

然后它就不会再运行了。下面是switch_to_normal的实现:

def switch_to_normal(data):
    schedule.clear("faster_data_processing")
    logging.info("Switching back to normal sampling")
    schedule_normal_job(data)
    schedule_anomaly_check_job(data)
    return schedule.CancelJob

我们所要做的就是清除更快的采样过程并重新安排采样过程。我们也不会忘记通过返回schedule.CancelJob来取消切换任务。

现在,让我们看一下主程序:

def schedule_normal_job(data):
    return schedule.every(NORMAL_SAMPLING_INTERVAL).seconds.do(process, store=data, fast=False).tag("data_processing")

def schedule_anomaly_check_job(data):
    return schedule.every(ANOMALY_CHECK_SAMPLING_INTERVAL).seconds.do(check_for_anomaly, store=data).tag("anomaly_check")

def finish():
    # as we clear it will only run once
    schedule.clear()

if __name__ == "__main__":
    data = []

    schedule_normal_job(data)
    schedule_anomaly_check_job(data)
    # clear all jobs after STOP_IN_MINUTES
    schedule.every(STOP_IN_MINUTES).minutes.do(finish)

    while True:
        schedule.run_pending()
        # if it's finished, break
        if len(schedule.get_jobs()) == 0:
            break
        # needs to be fast enough to not miss any scheduled jobs
        time.sleep(0.01)


    # plot
    srs = pd.DataFrame(data).set_index("time")
    srs.index = pd.to_datetime(srs.index)
    srs.plot()
    plt.show()

由于正常作业和异常检查被多次调度,因此将它们包装在一个方法中以避免代码重复。添加一个列表来保存数据。然后调度初始作业,包括使用schedule.clear()清除所有任务的完成作业,这将导致while循环中断,因为它检查有多少任务正在运行。值得注意的是,由于任务以高频率运行,这里添加了短睡眠时间。

最后,绘制数据。

在结果图中,我们可以看到在检测到异常平均值后,中间的高频采样:

结果图

如果你对此代码感兴趣,它们如下所示:

# format messages nicely
def show_check_anomaly_message(value, deviation, threshold):
    if deviation is None or deviation < threshold:
        # green checkmark
        status = "\033[92m✔\033[0m"
        msg = "OK"
    else:
        # red x
        status = "\033[91m✘\033[0m"
        msg = "ANOMALY"
    diff_string = f"{deviation:.2f}" if deviation is not None else "N/A"
    return f"{status}  {msg}  [MEAN:{value:.2f} - DEVIATION:{diff_string}]"

def show_sample_message(value, fast=False):
    if fast:
        # lightning icon for fast sampling
        msg = "\033[93m⚡\033[0m QUICK"
    else:
        # wave icon for normal sampling
        msg = "\033[96m🌊\033[0m NORMAL"
    return f"{msg} [VALUE:{value:.2f}]"

总结

Python中的schedule 库是一个方便的工具,用于管理程序中重复出现的任务。与cron表达式或从头开始使用time.sleep()相比,它提供了许多特性和一种干净、易读的方法。下次你需要在Python中自动执行任务时,请尝试一下schedule。

感谢阅读!你还可以订阅我们的YouTube频道,观看大量大数据行业相关公开课:https://www.youtube.com/channel/UCa8NLpvi70mHVsW4J_x9OeQ;在LinkedIn上关注我们,扩展你的人际网络!https://www.linkedin.com/company/dataapplab/

原文作者:Jacob Ferus
翻译作者:过儿
美工编辑:过儿
校对审稿:Chuang
原文链接:https://itnext.io/automate-everything-effortless-task-scheduling-with-pythons-schedule-package-f75b891bd39b