• celery介绍与使用


    一.celery介绍

    celery作用

    1.celery可以实现异步任务来提高项目的并发量,完成延迟任务、定时任务

    2.celery是一个简单、灵活、可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具

    celery架构

    1.消息中间件:broker 提交的任务(函数)都放到这里,celery本身不提供中间件,需要借助于第三方:redis,rabbitmq

    2.任务执行单元:worker,真正执行任务的地方,一个个进程,执行函数

    3.结果存储:backend,函数return的结果存储在这里,celery本身不提供结果存储,借助于第三方:redis,数据库,rabbitmq

    celery特点

    celery是独立的服务

    1.可以不依赖任何服务器,通过自身命令,启动

    2.celery服务为其他项目服务提供异步解决任务需求的

    注意:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

    二.celery快速使用

    1.安装模块

            官方介绍:Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform(不支持windows,请不要打开与该平台相关的任何问题)

    1. pip install celery
    2. pip install eventlet # Windows系统需安装

    2.使用步骤

    新建包:celery_task
        -在包先新建一个 celery.py
        -在里面写app的初始化
        -在包里新建app_task.py 编写相关任务 
        -其它程序,提交任务
        -启动worker ---》它可以先启动,在提交任务之前-->包所在的目录下
            celery -A celery_task worker -l info -P eventlet
        -查看任务执行的结果了

    1. '''celery_task/celery.py'''
    2. from celery import Celery
    3. backend = 'redis://127.0.0.1:6379/1'
    4. broker = 'redis://127.0.0.1:6379/0'
    5. # 一定不要忘了include
    6. app = Celery(__name__, broker=broker, backend=backend,include=['celery_task.home_task','celery_task.user_task'])
    1. '''celery_task/app_task.py'''
    2. from .celery import app
    3. @app.task
    4. def add(a, b):
    5. time.sleep(3)
    6. print('计算结果是:%s' % (a + b))
    7. return a + b
    1. '''add_task.py'''
    2. from celery_task.user_task import send_sms
    3. # 提交了一个发送短信异步任务
    4. res=send_sms.delay('132xxxxxxxx','9999')
    5. print(res) # 672237ce-c941-415e-9145-f31f90b94627
    6. # 任务执行,要启动worker
    7. # 查看任务执行的结果

    3.启动celery工作服务器

    1. celery -A tasks worker -l info -P eventlet
    2. celery -A tasks worker --loglevel=INFO -P eventlet

    4.backend中查看任务执行结果

    1. from tasks import app
    2. from celery.result import AsyncResult
    3. task_id = '672237ce-c941-415e-9145-f31f90b94627'
    4. if __name__ == '__main__':
    5. res = AsyncResult(id=task_id, app=app)
    6. if res.successful():
    7. result = res.get()
    8. print(result)
    9. # 等同上面代码
    10. # if res.state == 'SUCCESS':
    11. # result = res.get()
    12. # print(result)
    13. elif res.failed():
    14. print('任务失败')
    15. # elif res.state == 'FAILURE':
    16. # print('任务失败')
    17. elif res.status == 'PENDING':
    18. print('任务等待中被执行')
    19. elif res.status == 'RETRY':
    20. print('任务异常后正在重试')
    21. elif res.status == 'STARTED':
    22. print('任务已经开始被执行')

    AsyncResult下的方法

    1. def failed(self):
    2. """Return :const:`True` if the task failed."""
    3. return self.state == states.FAILURE
    4. def successful(self):
    5. """Return :const:`True` if the task executed successfully."""
    6. return self.state == states.SUCCESS

    3.celery开启定时、延迟任务、异步任务

    异步任务

    task.delay(*args, **kwargs)
    

    定时任务

    1. app.conf.beat_schedule = {
    2. 'send_sms_task': {
    3. 'task': 'celery_task.add_task.send_sms', # 路径
    4. 'schedule': timedelta(seconds=5), # 每五秒执行一次
    5. # 'schedule': crontab(hour=12, day_of_week=1), # 每周一12点发送验证码
    6. 'args': ('132xxxxxxxx', '7777'),
    7. },
    8. }

    延迟任务

    1. task.apply_async(args=[参数,参数],eta=时间对象(utc时间))
    2. from datetime import timedelta, datetime
    3. res = add.apply_async(args=(1, 2), eta=(datetime.utcnow() + timedelta(seconds=20)))
    4. print(res.task_id) # c78505e2-614d-4bb2-930c-c73c325af519

    三.在django中使用

    在包内的celery.py中添加代码

    1. import os
    2. from celery import Celery
    3. from datetime import timedelta
    4. from celery.schedules import crontab
    5. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy.settings.dev')
    6. import django
    7. django.setup()
    8. broker = 'redis://127.0.0.1:6379/0'
    9. backend = 'redis://127.0.0.1:6379/1'
    10. app = Celery(main=__name__, broker=broker, backend=backend,
    11. include=['celery_tasks.home_tasks', 'celery_tasks.user_tasks'])

    四.双写一致性

    为了提高并发量和访问速度我们把数据存放到redis中

    1. class SlideShowView(GenericViewSet, ListMixinView):
    2. queryset = SlideShow.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[
    3. :settings.SLIDE_SHOW_COUNT]
    4. serializer_class = SlideShowSer
    5. def list(self, request, *args, **kwargs):
    6. result = cache.get('banner_list')
    7. if result:
    8. print('走了缓存')
    9. return APIResponse(code=1001, result=result)
    10. res = super().list(request, *args, **kwargs)
    11. result = res.data.get('result')
    12. cache.set('banner_list', result)
    13. print('走了数据库')
    14. return res

    celery定时任务实现双写一致性

    当把数据存放到redis中时我们修改数据库但是redis中的数据不会改变就会造成数据不一致的情况

    - 解决方式一:

    1. 修改mysql数据库,删除缓存 【缓存的修改是在后】
    2. 修改数据库,修改缓存 【缓存的修改是在后】
    3. 定时更新缓存,针对于实时性不是很高的接口适合定时更新.

     - 解决方式二:

    开启crlery定时每30分钟朝数据库获取一次数据存放到redis中

    1. #home_tasks.py 首页相关任务
    2. import time
    3. from .celery import app
    4. from home.models import SlideShow
    5. from django.conf import settings
    6. from home.serializer import SlideShowSer
    7. from django.core.cache import cache
    8. @app.task
    9. def update_banner():
    10. # 更新缓存
    11. queryset = SlideShow.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.SLIDE_SHOW_COUNT]
    12. ser = SlideShowSer(instance=queryset, many=True)
    13. # print(ser.data)
    14. for item in ser.data:
    15. item['image'] = settings.HOST_URL + item['image']
    16. cache.set('banner_list', ser.data)
    17. return True
    1. # celery.py
    2. import os
    3. from celery import Celery
    4. from datetime import timedelta
    5. from celery.schedules import crontab
    6. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy.settings.dev')
    7. import django
    8. django.setup()
    9. broker = 'redis://127.0.0.1:6379/0'
    10. backend = 'redis://127.0.0.1:6379/1'
    11. app = Celery(main=__name__, broker=broker, backend=backend,
    12. include=['celery_tasks.home_tasks', 'celery_tasks.user_tasks'])
    13. app.conf.beat_schedule = {
    14. # 定时任务
    15. 'update_banner': {
    16. 'task': 'celery_tasks.home_tasks.update_banner',
    17. 'schedule': timedelta(minutes=30),
    18. # 'schedule': crontab(hour=8, day_of_week=1),
    19. 'args': (),
    20. },
    21. }

  • 相关阅读:
    ai智能电话机器人如何撑起一个部门
    systemVerilog的变量类型转换
    银河麒麟4.0Kylin桌面版安装Java环境
    Pandas写入Excel文件如何避免覆盖已有Sheet
    长尾分布系列论文解析(二)Delving into Deep Imbalanced Regression
    【Python+selenium】如何高效地将driver定位到当前窗口
    SpringMVC+SpringBoot【理解版】
    在kubernetes中部署kubersphere
    Linux命令入门教程(四):文本编辑篇
    23种设计模式(八)代理模式(阁瑞钛伦特软件-九耶实训)
  • 原文地址:https://blog.csdn.net/weixin_52596593/article/details/128192982