HOOOS

Django + Celery 异步任务实战:邮件发送与图片处理全攻略

0 5 AsyncMaster DjangoCelery异步任务
Apple

当你的 Django 应用需要处理耗时操作,比如发送大量邮件、处理用户上传的图片,同步处理会严重影响用户体验。Celery 是一个强大的异步任务队列,可以轻松地将这些任务交给后台处理,让你的应用保持流畅。

本文将手把手教你如何在 Django 项目中集成 Celery,并提供邮件发送和图片处理的完整示例,最后还会介绍如何在 Django 管理界面监控 Celery 任务。

1. 环境准备

确保你已经安装了以下软件:

  • Python (3.6+)
  • Django (3.0+)
  • Redis 或 RabbitMQ (作为 Celery 的消息中间件)

如果还没有安装,可以使用 pip 安装 Django 和 Celery:

pip install django celery redis

2. 创建 Django 项目

首先,创建一个新的 Django 项目:

django-admin startproject myproject
cd myproject
python manage.py startapp myapp

myproject/settings.py 中,将 myapp 添加到 INSTALLED_APPS 中:

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'myapp',
]

3. 配置 Celery

3.1 安装 Celery

如果还没有安装,现在安装 Celery:

pip install celery redis  # 或者 pip install celery rabbitmq

3.2 定义 Celery 实例

myproject 目录下,创建 celery.py 文件,并添加以下内容:

import os

from celery import Celery

# 设置 Django 的 settings 模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# 从 Django 的 settings 文件中读取 Celery 配置
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动发现 tasks.py 文件中的任务
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

这个文件定义了一个 Celery 实例,并配置 Celery 从 Django 的 settings.py 文件中读取配置。autodiscover_tasks() 函数会自动查找所有已安装的 app 下的 tasks.py 文件,并将其中定义的 Celery 任务注册到 Celery 实例中。

3.3 修改 __init__.py

myproject/__init__.py 文件中,添加以下内容,确保 Celery 应用在 Django 启动时被加载:

from .celery import app as celery_app

__all__ = ('celery_app',)

3.4 配置 Django Settings

myproject/settings.py 文件中,添加 Celery 的配置信息。这里以 Redis 作为消息中间件为例:

CELERY_BROKER_URL = 'redis://localhost:6379/0'  # Redis 地址
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  # 存储任务结果的 backend
CELERY_ACCEPT_CONTENT = ['application/json']  # 允许的任务内容类型
CELERY_TASK_SERIALIZER = 'json'  # 任务序列化方式
CELERY_RESULT_SERIALIZER = 'json'  # 结果序列化方式
CELERY_TIMEZONE = 'Asia/Shanghai'  # 时区设置

根据你的实际情况修改 CELERY_BROKER_URL。如果使用 RabbitMQ,则配置如下:

CELERY_BROKER_URL = 'amqp://user:password@localhost:5672//'

4. 定义 Celery 任务

myapp 目录下,创建 tasks.py 文件,并添加以下内容:

from celery import shared_task
from django.core.mail import send_mail
from django.conf import settings
from PIL import Image
import os


@shared_task
def send_email_task(subject, message, recipient_list):
    """异步发送邮件的任务"""
    send_mail(
        subject, message, settings.DEFAULT_FROM_EMAIL, recipient_list
    )


@shared_task
def process_image_task(image_path):
    """异步处理图片的任务"""
    try:
        img = Image.open(image_path)
        # 进行图片处理,例如调整大小、添加水印等
        img = img.resize((200, 200))
        # 保存处理后的图片
        new_image_path = image_path.replace(os.path.splitext(image_path)[1], '_processed.jpg')
        img.save(new_image_path)
        return new_image_path
    except Exception as e:
        print(f"图片处理失败: {e}")
        return None

@shared_task 装饰器将函数转换为 Celery 任务。send_email_task 任务用于异步发送邮件,process_image_task 任务用于异步处理图片。

5. 调用 Celery 任务

在 Django 视图中,你可以像调用普通函数一样调用 Celery 任务,但是需要使用 .delay() 方法来将任务放入 Celery 队列中。

from django.shortcuts import render
from .tasks import send_email_task, process_image_task
from django.http import HttpResponse


def my_view(request):
    # 异步发送邮件
    send_email_task.delay(
        'Hello from Celery!',
        'This is a test email sent from a Celery task.',
        ['recipient@example.com']
    )

    # 异步处理图片
    image_path = '/path/to/your/image.jpg'
    process_image_task.delay(image_path)

    return HttpResponse("任务已提交!")

6. 启动 Celery Worker

打开一个新的终端窗口,进入 Django 项目根目录,运行以下命令启动 Celery Worker:

celery -A myproject worker -l info

-A myproject 指定 Celery 应用,-l info 设置日志级别为 INFO。

7. 运行 Django 开发服务器

在另一个终端窗口中,运行 Django 开发服务器:

python manage.py runserver

访问你的 Django 视图,例如 http://localhost:8000/my_view/,你应该会看到 “任务已提交!” 的提示。Celery Worker 会在后台异步执行邮件发送和图片处理任务。

8. 监控 Celery 任务

8.1 安装 django-celery-results

可以使用 django-celery-results 扩展来在 Django 管理界面中监控 Celery 任务的执行情况。

pip install django-celery-results

8.2 配置 Django Settings

myproject/settings.py 文件中,将 django_celery_results 添加到 INSTALLED_APPS 中:

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'myapp',
    'django_celery_results',
]

运行数据库迁移:

python manage.py migrate

8.3 注册 Celery 任务结果 backend

确保 CELERY_RESULT_BACKEND 设置正确:

CELERY_RESULT_BACKEND = 'django-db'

8.4 配置 Celery 任务结果清理

可选地,可以配置 Celery 任务结果的清理策略,避免数据库中存储过多的任务结果。

CELERY_RESULT_EXPIRES = 3600  # 任务结果过期时间,单位为秒

8.5 在 Django Admin 中查看任务状态

现在,你可以访问 Django 管理界面(通常是 http://localhost:8000/admin/),你应该会看到 “Celery 任务结果” 的选项。点击进入,可以查看所有 Celery 任务的执行状态、开始时间、结束时间、结果等信息。

9. 总结

通过本文,你已经学会了如何在 Django 项目中集成 Celery,并使用 Celery 异步发送邮件和处理图片。同时,你也学会了如何使用 django-celery-results 扩展来在 Django 管理界面中监控 Celery 任务的执行情况。希望这些知识能帮助你更好地构建高性能的 Django 应用。 记住,在实际开发中,根据你的业务需求和硬件资源,合理配置 Celery 的参数,例如并发数、预取数等,以达到最佳的性能。

点评评价

captcha
健康