Django异步任务神器:Celery

摘要

Celery,是Python程序员的好帮手,它能够帮你解决每日任务的烦恼。它就像是一个美食家,把每日任务推送到消息队列,然后让勤奋的厨师们来完成。Celery不仅简单易用,而且能够处理大量的任务。它就像是一位勤劳的工人,每天都能够完成数百个任务。

正文

Django(41)详细说明多线程每日任务架构Celery

celery详细介绍

  Celery是由Python开发设计、简易、灵便、靠谱的分布式系统每日任务序列,是一个解决多线程每日任务的架构,其实质是经营者顾客实体模型,经营者推送每日任务到消息队列,顾客承担解决每日任务。Celery偏重于即时实际操作,但对生产调度适用也很好,其每日能够 解决数以百计的每日任务。特性:

  • 简易:了解celery的工作内容后,配备应用简易
  • 高可用性:当每日任务实行不成功或实行全过程中产生联接终断,celery会全自动试着再次执行任务
  • 迅速:一个单过程的celery每分可解决几百万个每日任务
  • 灵便:基本上celery的每个部件都能够被拓展及自订制

Celery由三一部分组成:

  • 消息中间件(Broker):官方网给予了许多候选计划方案,适用RabbitMQRedisAmazon SQSMongoDBMemcached 等,官方网强烈推荐RabbitMQ
  • 每日任务实行模块(Worker):每日任务实行模块,承担从消息队列中取下每日任务实行,它能够 运行一个或是好几个,还可以运行在不一样的设备连接点,这就是其完成分布式系统的关键
  • 結果储存(Backend):官方网给予了众多的储存方法适用:RabbitMQ、 RedisMemcached,SQLAlchemy, Django ORMApache CassandraElasticsearch


原理:

  1. 每日任务控制模块Task包括多线程每日任务和计划任务。在其中,多线程每日任务一般在领域模型中被开启高并发往消息队列,而计划任务由Celery Beat过程周期性地将每日任务发往消息队列;
  2. 每日任务实行模块Worker即时监控消息队列获得序列中的每日任务实行;
  3. Woker实行完每日任务后将結果储存在Backend中;

 

django运用Celery

  django架构要求/回应的全过程是同歩的,架构自身没法完成多线程回应。可是我们在新项目全过程中会常常会碰到一些用时的每日任务, 例如:邮件发送、发送信息、大数据统计这些,这种实际操作用时长,同歩实行对客户体验十分不友善,那麼在这类状况下就必须完成多线程实行。多线程实行前面一般应用ajax,后端开发应用Celery
 

新项目运用

  django新项目运用celery,关键有二种每日任务方法,一是多线程每日任务(上传者每日任务),一般是web要求,二是计划任务
 

多线程每日任务redis

 

1.安裝celery

pip3 install celery

 

2.celery.py

在主新项目文件目录下,新创建 celery.py 文档:

import os
import django
from celery import Celery
from django.conf import settings


# 设定系统软件系统变量,安裝django,务必设定,不然在运行celery的时候会出错
# celery_study 是当今新项目名
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_demo.settings.py')
django.setup()

