Celery
1. Celry简介
Celery 是 Python 生态中成熟的分布式任务队列框架,核心用于异步处理任务、定时任务调度,依赖消息代理(Broker)和结果存储(Backend),适用于高并发、耗时任务场景。
简单说,Celery 能把 “耗时任务” 从主程序中剥离出来,放到独立的 “任务队列” 中,由专门的 “工作进程(Worker)” 异步执行,主程序无需等待任务完成,从而提升系统的响应速度和并发能力。
关键部分:
-
任务(Task):你定义的需要异步执行的函数(比如发送邮件、数据清洗、文件转换等)。
-
消息中间件(Broker):用于存储任务的 “队列”,负责接收主程序发送的任务,并分发给 Worker。
常用的 Broker 有:RabbitMQ(推荐,功能最完善)、Redis(轻量、常用)、Amazon SQS 等。
- 结果存储(Backend):用于保存任务的执行结果(如果需要获取结果的话)。
常用的 Backend 有:Redis、MySQL、PostgreSQL、MongoDB等(也可以不配置,即不存储结果)。
工作流程:
- 主程序(生产者)调用 Celery 定义的任务,将任务参数和执行指令发送给 Broker(任务入队)。
- Celery 的 Worker(消费者,独立进程)持续监听 Broker,一旦有任务,就取出并执行。
- 任务执行完成后,结果会被存储到 Backend(如果配置了的话),主程序可通过任务 ID 从 Backend 中查询结果。
适用场景:
- Web 应用异步任务:比如用户注册后发送验证邮件、上传文件后异步处理(压缩 / 转码)、生成大型报表等(避免阻塞 HTTP 响应)。
- 定时任务:比如每天凌晨 3 点备份数据库、每小时爬取一次数据、定期清理日志等(替代 Linux 的 crontab,更灵活)。
- 分布式计算:将一个大任务拆分成多个子任务,由多个 Worker 并行执行,提高效率。
环境准备:
- 核心依赖安装:
pip install celery
- 消息代理(Broker)选择:
- 推荐 Redis(轻量易部署):
pip install redis。 - 或 RabbitMQ(稳定性强):需单独安装 RabbitMQ 服务。
- 结果存储(Backend)选择:
- 简单场景用 Redis:与 Broker 共用,无需额外配置。
- 复杂场景用数据库(如 PostgreSQL):
pip install psycopg2-binary(PostgreSQL 驱动)。
2. 使用方法
1. 异步任务
1. 编写 Celery 实例与任务
创建文件 tasks.py,代码如下:
from celery import Celery
# 初始化 Celery:参数1为实例名,broker为消息代理地址,backend为结果存储地址
app = Celery(
'demo',
broker='redis://localhost:6379/0', # Redis 第0号数据库作为Broker
backend='redis://localhost:6379/1' # Redis 第1号数据库作为Backend
)
# 定义异步任务(用 @app.task 装饰)
@app.task
def add(x, y):
return x + y
2. 启动 Celery Worker
终端执行命令(需在 tasks.py 所在目录):
celery -A tasks worker --loglevel=info # -A 指定Celery实例所在文件,--loglevel=info 显示日志
3. 调用异步任务
打开 Python 终端,执行:
from tasks import add
# 异步调用:返回任务ID,任务在Worker中执行
result = add.delay(2, 3)
# 查看任务状态与结果
print(result.id) # 任务唯一ID(如 'd4e5f6...')
print(result.ready()) # 任务是否完成(True/False)
print(result.get()) # 获取结果(阻塞直到任务完成,返回 5)
print(result.status) # 任务状态(SUCCESS/FAILED/PENDING)
4. 概念解析
- Worker:任务执行者,监听 Broker 中的任务并执行,可多进程 / 多线程运行。
- Task:被异步执行的函数,用
@app.task装饰,支持参数传递、结果返回。 - Broker:消息中间件,存储待执行的任务队列(Worker 从这里取任务)。
- Backend:结果存储,保存任务执行后的返回值(供调用方查询)。
- Beat:定时任务调度器,按配置的时间规则触发任务。
2. 任务高级用法
1. 任务装饰器参数
@app.task(
retry_backoff=3, # 失败重试时,每次重试间隔按 2^0, 2^1, 2^2 秒递增(最多3次)
retry_kwargs={'max_retries': 5}, # 最大重试次数 5
ignore_result=True # 不需要存储任务结果(提升性能)
)
def send_email(to):
# 模拟邮件发送(失败时会自动重试)
if not to:
raise ValueError("收件人不能为空")
print(f"向 {to} 发送邮件成功")
2. 任务状态与进度跟踪
@app.task(bind=True) # bind=True 使任务函数第一个参数为 self(任务实例)
def long_task(self, total):
for i in range(total):
# 更新任务进度(可在前端查询)
self.update_state(state='PROGRESS', meta={'current': i, 'total': total})
time.sleep(1) # 模拟耗时操作
return "任务完成"
# 调用后查询进度
result = long_task.delay(10)
while not result.ready():
print(result.state) # 输出 PROGRESS
print(result.info) # 输出 {'current': 3, 'total': 10}(当前进度)
3. 任务错误处理
# 捕获任务执行异常
result = add.delay(2, '3') # 传入错误类型参数
try:
result.get(timeout=10) # 超时时间10秒
except Exception as e:
print(f"任务执行失败:{e}") # 输出类型错误信息
3. 定时任务
1. 配置定时任务
在 tasks.py 中添加配置:
from datetime import timedelta
app.conf.beat_schedule = {
# 定时任务1:每10秒执行一次 add 任务
'add-every-10-seconds': {
'task': 'tasks.add', # 任务路径(文件.函数名)
'schedule': timedelta(seconds=10),
'args': (10, 20) # 任务参数
},
# 定时任务2:每天固定时间执行(用 crontab 表达式)
'send-email-daily': {
'task': 'tasks.send_email',
'schedule': crontab(hour=8, minute=30), # 每天8:30执行
'args': ('user@example.com',)
}
}
2. 启动 Beat 调度器 + Worker
# 终端1:启动 Worker(执行任务)
celery -A tasks worker --loglevel=info
# 终端2:启动 Beat(调度定时任务)
celery -A tasks beat --loglevel=info
4. 部署与优化
1. Worker 并发配置
启动时指定并发数(默认等于 CPU 核心数):
celery -A tasks worker --concurrency=4 --loglevel=info # 4个并发进程
2. 日志配置
在 tasks.py 中添加日志配置,便于问题排查:
app.conf.update(
worker_log_format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
worker_task_log_format='%(asctime)s - %(name)s - %(levelname)s - %(task_id)s - %(message)s'
)
3. 监控工具
- 安装 Flower(Celery 官方监控工具):
pip install flower。 - 启动监控:
celery -A tasks flower,访问http://localhost:5555查看任务状态、Worker 状态。
5. 常见问题
- Broker 连接失败:检查 Redis/RabbitMQ 服务是否启动,地址是否正确。
- 任务结果查不到:确保配置了
backend,且任务没有设置ignore_result=True。 - Worker 不执行任务:检查 Worker 是否与 Broker 连接正常,任务函数是否被
@app.task装饰。
发表评论