admin 管理员组

文章数量: 1184232

schedule是一个轻量级的Python定时任务库,语法简洁易懂,适合处理简单的定时任务需求。

该模块的设计理念是"human-friendly job scheduling",即对人类友好的任务调度,让开发者能够像读英语一样轻松地编写定时任务。

schedule模块本身代码量很小,依赖关系简单,不会给项目带来额外的负担。这对于那些不想引入重量级任务队列系统的轻量级应用来说非常合适。

安装

pip install schedule

基本用法

schedule模块通过job对象来表示一个定时任务。每个job对象都包含了任务的执行时间规则和要执行的具体操作。创建job的方式非常直观,通过schedule.every()方法开始定义。

1.简单示例

import schedule
import time

def job():
    print("Hello, World!")

def greet(name):
    print(f"Hello, {name}!")
	
# 每10秒执行一次
schedule.every(10).seconds.do(job)

# 每隔2分钟执行
schedule.every(2).minutes.do(job)

# 每隔3小时执行
schedule.every(3).hours.do(job)

# 每隔5天执行
schedule.every(5).days.do(job)

# 每隔1周执行
schedule.every().week.do(job)

# 每天上午10:30执行
schedule.every().day.at("10:30").do(job)

# 每周一上午9:00执行
schedule.every().monday.at("09:00").do(job)


# 每月1号执行(通过每天检查日期实现)
def monthly_job():
    import datetime
    if datetime.datetime.now().day == 1:
        print("每月1号执行的任务")

schedule.every().day.at("00:00").do(monthly_job)


# 传递参数
schedule.every(5).seconds.do(greet, "Alice")

while True:
    schedule.run_pending()
    time.sleep(1)

上面代码即说明了schedule的常见用法,但是使用过程中需要注意以下事项:

(1)阻塞问题: schedule是同步的,长时间运行的任务会阻塞其他任务

(2)精度限制: 依赖于time.sleep(),精度有限

(3)进程管理: 需要在主进程中持续运行

(4)异常处理: 需要手动处理任务中的异常

2.应用示例

"""
调度模块schedule
schedule可以定点定时执行,但是仍然需要while Ture配合,而且占用内存大。
"""
import schedule
import time
import threading
import datetime


def scheduled_task():
    """
    定时执行的任务函数
    """
    print(f"任务执行时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("执行定时任务...")


def run_scheduler():
    """
    运行调度器
    """
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    print("定时任务启动...")

    # 设置定时任务
    schedule.every(3).seconds.do(scheduled_task)

    # 在后台线程中运行调度器
    scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
    scheduler_thread.start()

    # 主程序可以继续做其他事情
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("程序退出")

3.问题说明

(1)time.sleep(1)问题

while True:
    schedule.run_pending()
    time.sleep(1)

run_scheduler运行调度器,需要重复执行time.sleep(1),原因:

防止CPU资源浪费:

  • schedule.run_pending()函数会检查是否有待执行的任务,如果有就执行,这个过程非常快速
  • 如果没有time.sleep(1),这个while True循环会持续不断地运行,导致CPU占用率飙升
  • time.sleep(1)让程序每次循环后暂停1秒钟,释放CPU资源给其他进程

控制检查频率:

  • time.sleep(1)将检查待执行任务的频率限制为每秒一次
  • 对于大多数定时任务场景来说,每秒检查一次已经足够了
  • 这样既保证了任务能及时执行,又不会过度消耗系统资源

避免忙等待(Busy Waiting):

  • 没有sleep的无限循环是一种忙等待状态,会持续占用CPU周期
  • 添加sleep后,程序进入等待状态,不占用CPU资源

总结来说,是为了在保证定时任务正常执行的前提下,避免过度消耗CPU资源,是一种标准的调度器实现方式。

(2)主线程的主循环问题

# 主程序可以继续做其他事情
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("程序退出")

这段代码是主线程的主循环,它的作用是保持主线程运行,防止程序退出

调度器是在一个后台线程(daemon thread)中运行的,当主线程结束时,所有后台线程会被强制终止。如果没有主线程的主循环,主线程会执行完所有代码后立即结束,整个程序退出,定时任务无法继续执行

4.项目级应用

from fastapi import FastAPI
import schedule
import time
import threading
import datetime
import uvicorn

app = FastAPI()


def scheduled_task():
    """
    定时执行的任务函数
    """
    global task_status
    current_time = datetime.datetime.now()
    print(f"任务执行时间: {current_time.strftime('%Y-%m-%d %H:%M:%S')}")
    print("执行定时任务...")


def run_scheduler():
    """
    运行调度器
    """
    while True:
        schedule.run_pending()
        time.sleep(1)


# 应用启动时执行
@app.on_event("startup")
async def startup_event():
    # 设置定时任务 - 每3秒执行一次
    schedule.every(3).seconds.do(scheduled_task)

    # 在后台线程中运行调度器
    scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
    scheduler_thread.start()
    print("定时任务调度器已启动...")


# 应用关闭时执行
@app.on_event("shutdown")
async def shutdown_event():
    print("应用正在关闭,定时任务已停止...")


# 实际接口
@app.get("/xxx")
async def xxx():
    return {"code": 200, "data": ""}


if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=False)