app = Celery('celery_demo')
app.config_from_object('django.conf.settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

留意:是和settings.py文档同文件目录,一定不可以创建在新项目网站根目录,要不然会造成celery这一控制模块名的取名矛盾
另外,在主新项目的init.py中,加上以下编码:

from .celery import celery_app
__all__ = ['celery_app']

 

3.settings.py

在环境变量中配备相匹配的redis配备:

# Broker配备,应用Redis做为消息中间件
BROKER_URL = 'redis://127.0.0.1:6379/0' 

# BACKEND配备,这儿应用redis
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' 

# 結果实例化计划方案
CELERY_RESULT_SERIALIZER = 'json' 

# 每日任务結果到期時间,秒
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 

# 时区时间配备
CELERY_TIMEZONE='Asia/Shanghai'

更为详尽的配备可查询官方网文本文档:http://docs.celeryproject.org/en/latest/userguide/configuration.html
 

4.tasks.py

在子运用下创建分别相匹配的每日任务文档tasks.py(务必是tasks.py这一名称,不允许改动)

from celery import shared_task

@shared_task
def add(x, y):
    return x   y

 

5.启用每日任务

views.py 中,根据 delay 方式启用每日任务,而且回到每日任务相匹配的 task_id,这一id用以事后查看每日任务情况

from celery_app.tasks import add
def index(request):
    ar = add.delay(10, 6)
    return HttpResponse(f'早已实行celery的add每日任务启用,task_id:{ar.id}')

 

6.运行celery

在命令窗口中,转换到新项目根目录下,实行下列指令:

celery worker -A celery_demo -l info
  • -A celery_demo:特定新项目app
  • worker: 说明这是一个每日任务实行模块
  • -l info:特定日志輸出等级

輸出下列結果,意味着运行celery取得成功

大量celery指令的主要参数,能够 键入celery --help
 

7.获得每日任务結果

views.py 中,根据AsyncResult.get()获得結果

def get_result(request):
    task_id = request.GET.get('task_id')
    ar = result.AsyncResult(task_id)
    if ar.ready():
        return JsonResponse({"status": ar.state, "result": ar.get()})
    else:
        return JsonResponse({"status": ar.state, "result": ""})

AsyncResult类的常见的特性和方式:

  • state: 回到每日任务情况,等同于status
  • task_id: 回到每日任务id
  • result: 回到每日任务結果,同get()方式;
  • ready(): 分辨每日任务是不是实行及其有結果,有結果为True,不然False
  • info(): 获得每日任务信息内容,默认设置为結果;
  • wait(t): 等候t秒后获得結果,若每日任务实行结束,则不等候立即获得結果,若每日任务在实行中,则wait期内一直堵塞,直至超日报错;
  • successful(): 分辨每日任务是不是取得成功,取得成功为True,不然为False

编码的准备工作都做完了,大家逐渐浏览电脑浏览器127.0.0.1/celery_app/,获得下列結果

早已实行celery的add每日任务启用,task_id:b1e9096e-430c-4f1b-bbfc-1f0a0c98c7cb

这一步的功效:运行add每日任务,随后放到消息中间件中,这儿大家用的是redis,就可以根据redis专用工具查询,以下

随后大家以前运行的celeryworker过程会获得任务列表,逐一执行任务,实行完毕后会储存到backend中,最终根据前面ajax轮循一个插口,依据task_id获取每日任务的結果
下面大家浏览http://127.0.0.1:8000/celery_app/get_result/?task_id=b1e9096e-430c-4f1b-bbfc-1f0a0c98c7cb,就能从网页页面上查询到結果,以下

{
"status": "SUCCESS",
"result": 16
}

表明计划任务实行取得成功,回到結果为16
 

计划任务

在第一步的多线程每日任务的基本上,开展一部分改动就可以在
 

1.settings.py

settings文档,配备以下编码就可以

from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
    'mul_every_10_seconds': {
         # 每日任务途径
        'task': 'celery_app.tasks.mul',
         # 每10秒实行一次
        'schedule': 10,
        'args': (10, 5)
    },
    'xsum_week1_20_20_00': {
         # 每日任务途径
        'task': 'celery_app.tasks.xsum',
        # 每星期一20点20分实行
        'schedule': crontab(hour=20, minute=20, day_of_week=1),
        'args': ([1,2,3,4],),
    },
}

