分布式任务队列框架——Celery


Celery

1. Celry简介

Celery 是 Python 生态中成熟的分布式任务队列框架,核心用于异步处理任务、定时任务调度,依赖消息代理(Broker)和结果存储(Backend),适用于高并发、耗时任务场景。

简单说,Celery 能把 “耗时任务” 从主程序中剥离出来,放到独立的 “任务队列” 中,由专门的 “工作进程(Worker)” 异步执行,主程序无需等待任务完成,从而提升系统的响应速度和并发能力。

关键部分

  1. 任务(Task):你定义的需要异步执行的函数(比如发送邮件、数据清洗、文件转换等)。

  2. 消息中间件(Broker):用于存储任务的 “队列”,负责接收主程序发送的任务,并分发给 Worker。

常用的 Broker 有:RabbitMQ(推荐,功能最完善)、Redis(轻量、常用)、Amazon SQS 等。

  1. 结果存储(Backend):用于保存任务的执行结果(如果需要获取结果的话)。

常用的 Backend 有:Redis、MySQL、PostgreSQL、MongoDB等(也可以不配置,即不存储结果)。

工作流程:

  1. 主程序(生产者)调用 Celery 定义的任务,将任务参数和执行指令发送给 Broker(任务入队)。
  2. Celery 的 Worker(消费者,独立进程)持续监听 Broker,一旦有任务,就取出并执行。
  3. 任务执行完成后,结果会被存储到 Backend(如果配置了的话),主程序可通过任务 ID 从 Backend 中查询结果。

适用场景:

  • Web 应用异步任务:比如用户注册后发送验证邮件、上传文件后异步处理(压缩 / 转码)、生成大型报表等(避免阻塞 HTTP 响应)。
  • 定时任务:比如每天凌晨 3 点备份数据库、每小时爬取一次数据、定期清理日志等(替代 Linux 的 crontab,更灵活)。
  • 分布式计算:将一个大任务拆分成多个子任务,由多个 Worker 并行执行,提高效率。

环境准备:

  • 核心依赖安装
pip install celery
  • 消息代理(Broker)选择:
  • 推荐 Redis(轻量易部署):pip install redis
  • 或 RabbitMQ(稳定性强):需单独安装 RabbitMQ 服务。
  • 结果存储(Backend)选择:
  • 简单场景用 Redis:与 Broker 共用,无需额外配置。
  • 复杂场景用数据库(如 PostgreSQL):pip install psycopg2-binary(PostgreSQL 驱动)。

2. 使用方法

1. 异步任务

1. 编写 Celery 实例与任务

创建文件 tasks.py,代码如下:

from celery import Celery

# 初始化 Celery:参数1为实例名,broker为消息代理地址,backend为结果存储地址
app = Celery(
    'demo',
    broker='redis://localhost:6379/0',  # Redis 第0号数据库作为Broker
    backend='redis://localhost:6379/1'   # Redis 第1号数据库作为Backend
)

# 定义异步任务(用 @app.task 装饰)
@app.task
def add(x, y):
    return x + y

2. 启动 Celery Worker

终端执行命令(需在 tasks.py 所在目录):

celery -A tasks worker --loglevel=info  # -A 指定Celery实例所在文件,--loglevel=info 显示日志

3. 调用异步任务

打开 Python 终端,执行:

from tasks import add

# 异步调用:返回任务ID,任务在Worker中执行
result = add.delay(2, 3)

# 查看任务状态与结果
print(result.id)          # 任务唯一ID(如 'd4e5f6...')
print(result.ready())     # 任务是否完成(True/False)
print(result.get())       # 获取结果(阻塞直到任务完成,返回 5)
print(result.status)      # 任务状态(SUCCESS/FAILED/PENDING)

4. 概念解析

  • Worker:任务执行者,监听 Broker 中的任务并执行,可多进程 / 多线程运行。
  • Task:被异步执行的函数,用 @app.task 装饰,支持参数传递、结果返回。
  • Broker:消息中间件,存储待执行的任务队列(Worker 从这里取任务)。
  • Backend:结果存储,保存任务执行后的返回值(供调用方查询)。
  • Beat:定时任务调度器,按配置的时间规则触发任务。

2. 任务高级用法

1. 任务装饰器参数

@app.task(
    retry_backoff=3,  # 失败重试时,每次重试间隔按 2^0, 2^1, 2^2 秒递增(最多3次)
    retry_kwargs={'max_retries': 5},  # 最大重试次数 5
    ignore_result=True  # 不需要存储任务结果(提升性能)
)
def send_email(to):
    # 模拟邮件发送(失败时会自动重试)
    if not to:
        raise ValueError("收件人不能为空")
    print(f"向 {to} 发送邮件成功")

2. 任务状态与进度跟踪

@app.task(bind=True)  # bind=True 使任务函数第一个参数为 self(任务实例)
def long_task(self, total):
    for i in range(total):
        # 更新任务进度(可在前端查询)
        self.update_state(state='PROGRESS', meta={'current': i, 'total': total})
        time.sleep(1)  # 模拟耗时操作
    return "任务完成"

# 调用后查询进度
result = long_task.delay(10)
while not result.ready():
    print(result.state)  # 输出 PROGRESS
    print(result.info)   # 输出 {'current': 3, 'total': 10}(当前进度)

3. 任务错误处理

# 捕获任务执行异常
result = add.delay(2, '3')  # 传入错误类型参数
try:
    result.get(timeout=10)  # 超时时间10秒
except Exception as e:
    print(f"任务执行失败:{e}")  # 输出类型错误信息

3. 定时任务

1. 配置定时任务

tasks.py 中添加配置:

from datetime import timedelta

app.conf.beat_schedule = {
    # 定时任务1:每10秒执行一次 add 任务
    'add-every-10-seconds': {
        'task': 'tasks.add',  # 任务路径(文件.函数名)
        'schedule': timedelta(seconds=10),
        'args': (10, 20)  # 任务参数
    },
    # 定时任务2:每天固定时间执行(用 crontab 表达式)
    'send-email-daily': {
        'task': 'tasks.send_email',
        'schedule': crontab(hour=8, minute=30),  # 每天8:30执行
        'args': ('user@example.com',)
    }
}

2. 启动 Beat 调度器 + Worker

# 终端1:启动 Worker(执行任务)
celery -A tasks worker --loglevel=info

# 终端2:启动 Beat(调度定时任务)
celery -A tasks beat --loglevel=info

4. 部署与优化

1. Worker 并发配置

启动时指定并发数(默认等于 CPU 核心数):

celery -A tasks worker --concurrency=4 --loglevel=info  # 4个并发进程

2. 日志配置

tasks.py 中添加日志配置,便于问题排查:

app.conf.update(
    worker_log_format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    worker_task_log_format='%(asctime)s - %(name)s - %(levelname)s - %(task_id)s - %(message)s'
)

3. 监控工具

  • 安装 Flower(Celery 官方监控工具):pip install flower
  • 启动监控:celery -A tasks flower,访问 http://localhost:5555 查看任务状态、Worker 状态。

5. 常见问题

  • Broker 连接失败:检查 Redis/RabbitMQ 服务是否启动,地址是否正确。
  • 任务结果查不到:确保配置了 backend,且任务没有设置 ignore_result=True
  • Worker 不执行任务:检查 Worker 是否与 Broker 连接正常,任务函数是否被 @app.task 装饰。

0 条评论

发表评论

暂无评论,欢迎发表您的观点!