上述代码中直接集成了uvicorn.run()来启动FastAPI应用,应用会自动在0.0.0.0:8000上运行,定时任务仍然会在后台持续运行。运行时将“main:app”改成自己项目启动的文件名。

但是,还存在一些问题,以下是3个问题点:

(1)还缺少定时任务执行状态的监控和输出。

(2)且项目停止时没有做内存清理。

(3)另外项目中改用lifespan监听方式更优雅,lifespan参数是FastAPI中用于管理应用程序生命周期的现代方式。它替代了旧版本中的@app.on_event("startup")和@app.on_event("shutdown")装饰器方法,且on_event 装饰器方法已被标记为弃用。

from fastapi import FastAPI
import schedule
import time
import threading
import datetime
import uvicorn
from contextlib import asynccontextmanager

# 全局变量用于存储任务执行状态
task_status = {
    "last_execution": None,
    "execution_count": 0,
    "is_running": False
}

# 全局调度器线程变量
scheduler_thread = None


def scheduled_task():
    """
    定时执行的任务函数
    """
    global task_status
    current_time = datetime.datetime.now()
    task_status["last_execution"] = current_time.strftime('%Y-%m-%d %H:%M:%S')
    task_status["execution_count"] += 1
    task_status["is_running"] = True

    print(f"任务执行时间: {task_status['last_execution']}")
    time.sleep(1)  # 假定为定时任务耗时逻辑
    print(f"任务执行完成,当前总执行次数: {task_status['execution_count']}")


def run_scheduler():
    """
    运行调度器
    """
    while True:
        schedule.run_pending()
        time.sleep(1)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # 启动时执行
    global scheduler_thread
    # 设置定时任务 - 每5秒执行一次
    schedule.every(5).seconds.do(scheduled_task)

    # 在后台线程中运行调度器
    scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
    scheduler_thread.start()
    task_status["is_running"] = True
    task_status["last_execution"] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print("定时任务调度器已启动...")

    yield  # 应用程序运行期间

    # 关闭时执行 - 清理资源
    print("正在关闭应用,开始清理资源...")

    # 清理 schedule 中的所有任务
    schedule.clear()
    print("已清理所有定时任务")

    # 更新任务状态
    task_status["is_running"] = False
    print("应用正在关闭,定时任务已停止...")

    # 注意:由于 scheduler_thread 是守护线程(daemon=True),它会在主线程结束时自动终止
    # 如果需要更精确的控制,可以考虑使用线程停止标志


# 使用 lifespan 参数创建 FastAPI 应用
app = FastAPI(lifespan=lifespan)


if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=False)

yield说明

yield是 lifespan 上下文管理器的关键部分,它将函数执行分为三个阶段。

(1)yield 之前的部分 - 启动阶段

这部分代码在 FastAPI 应用启动时执行,用于初始化资源和启动服务。

(2)yield - 应用程序运行阶段

  • 执行到这里时,将控制权交还给 FastAPI 框架
  • FastAPI 应用开始正常运行,处理 HTTP 请求
  • 应用会一直运行在这个 yield 点,直到收到关闭信号
  • 这期间定时任务在后台线程中持续执行

(3) yield 之后的部分 - 关闭阶段

这部分代码在应用关闭时执行,用于清理资源和执行关闭操作。

  • FastAPI 应用启动时,调用 lifespan 函数
  • 执行 yield 之前的代码(初始化定时任务和启动调度器)
  • 执行到 yield,应用开始正常运行并处理请求
  • 应用持续运行,定时任务在后台执行
  • 当应用收到关闭信号时,继续执行 yield 之后的代码(清理工作)
  • 应用完全关闭

