current position:Home>Django uses Django celery beat to dynamically add scheduled tasks

Django uses Django celery beat to dynamically add scheduled tasks

2022-01-30 05:09:17 Take the surname of Wang

Version information

 #  Plug in installation  
 Copy code 

Installation and configuration

  1. Install the corresponding celery edition
  2. To configure
 # django Timezone configuration 
 TIME_ZONE = 'Asia/Shanghai'
 #  If USE_TZ Set to True when ,Django Will use the default time zone of the system , At this time TIME_ZONE It doesn't work with or without settings 
 #  If USE_TZ  Set to False,TIME_ZONE = 'Asia/Shanghai',  Use Shanghai's UTC Time .
 USE_TZ = False
 # celery beat To configure 
 CELERY_BEAT_SCHEDULER = 'django-celery-beat.schedulers.DatabaseScheduler'
 # celery  Set the number of startup jobs for 
 #  Task prefetching function , I'll take as much as I can  n  individual , To ensure that the communication cost can be reduced .
 #  In some cases, deadlock can be prevented 
 # celery  Of  worker  How many tasks are executed to restart 
 #  Disable all speed limits , If network resources are limited , It's not recommended to drive full power .
 #  Set up an agent broker
 CELERY_BROKER_URL = 'redis://'
 #  Appoint  Backend
 Copy code 
  1. Generate database

    python migrate

     #  Table structure generated after migration 
     django_celery_beat.models.PeriodicTask  This model defines a single recurring task to run .
     django_celery_beat.models.IntervalSchedule  At specific intervals ( for example , Every time 5 second ) Running plans .
     django_celery_beat.models.CrontabSchedule  And like in cron Timetable for this area   A week of minutes, hours and days  DAY_OF_MONTH month_of_year
     django_celery_beat.models.PeriodicTasks  This model is used only as an index to track when the plan changes 
     Copy code 
  2. Configure... In the working directory

     # -*- coding: utf-8 -*-
     # @File:
     # @Content: celery Timing task configuration 
     import os
     from celery import Celery, platforms
     from celery.schedules import crontab
     from django.conf import settings
     os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_server.settings")
     app = Celery("django_server")
     app.config_from_object("django.conf:settings", namespace="CELERY")
     #  Storage location of scheduled tasks 
     app.autodiscover_tasks(["monitoring.tasks", "wechat.tasks"])
     Copy code 
  3. establish tasks Mission

     from celery import shared_task
     def alarm_monitor_task(**kwargs):
       print(" Timing task !!!")
     Copy code 
  4. Create a scheduled task

     from django_celery_beat.models import PeriodicTask, IntervalSchedule
     ----- Periodic tasks 
     #   establish 10 Minutes apart  interval  object 
     schedule, _ = IntervalSchedule.objects.update_or_create(every=10, period=IntervalSchedule.MINUTES)
     #  If the task exists, update , Create if it does not exist 
         "interval": schedule,  #  Create above 10 Minutes apart  interval  object 
         "task": "monitoring.tasks.alarm_monitor_task",  #  Specify tasks that need to be performed periodically 
         "args"=json.dumps(['arg1', 'arg2']),
         "kwargs": json.dumps({"a": 1, "b": 2}, ensure_ascii=False)  #  Incoming parameter 
       name=" Timing task -task",
     #  Optional parameters for periodic tasks 
     IntervalSchedule.DAYS  Fixed interval days 
     IntervalSchedule.HOURS  Fixed interval hours 
     IntervalSchedule.MINUTES  Fixed interval minutes 
     IntervalSchedule.SECONDS  Fixed interval seconds 
     IntervalSchedule.MICROSECONDS  Fixed interval microseconds 
     ----Crontab  Periodic tasks 
     from django_celery_beat.models import CrontabSchedule, PeriodicTask
     #  Create interval 30 Minutes to perform tasks 
     crontab, _ = CrontabSchedule.objects.update_or_create(
     #  Update if the task exists , There is no creation 
         "kwargs": json.dumps(kwargs, ensure_ascii=False),
         "task": "wechat.tasks.subscribe_task",
         "crontab": crontab,
     #  Delete task 
     task = PeriodicTask.objects.filter(name__startswith=sub_id)
     if task:
     #  Pause the current task 
     tasks = PeriodicTask.objects.filter(name__startswith=sub_id)
     if tasks:
         tasks.update(enabled=True if status else False)
     Copy code 
  1. Run the task

     #  Start the task  work
     celery -A django_server worker -l INFO --logfile=/var/log/dec_server/worker.log
     #  Start timer triggers  beat
     celery -A django_server beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler --logfile=/var/log/dec_server/beat.log
     Copy code 


Reference link :……

copyright notice
author[Take the surname of Wang],Please bring the original link to reprint, thank you.

Random recommended