当你的 Django 应用需要处理耗时操作,比如发送大量邮件、处理用户上传的图片,同步处理会严重影响用户体验。Celery 是一个强大的异步任务队列,可以轻松地将这些任务交给后台处理,让你的应用保持流畅。
本文将手把手教你如何在 Django 项目中集成 Celery,并提供邮件发送和图片处理的完整示例,最后还会介绍如何在 Django 管理界面监控 Celery 任务。
1. 环境准备
确保你已经安装了以下软件:
- Python (3.6+)
- Django (3.0+)
- Redis 或 RabbitMQ (作为 Celery 的消息中间件)
如果还没有安装,可以使用 pip 安装 Django 和 Celery:
pip install django celery redis
2. 创建 Django 项目
首先,创建一个新的 Django 项目:
django-admin startproject myproject
cd myproject
python manage.py startapp myapp
在 myproject/settings.py
中,将 myapp
添加到 INSTALLED_APPS
中:
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'myapp',
]
3. 配置 Celery
3.1 安装 Celery
如果还没有安装,现在安装 Celery:
pip install celery redis # 或者 pip install celery rabbitmq
3.2 定义 Celery 实例
在 myproject
目录下,创建 celery.py
文件,并添加以下内容:
import os
from celery import Celery
# 设置 Django 的 settings 模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
# 从 Django 的 settings 文件中读取 Celery 配置
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现 tasks.py 文件中的任务
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
这个文件定义了一个 Celery 实例,并配置 Celery 从 Django 的 settings.py
文件中读取配置。autodiscover_tasks()
函数会自动查找所有已安装的 app 下的 tasks.py
文件,并将其中定义的 Celery 任务注册到 Celery 实例中。
3.3 修改 __init__.py
在 myproject/__init__.py
文件中,添加以下内容,确保 Celery 应用在 Django 启动时被加载:
from .celery import app as celery_app
__all__ = ('celery_app',)
3.4 配置 Django Settings
在 myproject/settings.py
文件中,添加 Celery 的配置信息。这里以 Redis 作为消息中间件为例:
CELERY_BROKER_URL = 'redis://localhost:6379/0' # Redis 地址
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 存储任务结果的 backend
CELERY_ACCEPT_CONTENT = ['application/json'] # 允许的任务内容类型
CELERY_TASK_SERIALIZER = 'json' # 任务序列化方式
CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方式
CELERY_TIMEZONE = 'Asia/Shanghai' # 时区设置
根据你的实际情况修改 CELERY_BROKER_URL
。如果使用 RabbitMQ,则配置如下:
CELERY_BROKER_URL = 'amqp://user:password@localhost:5672//'
4. 定义 Celery 任务
在 myapp
目录下,创建 tasks.py
文件,并添加以下内容:
from celery import shared_task
from django.core.mail import send_mail
from django.conf import settings
from PIL import Image
import os
@shared_task
def send_email_task(subject, message, recipient_list):
"""异步发送邮件的任务"""
send_mail(
subject, message, settings.DEFAULT_FROM_EMAIL, recipient_list
)
@shared_task
def process_image_task(image_path):
"""异步处理图片的任务"""
try:
img = Image.open(image_path)
# 进行图片处理,例如调整大小、添加水印等
img = img.resize((200, 200))
# 保存处理后的图片
new_image_path = image_path.replace(os.path.splitext(image_path)[1], '_processed.jpg')
img.save(new_image_path)
return new_image_path
except Exception as e:
print(f"图片处理失败: {e}")
return None
@shared_task
装饰器将函数转换为 Celery 任务。send_email_task
任务用于异步发送邮件,process_image_task
任务用于异步处理图片。
5. 调用 Celery 任务
在 Django 视图中,你可以像调用普通函数一样调用 Celery 任务,但是需要使用 .delay()
方法来将任务放入 Celery 队列中。
from django.shortcuts import render
from .tasks import send_email_task, process_image_task
from django.http import HttpResponse
def my_view(request):
# 异步发送邮件
send_email_task.delay(
'Hello from Celery!',
'This is a test email sent from a Celery task.',
['recipient@example.com']
)
# 异步处理图片
image_path = '/path/to/your/image.jpg'
process_image_task.delay(image_path)
return HttpResponse("任务已提交!")
6. 启动 Celery Worker
打开一个新的终端窗口,进入 Django 项目根目录,运行以下命令启动 Celery Worker:
celery -A myproject worker -l info
-A myproject
指定 Celery 应用,-l info
设置日志级别为 INFO。
7. 运行 Django 开发服务器
在另一个终端窗口中,运行 Django 开发服务器:
python manage.py runserver
访问你的 Django 视图,例如 http://localhost:8000/my_view/
,你应该会看到 “任务已提交!” 的提示。Celery Worker 会在后台异步执行邮件发送和图片处理任务。
8. 监控 Celery 任务
8.1 安装 django-celery-results
可以使用 django-celery-results
扩展来在 Django 管理界面中监控 Celery 任务的执行情况。
pip install django-celery-results
8.2 配置 Django Settings
在 myproject/settings.py
文件中,将 django_celery_results
添加到 INSTALLED_APPS
中:
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'myapp',
'django_celery_results',
]
运行数据库迁移:
python manage.py migrate
8.3 注册 Celery 任务结果 backend
确保 CELERY_RESULT_BACKEND
设置正确:
CELERY_RESULT_BACKEND = 'django-db'
8.4 配置 Celery 任务结果清理
可选地,可以配置 Celery 任务结果的清理策略,避免数据库中存储过多的任务结果。
CELERY_RESULT_EXPIRES = 3600 # 任务结果过期时间,单位为秒
8.5 在 Django Admin 中查看任务状态
现在,你可以访问 Django 管理界面(通常是 http://localhost:8000/admin/
),你应该会看到 “Celery 任务结果” 的选项。点击进入,可以查看所有 Celery 任务的执行状态、开始时间、结束时间、结果等信息。
9. 总结
通过本文,你已经学会了如何在 Django 项目中集成 Celery,并使用 Celery 异步发送邮件和处理图片。同时,你也学会了如何使用 django-celery-results
扩展来在 Django 管理界面中监控 Celery 任务的执行情况。希望这些知识能帮助你更好地构建高性能的 Django 应用。 记住,在实际开发中,根据你的业务需求和硬件资源,合理配置 Celery 的参数,例如并发数、预取数等,以达到最佳的性能。