The apscheduler module in Python implements scheduled tasks

2022-01-31 17:39:29 Notes of the lost schoolboy

Hardware and software environment

  • Windows 10, 64-bit
  • Anaconda with Python 3.7
  • apscheduler 3.6.3

Preface

When it comes to scheduled tasks, the first things that come to mind are probably the Task Scheduler built into Windows or the crontab that ships with Linux. apscheduler is a scheduled-task tool written in Python. It provides a rich and easy-to-use interface for scheduling tasks.

Installation

Installation is simple. Use pip:

pip install apscheduler

The four components of apscheduler

  • triggers: decide when a job fires. There are three kinds: by date, by time interval, or by crontab expression
  • job stores: specify where jobs are kept. The default is in memory; jobs can also be saved in various databases
  • executors: submit the job to a thread pool or process pool to run
  • schedulers: tie the other components together. The commonly used ones are BackgroundScheduler (runs in the background) and BlockingScheduler (blocks the calling thread)

Code practice

Here are a few examples to see how to use apscheduler

import time
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger


def my_job():
    print('my_job, {}'.format(time.ctime()))


if __name__ == "__main__":
    scheduler = BlockingScheduler()

    # Fire every second; minutes, hours, days and weeks are also accepted
    interval_trigger = IntervalTrigger(seconds=1)

    # Give the job an id so it can be paused, removed, etc. later on
    scheduler.add_job(my_job, interval_trigger, id='my_job_id')
    scheduler.start()
    print('=== end. ===')

Run the code. my_job prints a timestamped line once per second.

Because BlockingScheduler blocks the calling thread, only the output of my_job appears, and the statement print('=== end. ===') is never reached. BackgroundScheduler runs in the background and does not block the main thread, as the following code shows:

import time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger


def my_job():
    print('my_job, {}'.format(time.ctime()))


if __name__ == "__main__":
    scheduler = BackgroundScheduler()
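    interval_trigger = IntervalTrigger(seconds=1)
    scheduler.add_job(my_job, interval_trigger, id='my_job_id')
    scheduler.start()
    # start() returns immediately, so this line is printed
    print('=== end. ===')
    # Keep the main thread alive; the scheduler thread is a daemon by default
    while True:
        time.sleep(1)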

Running it prints '=== end. ===' right away, and my_job then keeps printing once per second.

If you set the trigger to DateTrigger, the job runs once at a specific point in time. An example follows:

import time
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.date import DateTrigger


def my_job():
    print('my_job, {}'.format(time.ctime()))


if __name__ == "__main__":
    scheduler = BlockingScheduler()
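    # run_date should lie in the future when the script starts;
    # a date already in the past is normally skipped as a missed run
    date_trigger = DateTrigger(run_date='2020-07-17 16:18:55')
    scheduler.add_job(my_job, date_trigger, id='my_job_id')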
    scheduler.start()

When the set time arrives, the job runs exactly once.

To run a job on a recurring calendar schedule, use CronTrigger. It works much like crontab on UNIX and can express very detailed and complex rules. Again, look at the sample code:

import time
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger


def my_job():
    print('my_job, {}'.format(time.ctime()))


if __name__ == "__main__":
    scheduler = BlockingScheduler()
    
    # Run the job at second 1 of every minute
    cron_trigger = CronTrigger(second=1)

    # Run the job every day at 19:30:01
    # cron_trigger = CronTrigger(hour=19, minute=30, second=1)

    # Run the job every year on October 1 at 19:00
    # cron_trigger = CronTrigger(month=10, day=1, hour=19)

    scheduler.add_job(my_job, cron_trigger, id='my_job_id')
    scheduler.start()

When this runs, my_job fires once a minute, each time the clock reaches second 1.

The code above never configures executors, because there is only one job. Stepping through it in a debugger shows, however, that apscheduler still uses a ThreadPoolExecutor by default, with a thread pool size of 10.

Now let's look at how to use executors:

import time
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.executors.pool import ThreadPoolExecutor


def my_job():
    print('my_job, {}'.format(time.ctime()))


if __name__ == "__main__":

    executors = {
        'default': ThreadPoolExecutor(20)
    }

    scheduler = BlockingScheduler(executors=executors)

    interval_trigger = IntervalTrigger(seconds=1)
    scheduler.add_job(my_job, interval_trigger, id='my_job_id')
    scheduler.start()

As you can see, we changed the thread pool size to 20 and passed executors in when initializing the scheduler. Everything else is exactly the same as before.

As for choosing between ThreadPoolExecutor and ProcessPoolExecutor, the rule of thumb is: use ProcessPoolExecutor for CPU-intensive jobs and ThreadPoolExecutor for everything else. The two can also be used together.

Finally, let's look at the job store. Here we switch it to SQLite, which requires the SQLAlchemy package to be installed:

import time
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore


def my_job():
    print('my_job, {}'.format(time.ctime()))

jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}


if __name__ == "__main__":

    executors = {
        'default': ThreadPoolExecutor(20)
    }

    scheduler = BlockingScheduler(jobstores=jobstores, executors=executors)

    interval_trigger = IntervalTrigger(seconds=1)
    scheduler.add_job(my_job, interval_trigger, id='my_job_id')
    scheduler.start()

After the code runs, an SQLite database file named jobs.sqlite is created in the directory the script was run from. Opening it with a graphical tool shows the id of each job stored in the database and the job's next run time.
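
The same information can be read without a graphical tool by using the sqlite3 module from the standard library. The sketch below assumes the job store's default table name, apscheduler_jobs, and that next_run_time is stored as a UNIX timestamp; list_jobs is a helper name invented here, not part of apscheduler.

```python
import sqlite3
from datetime import datetime, timezone


def list_jobs(db_path='jobs.sqlite'):
    """Return (job id, next run time in UTC) pairs from the job store file."""
    connection = sqlite3.connect(db_path)
    try:
        rows = connection.execute(
            'SELECT id, next_run_time FROM apscheduler_jobs ORDER BY next_run_time'
        ).fetchall()
    finally:
        connection.close()
    # A NULL next_run_time means the job is currently paused
    return [
        (job_id, datetime.fromtimestamp(ts, tz=timezone.utc) if ts is not None else None)
        for job_id, ts in rows
    ]
```

Calling list_jobs() from the directory that contains jobs.sqlite should return one entry for my_job_id together with its next scheduled run.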

copyright notice
Author: Notes of the lost schoolboy. Please include the original link when reprinting. Thank you.
https://en.pythonmana.com/2022/01/202201311739258671.html