通过 yield 清晰地分隔不同阶段,使得启动和关闭逻辑在同一个函数中更容易维护,启动阶段定义的变量可以在关闭阶段使用,可以在一个地方处理整个生命周期的异常

内存清理

使用 schedule.clear() 方法清除所有已注册的定时任务,释放相关资源。

5.异步方式项目级应用

当前代码还有一个问题,schedule.every(5).seconds.do(scheduled_task)让定时任务5秒执行一次,但是若定时任务本身需要耗时2秒,实际就是7秒执行一次!!!这不就和最初的设计理念不符了?改用异步方式即可解决。

from fastapi import FastAPI
import schedule
import time
import threading
import datetime
import uvicorn
import asyncio
from contextlib import asynccontextmanager


# 全局变量用于存储任务执行状态
task_status = {
    "last_execution": None,
    "execution_count": 0,
    "is_running": False
}

# 全局调度器线程变量
scheduler_thread = None
# 事件循环
event_loop = None


async def async_task_logic():
    """
    异步任务逻辑
    """
    # 模拟耗时的异步操作
    await asyncio.sleep(1)  # 异步等待1秒,不阻塞事件循环
    print(f"异步任务执行完成,当前总执行次数: {task_status['execution_count']}")


def scheduled_task():
    """
    定时执行的任务函数 - 以异步方式执行
    """
    global task_status, event_loop
    current_time = datetime.datetime.now()
    task_status["last_execution"] = current_time.strftime('%Y-%m-%d %H:%M:%S')
    task_status["execution_count"] += 1
    task_status["is_running"] = True

    print(f"任务触发时间: {task_status['last_execution']}")
    print(f"任务开始异步执行,当前总执行次数: {task_status['execution_count']}")

    # 将异步任务提交到事件循环中执行,不阻塞当前线程
    if event_loop:
        asyncio.run_coroutine_threadsafe(async_task_logic(), event_loop)


def run_scheduler():
    """
    运行调度器
    """
    while True:
        schedule.run_pending()
        time.sleep(1)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # 启动时执行
    global scheduler_thread, event_loop
    # 保存当前事件循环引用
    event_loop = asyncio.get_running_loop()

    # 设置定时任务 - 每5秒执行一次
    schedule.every(5).seconds.do(scheduled_task)

    # 在后台线程中运行调度器
    scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
    scheduler_thread.start()
    task_status["is_running"] = True
    task_status["last_execution"] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print("定时任务调度器已启动...")

    yield  # 应用程序运行期间

    # 关闭时执行 - 清理资源
    print("正在关闭应用,开始清理资源...")

    # 清理 schedule 中的所有任务
    schedule.clear()
    print("已清理所有定时任务")

    # 更新任务状态
    task_status["is_running"] = False
    print("应用正在关闭,定时任务已停止...")



# 使用 lifespan 参数创建 FastAPI 应用
app = FastAPI(lifespan=lifespan)

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=False)

继续优化

上面示例中用过的是这种手动处理事件循环的创建和管理的方式,会复杂和麻烦一点。

# 手动方式(不推荐)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(async_task_logic())
loop.close()

asyncio模块还可以自动创建事件循环,直接用使用 asyncio.run() 可以省去手动创建和管理事件循环的步骤。

asyncio.run() 是 Python 3.7+ 引入的一个高级接口,它会自动创建一个新的事件循环,运行指定的协程,并在完成后关闭事件循环。

优化后的代码:

from fastapi import FastAPI
import schedule
import time
import threading
import datetime
import uvicorn
import asyncio
from contextlib import asynccontextmanager


# 全局变量用于存储任务执行状态
task_status = {
    "last_execution": None,
    "execution_count": 0,
    "is_running": False
}

# 全局调度器线程变量
scheduler_thread = None


async def async_task_logic():
    """
    异步任务逻辑
    """
    # 模拟耗时的异步操作
    await asyncio.sleep(1)  # 异步等待1秒,不阻塞事件循环
    print(f"异步任务执行完成,当前总执行次数: {task_status['execution_count']}")