主要参数表明以下:

  • task:每日任务涵数
  • schedule:实行頻率,能够 是整形(分秒),还可以是timedelta目标,还可以是crontab目标,还可以是自定类(承继celery.schedules.schedule
  • args:部位主要参数,目录或元组
  • kwargs:关键词主要参数,词典
  • options:可选主要参数,词典,一切 apply_async() 适用的主要参数
  • relative:默认设置是False,取相对性于beat的开始时间;设定为True,则取设定的timedelta時间

更为详尽的表明参照官方网文本文档:http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#crontab-schedules
 

2.运行celery

各自运行workerbeat

celery worker -A celery_demo -l debug 
celery beat -A celery_demo -l debug

我们可以见到计划任务会每过10s就运作每日任务

运作完的結果会储存在redis

 

每日任务关联

Celery可根据task关联到案例获得到task的前后文,那样我们可以在task运作情况下获得到task的情况,纪录有关日志等
我们可以想像那样一个情景,当每日任务碰到难题,实行不成功时,大家必须开展再试,完成编码以下

@shared_task(bind=True)
def add(self, x, y):
    try:
        logger.info('-add' * 10)
        logger.info(f'{self.name}, id:{self.request.id}')
        raise Exception
    except Exception as e:
        # 错误每4秒试着一次,一共试着4次
        self.retry(exc=e, countdown=4, max_retries=4)
    return x   y

表明以下:

  • 在装饰器中添加主要参数 bind=True
  • task涵数中的第一个基本参数为self
    self目标是celery.app.task.Task的案例,能够 用以完成再试等多种多样作用

然后我们在views.py文档中,载入以下主视图涵数

def get_result(request):
    task_id = request.GET.get('task_id')
    ar = result.AsyncResult(task_id)
    if ar.successful():
        return JsonResponse({"status": ar.state, "result": ar.get()})
    else:
        return JsonResponse({"status": ar.state, "result": ""})

然后大家浏览http://127.0.0.1:8000/celery_app/,建立一个每日任务id,回到以下結果

早已实行celery的add每日任务启用,task_id:f55dcfb7-e184-4a29-abe9-3e1e55a2ffad

随后运行celery指令:

celery worker -A celery_demo -l info 

大家会发觉celery中的每日任务会抛出去一个出现异常,而且再试了4次,这是由于我们在tasks每日任务中积极抛出去了一个出现异常

[2021-06-02 11:27:55,487: INFO/MainProcess] Received task: celery_app.tasks.add[f55dcfb7-e184-4a29-abe9-3e1e55a2ffad]  ETA:[2021-06-02 11:27:59.420668 08:00] 
[2021-06-02 11:27:55,488: INFO/ForkPoolWorker-11] Task celery_app.tasks.add[f55dcfb7-e184-4a29-abe9-3e1e55a2ffad] retry: Retry in 4s: Exception()

最终大家浏览http://127.0.0.1:8000/celery_app/get_result/?task_id=f55dcfb7-e184-4a29-abe9-3e1e55a2ffad,查看每日任务的結果

{
"status": "FAILURE",
"result": ""
}

因为大家积极抛出异常(为了更好地仿真模拟实行全过程中的不正确),这就造成了大家的情况为FAILURE
 

每日任务勾子

  Celery在执行任务时,给予了勾子方式用以在每日任务实行进行情况下开展相匹配的实际操作,在Task源代码中给予了许多情况钩子函数如:on_success(取得成功后实行)、on_failure(不成功情况下实行)、on_retry(每日任务再试情况下实行)、after_return(每日任务回到情况下实行)

  1. 根据承继Task类,调用相匹配方式就可以,实例:
class MyHookTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        logger.info(f'task id:{task_id} , arg:{args} , successful !')

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.info(f'task id:{task_id} , arg:{args} , failed ! erros: {exc}')

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        logger.info(f'task id:{task_id} , arg:{args} , retry !  erros: {exc}')
  1. 在相匹配的task涵数的装饰器中,根据 base=MyHookTask 特定
@shared_task(base=MyHookTask, bind=True)
def mul(self, x, y):
	......

 

每日任务编辑

  在许多状况下,一个每日任务必须由好几个子每日任务或是一个每日任务必须许多流程才可以进行,Celery也可以完成那样的每日任务,进行这种类的每日任务根据下列控制模块进行:

  • group: 并行处理生产调度每日任务
  • chain: 链条式线程同步
  • chord: 相近group,但分headerbody两个一部分,header能够 是一个group每日任务,实行进行后启用body的每日任务
  • map: 投射生产调度,根据键入好几个入参来数次生产调度同一个每日任务
  • starmap: 相近map,入参相近*args
  • chunks: 将每日任务依照一定总数开展排序

1.group

最先在urls.py中载入以下编码:

path('primitive/', views.test_primitive),

然后在views.py中载入主视图涵数

from celery import result, group
def test_primitive(request):
    lazy_group = group(mul.s(i, i) for i in range(10))  # 转化成10个每日任务
    promise = lazy_group()
    result = promise.get()
    return JsonResponse({'function': 'test_primitive', 'result': result})

tasks.py文档中载入以下编码

@shared_task
def mul(x, y):
    return x * y

表明:
根据task涵数的 s 方式传到主要参数,运行每日任务,大家浏览http://127.0.0.1:8000/celery_app/primitive/,会获得下列結果

{
  "function": "test_primitive",
  "result": [
        0,
        1,
        4,
        9,
        16,
        25,
        36,
        49,
        64,
        81
    ]
}

上边这类方式必须开展等候,假如仍然想完成多线程的方法,那麼就务必在tasks.py中新创建一个task方式,启用group,实例以下:
tasks.py

from celery.result import allow_join_result
@shared_task
def first_group():
    with allow_join_result():
        return group(mul.s(i, i) for i in range(10))().get()

urls.py

path('group_task/', views.group_task),

views.py

def group_task(request):
    ar = first_group.delay()
    return HttpResponse(f'早已实行celery的group_task每日任务启用,task_id:{ar.id}')

 

2.chain

默认设置上一个每日任务的結果做为下一个每日任务的第一个主要参数

def test_primitive(request):
    promise = chain(mul.s(2, 2), mul.s(5), mul.s(8))()  #  160
    result = promise.get()
    return JsonResponse({'function': 'test_primitive', 'result': result})

 

3.chord

每日任务切分,分成headerbody两一部分,hearder每日任务实行完再实行body,在其中hearder回到結果做为参数传递给body

def test_primitive(request):
    # header:  [3, 12] 
    # body: xsum([3, 12])
    promise = chord(header=[tasks.add.s(1,2),tasks.mul.s(3,4)],body=tasks.xsum.s())()
    result = promise.get()
    return JsonResponse({'function': 'test_primitive', 'result': result})

 

celery管理方法和监管

celery根据flower部件完成管理方法和监管作用 ,flower部件不仅给予监管作用,还给予HTTP API可完成对wokertask的管理方法
官方网站:https://pypi.org/project/flower/
文本文档:https://flower.readthedocs.io/en/latest

1.安裝flower

pip3 install flower

2.运行flower

flower -A celery_demo--port=5555   
  • -A:新项目名
  • –port: 端口

3.在电脑浏览器键入:http://127.0.0.1:5555,可以见到以下网页页面

4.根据api实际操作

curl http://127.0.0.1:5555/api/workers

关注不迷路

扫码下方二维码,关注宇凡盒子公众号,免费获取最新技术内幕!

温馨提示:如果您访问和下载本站资源,表示您已同意只将下载文件用于研究、学习而非其他用途。
文章版权声明 1、本网站名称:宇凡盒子
2、本站文章未经许可,禁止转载!
3、如果文章内容介绍中无特别注明,本网站压缩包解压需要密码统一是:yufanbox.com
4、本站仅供资源信息交流学习,不保证资源的可用及完整性,不提供安装使用及技术服务。点此了解
5、如果您发现本站分享的资源侵犯了您的权益,请及时通知我们,我们会在接到通知后及时处理!提交入口
0

评论0

请先

站点公告

🚀 【宇凡盒子】全网资源库转储中心

👉 注册即送VIP权限👈

👻 全站资源免费下载✅,欢迎注册!

记得 【收藏】+【关注】 谢谢!~~~

立即注册
没有账号?注册  忘记密码?

社交账号快速登录