摘要
Celery,是Python程序员的好帮手,它能够帮你解决每日任务的烦恼。它就像是一个美食家,把每日任务推送到消息队列,然后让勤奋的厨师们来完成。Celery不仅简单易用,而且能够处理大量的任务。它就像是一位勤劳的工人,每天都能够完成数百个任务。
正文
Django(41)详细说明多线程每日任务架构Celery
celery详细介绍
Celery
是由Python
开发设计、简易、灵便、靠谱的分布式系统每日任务序列,是一个解决多线程每日任务的架构,其实质是经营者顾客实体模型,经营者推送每日任务到消息队列,顾客承担解决每日任务。Celery
偏重于即时实际操作,但对生产调度适用也很好,其每日能够 解决数以百计的每日任务。特性:
- 简易:了解
celery
的工作内容后,配备应用简易 - 高可用性:当每日任务实行不成功或实行全过程中产生联接终断,
celery
会全自动试着再次执行任务 - 迅速:一个单过程的
celery
每分可解决几百万个每日任务 - 灵便:基本上
celery
的每个部件都能够被拓展及自订制
Celery由三一部分组成:
- 消息中间件(Broker):官方网给予了许多候选计划方案,适用
RabbitMQ
、Redis
、Amazon SQS
、MongoDB
、Memcached
等,官方网强烈推荐RabbitMQ
- 每日任务实行模块(Worker):每日任务实行模块,承担从消息队列中取下每日任务实行,它能够 运行一个或是好几个,还可以运行在不一样的设备连接点,这就是其完成分布式系统的关键
- 結果储存(Backend):官方网给予了众多的储存方法适用:
RabbitMQ
、Redis
、Memcached
,SQLAlchemy
,Django ORM
、Apache Cassandra
、Elasticsearch
等
原理:
- 每日任务控制模块
Task
包括多线程每日任务和计划任务。在其中,多线程每日任务一般在领域模型中被开启高并发往消息队列,而计划任务由Celery Beat
过程周期性地将每日任务发往消息队列; - 每日任务实行模块
Worker
即时监控消息队列获得序列中的每日任务实行; Woker
实行完每日任务后将結果储存在Backend
中;
django运用Celery
django
架构要求/回应的全过程是同歩的,架构自身没法完成多线程回应。可是我们在新项目全过程中会常常会碰到一些用时的每日任务, 例如:邮件发送、发送信息、大数据统计这些,这种实际操作用时长,同歩实行对客户体验十分不友善,那麼在这类状况下就必须完成多线程实行。多线程实行前面一般应用ajax
,后端开发应用Celery
。
新项目运用
django
新项目运用celery
,关键有二种每日任务方法,一是多线程每日任务(上传者每日任务),一般是web要求,二是计划任务
多线程每日任务redis
1.安裝celery
pip3 install celery
2.celery.py
在主新项目文件目录下,新创建 celery.py
文档:
import os
import django
from celery import Celery
from django.conf import settings
# 设定系统软件系统变量,安裝django,务必设定,不然在运行celery的时候会出错
# celery_study 是当今新项目名
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_demo.settings.py')
django.setup()
app = Celery('celery_demo')
app.config_from_object('django.conf.settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
留意:是和settings.py
文档同文件目录,一定不可以创建在新项目网站根目录,要不然会造成celery
这一控制模块名的取名矛盾
另外,在主新项目的init.py
中,加上以下编码:
from .celery import celery_app
__all__ = ['celery_app']
3.settings.py
在环境变量中配备相匹配的redis配备:
# Broker配备,应用Redis做为消息中间件
BROKER_URL = 'redis://127.0.0.1:6379/0'
# BACKEND配备,这儿应用redis
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
# 結果实例化计划方案
CELERY_RESULT_SERIALIZER = 'json'
# 每日任务結果到期時间,秒
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
# 时区时间配备
CELERY_TIMEZONE='Asia/Shanghai'
更为详尽的配备可查询官方网文本文档:http://docs.celeryproject.org/en/latest/userguide/configuration.html
4.tasks.py
在子运用下创建分别相匹配的每日任务文档tasks.py
(务必是tasks.py这一名称,不允许改动)
from celery import shared_task
@shared_task
def add(x, y):
return x y
5.启用每日任务
在 views.py
中,根据 delay
方式启用每日任务,而且回到每日任务相匹配的 task_id
,这一id用以事后查看每日任务情况
from celery_app.tasks import add
def index(request):
ar = add.delay(10, 6)
return HttpResponse(f'早已实行celery的add每日任务启用,task_id:{ar.id}')
6.运行celery
在命令窗口中,转换到新项目根目录下,实行下列指令:
celery worker -A celery_demo -l info
- -A celery_demo:特定新项目app
- worker: 说明这是一个每日任务实行模块
- -l info:特定日志輸出等级
輸出下列結果,意味着运行celery
取得成功
大量celery
指令的主要参数,能够 键入celery --help
7.获得每日任务結果
在 views.py
中,根据AsyncResult.get()
获得結果
def get_result(request):
task_id = request.GET.get('task_id')
ar = result.AsyncResult(task_id)
if ar.ready():
return JsonResponse({"status": ar.state, "result": ar.get()})
else:
return JsonResponse({"status": ar.state, "result": ""})
AsyncResult
类的常见的特性和方式:
- state: 回到每日任务情况,等同于
status
; - task_id: 回到
每日任务id
; - result: 回到每日任务結果,同
get()
方式; - ready(): 分辨每日任务是不是实行及其有結果,有結果为
True
,不然False
; - info(): 获得每日任务信息内容,默认设置为結果;
- wait(t): 等候t秒后获得結果,若每日任务实行结束,则不等候立即获得結果,若每日任务在实行中,则
wait
期内一直堵塞,直至超日报错; - successful(): 分辨每日任务是不是取得成功,取得成功为
True
,不然为False
;
编码的准备工作都做完了,大家逐渐浏览电脑浏览器127.0.0.1/celery_app/
,获得下列結果
早已实行celery的add每日任务启用,task_id:b1e9096e-430c-4f1b-bbfc-1f0a0c98c7cb
这一步的功效:运行add
每日任务,随后放到消息中间件中,这儿大家用的是redis
,就可以根据redis专用工具查询,以下
随后大家以前运行的celery
的worker
过程会获得任务列表,逐一执行任务,实行完毕后会储存到backend中,最终根据前面ajax
轮循一个插口,依据task_id
获取每日任务的結果
下面大家浏览http://127.0.0.1:8000/celery_app/get_result/?task_id=b1e9096e-430c-4f1b-bbfc-1f0a0c98c7cb
,就能从网页页面上查询到結果,以下
{
"status": "SUCCESS",
"result": 16
}
表明计划任务实行取得成功,回到結果为16
计划任务
在第一步的多线程每日任务的基本上,开展一部分改动就可以在
1.settings.py
在settings
文档,配备以下编码就可以
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
'mul_every_10_seconds': {
# 每日任务途径
'task': 'celery_app.tasks.mul',
# 每10秒实行一次
'schedule': 10,
'args': (10, 5)
},
'xsum_week1_20_20_00': {
# 每日任务途径
'task': 'celery_app.tasks.xsum',
# 每星期一20点20分实行
'schedule': crontab(hour=20, minute=20, day_of_week=1),
'args': ([1,2,3,4],),
},
}
主要参数表明以下:
- task:每日任务涵数
- schedule:实行頻率,能够 是整形(分秒),还可以是
timedelta
目标,还可以是crontab
目标,还可以是自定类(承继celery.schedules.schedule
) - args:部位主要参数,目录或元组
- kwargs:关键词主要参数,词典
- options:可选主要参数,词典,一切
apply_async()
适用的主要参数 - relative:默认设置是
False
,取相对性于beat
的开始时间;设定为True
,则取设定的timedelta
時间
更为详尽的表明参照官方网文本文档:http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#crontab-schedules
2.运行celery
各自运行worker
和beat
celery worker -A celery_demo -l debug
celery beat -A celery_demo -l debug
我们可以见到计划任务会每过10s就运作每日任务
运作完的結果会储存在redis
中
每日任务关联
Celery
可根据task
关联到案例获得到task
的前后文,那样我们可以在task
运作情况下获得到task
的情况,纪录有关日志等
我们可以想像那样一个情景,当每日任务碰到难题,实行不成功时,大家必须开展再试,完成编码以下
@shared_task(bind=True)
def add(self, x, y):
try:
logger.info('-add' * 10)
logger.info(f'{self.name}, id:{self.request.id}')
raise Exception
except Exception as e:
# 错误每4秒试着一次,一共试着4次
self.retry(exc=e, countdown=4, max_retries=4)
return x y
表明以下:
- 在装饰器中添加主要参数
bind=True
- 在
task
涵数中的第一个基本参数为self
self
目标是celery.app.task.Task
的案例,能够 用以完成再试等多种多样作用
然后我们在views.py
文档中,载入以下主视图涵数
def get_result(request):
task_id = request.GET.get('task_id')
ar = result.AsyncResult(task_id)
if ar.successful():
return JsonResponse({"status": ar.state, "result": ar.get()})
else:
return JsonResponse({"status": ar.state, "result": ""})
然后大家浏览http://127.0.0.1:8000/celery_app/
,建立一个每日任务id,回到以下結果
早已实行celery的add每日任务启用,task_id:f55dcfb7-e184-4a29-abe9-3e1e55a2ffad
随后运行celery指令:
celery worker -A celery_demo -l info
大家会发觉celery
中的每日任务会抛出去一个出现异常,而且再试了4次,这是由于我们在tasks
每日任务中积极抛出去了一个出现异常
[2021-06-02 11:27:55,487: INFO/MainProcess] Received task: celery_app.tasks.add[f55dcfb7-e184-4a29-abe9-3e1e55a2ffad] ETA:[2021-06-02 11:27:59.420668 08:00]
[2021-06-02 11:27:55,488: INFO/ForkPoolWorker-11] Task celery_app.tasks.add[f55dcfb7-e184-4a29-abe9-3e1e55a2ffad] retry: Retry in 4s: Exception()
最终大家浏览http://127.0.0.1:8000/celery_app/get_result/?task_id=f55dcfb7-e184-4a29-abe9-3e1e55a2ffad
,查看每日任务的結果
{
"status": "FAILURE",
"result": ""
}
因为大家积极抛出异常(为了更好地仿真模拟实行全过程中的不正确),这就造成了大家的情况为FAILURE
每日任务勾子
Celery
在执行任务时,给予了勾子方式用以在每日任务实行进行情况下开展相匹配的实际操作,在Task
源代码中给予了许多情况钩子函数如:on_success
(取得成功后实行)、on_failure
(不成功情况下实行)、on_retry
(每日任务再试情况下实行)、after_return
(每日任务回到情况下实行)
- 根据承继
Task
类,调用相匹配方式就可以,实例:
class MyHookTask(Task):
def on_success(self, retval, task_id, args, kwargs):
logger.info(f'task id:{task_id} , arg:{args} , successful !')
def on_failure(self, exc, task_id, args, kwargs, einfo):
logger.info(f'task id:{task_id} , arg:{args} , failed ! erros: {exc}')
def on_retry(self, exc, task_id, args, kwargs, einfo):
logger.info(f'task id:{task_id} , arg:{args} , retry ! erros: {exc}')
- 在相匹配的
task
涵数的装饰器中,根据base=MyHookTask
特定
@shared_task(base=MyHookTask, bind=True)
def mul(self, x, y):
......
每日任务编辑
在许多状况下,一个每日任务必须由好几个子每日任务或是一个每日任务必须许多流程才可以进行,Celery
也可以完成那样的每日任务,进行这种类的每日任务根据下列控制模块进行:
- group: 并行处理生产调度每日任务
- chain: 链条式线程同步
- chord: 相近
group
,但分header
和body
两个一部分,header
能够 是一个group
每日任务,实行进行后启用body
的每日任务 - map: 投射生产调度,根据键入好几个入参来数次生产调度同一个每日任务
- starmap: 相近map,入参相近
*args
- chunks: 将每日任务依照一定总数开展排序
1.group
最先在urls.py
中载入以下编码:
path('primitive/', views.test_primitive),
然后在views.py
中载入主视图涵数
from celery import result, group
def test_primitive(request):
lazy_group = group(mul.s(i, i) for i in range(10)) # 转化成10个每日任务
promise = lazy_group()
result = promise.get()
return JsonResponse({'function': 'test_primitive', 'result': result})
在tasks.py
文档中载入以下编码
@shared_task
def mul(x, y):
return x * y
表明:
根据task
涵数的 s
方式传到主要参数,运行每日任务,大家浏览http://127.0.0.1:8000/celery_app/primitive/
,会获得下列結果
{
"function": "test_primitive",
"result": [
0,
1,
4,
9,
16,
25,
36,
49,
64,
81
]
}
上边这类方式必须开展等候,假如仍然想完成多线程的方法,那麼就务必在tasks.py
中新创建一个task
方式,启用group
,实例以下:tasks.py
from celery.result import allow_join_result
@shared_task
def first_group():
with allow_join_result():
return group(mul.s(i, i) for i in range(10))().get()
urls.py
path('group_task/', views.group_task),
views.py
def group_task(request):
ar = first_group.delay()
return HttpResponse(f'早已实行celery的group_task每日任务启用,task_id:{ar.id}')
2.chain
默认设置上一个每日任务的結果做为下一个每日任务的第一个主要参数
def test_primitive(request):
promise = chain(mul.s(2, 2), mul.s(5), mul.s(8))() # 160
result = promise.get()
return JsonResponse({'function': 'test_primitive', 'result': result})
3.chord
每日任务切分,分成header
和body
两一部分,hearder
每日任务实行完再实行body
,在其中hearder
回到結果做为参数传递给body
def test_primitive(request):
# header: [3, 12]
# body: xsum([3, 12])
promise = chord(header=[tasks.add.s(1,2),tasks.mul.s(3,4)],body=tasks.xsum.s())()
result = promise.get()
return JsonResponse({'function': 'test_primitive', 'result': result})
celery管理方法和监管
celery
根据flower
部件完成管理方法和监管作用 ,flower
部件不仅给予监管作用,还给予HTTP API
可完成对woker
和task
的管理方法
官方网站:https://pypi.org/project/flower/
文本文档:https://flower.readthedocs.io/en/latest
1.安裝flower
pip3 install flower
2.运行flower
flower -A celery_demo--port=5555
- -A:新项目名
- –port: 端口
3.在电脑浏览器键入:http://127.0.0.1:5555
,可以见到以下网页页面
4.根据api实际操作
curl http://127.0.0.1:5555/api/workers
关注不迷路
扫码下方二维码,关注宇凡盒子公众号,免费获取最新技术内幕!
评论0