def scheduled_task():
    """
    定时执行的任务函数 - 以异步方式执行
    """
    global task_status
    current_time = datetime.datetime.now()
    task_status["last_execution"] = current_time.strftime('%Y-%m-%d %H:%M:%S')
    task_status["execution_count"] += 1
    task_status["is_running"] = True

    print(f"任务触发时间: {task_status['last_execution']}")
    print(f"任务开始异步执行,当前总执行次数: {task_status['execution_count']}")

    asyncio.run(async_task_logic())


def run_scheduler():
    """
    运行调度器
    """
    while True:
        schedule.run_pending()
        time.sleep(1)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # 启动时执行
    global scheduler_thread

    # 设置定时任务 - 每5秒执行一次
    schedule.every(5).seconds.do(scheduled_task)

    # 在后台线程中运行调度器
    scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
    scheduler_thread.start()
    task_status["is_running"] = True
    task_status["last_execution"] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print("定时任务调度器已启动...")

    yield  # 应用程序运行期间

    # 关闭时执行 - 清理资源
    print("正在关闭应用,开始清理资源...")

    # 清理 schedule 中的所有任务
    schedule.clear()
    print("已清理所有定时任务")

    # 更新任务状态
    task_status["is_running"] = False
    print("应用正在关闭,定时任务已停止...")



# 使用 lifespan 参数创建 FastAPI 应用
app = FastAPI(lifespan=lifespan)

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=False)

asyncio.run() 会自动处理这些步骤,使代码更简洁、更不容易出错,例如忘记关闭事件循环。

不过需要注意的是,asyncio.run() 只能在没有正在运行的事件循环时使用,也就是说它通常用于程序的主入口点。在已有事件循环的环境中(如 Jupyter Notebook 或某些 Web 框架中),可能需要使用 await 或 asyncio.create_task() 等其他方式。
以上代码场景中,由于定时任务是在同步函数中触发的,并且是通过 schedule 库调度的,使用 asyncio.run() 是一个合适的解决方案。


优缺点

优点

1.语法简洁易读:

  • 使用自然语言风格的API,如schedule.every(3).seconds.do(scheduled_task),易于理解和维护

2.灵活性高:

  • 支持多种时间间隔设置(秒、分、小时、天等)
  • 可以设置特定时间点执行任务

3.非阻塞执行:

  • 通过在独立线程中运行调度器,主线程可以继续执行其他任务
  • 使用daemon=True参数确保程序退出时调度线程也会自动结束

4.轻量级:

  • 相比于重量级的任务调度框架,schedule库非常轻量

缺点

1.资源消耗:

  • 需要持续运行一个线程来检查任务执行时间
  • 即使没有任务需要执行,也会每秒检查一次(通过time.sleep(1)控制)

2.精度限制:

  • 最小检查间隔为1秒,对于需要更高精度的任务调度可能不够

3.无持久化:

  • 任务调度信息仅保存在内存中,程序重启后需要重新设置

4.缺乏分布式支持:

  • 仅适用于单机环境,不支持分布式任务调度

5.无任务监控和管理界面:

  • 没有内置的Web界面或管理工具来监控任务执行状态

6.不适用于复杂调度场景:

  • 对于依赖关系复杂、需要失败重试、任务分片等高级功能的场景支持有限

适用场景

这种调度方式适用于:

  • 简单的定时任务
  • 单机应用程序
  • 对调度精度要求不高的场景
  • 快速原型开发和小型项目

对于更复杂的企业级任务调度需求,可能需要考虑使用如APScheduler、Celery或分布式任务调度系统。

实践总结

性能考虑

虽然schedule模块使用简单,但在实际应用中还是需要注意一些性能方面的考虑。对于简单的定时任务,schedule的表现非常出色。但如果需要处理大量的并发任务或者对时间精度要求极高的场景,可能需要考虑更专业的任务调度系统。

与其他调度工具的比较

相比于Celery这样的重量级任务队列系统,schedule更加轻量和简单;相比于APScheduler这样的功能全面的调度器,schedule更加专注和易用。选择哪种工具主要取决于具体的应用场景和需求复杂度。

最佳实践建议

在使用schedule模块时,建议遵循一些最佳实践。比如合理设置任务的执行间隔,避免过于频繁的任务执行影响系统性能;对于重要的定时任务,建议添加异常处理和日志记录机制;定期检查和维护定时任务的运行状态,确保任务按预期执行。

本文标签: 项目 schedule python