current position:Home>Python implements four schemes for timed tasks, lazy artifact

Python implements four schemes for timed tasks, lazy artifact

2022-02-02 05:25:17 jiejie

In daily work , We often use tasks that need to be performed periodically , One way is to use  Linux System native crond[1]  Combined with the command line . Another way is to use... Directly Python. The next is a common one Python Implementation of scheduled tasks .\

 picture

\

1、 utilize while True: + sleep() Achieve timed tasks

be located time Module sleep(secs) function , You can pause the currently executing thread secs Continue in seconds . The so-called pause , That is, the current thread enters the blocking state , When reach sleep() After the time specified by the function , Then it changes from blocking state to ready state , wait for CPU Dispatch .

Based on this feature, we can use while Dead cycle +sleep() The way to achieve a simple scheduled task .

Code example :

import datetime

import time

def time_printer():

    now = datetime.datetime.now()

    ts = now.strftime('%Y-%m-%d %H:%M:%S')

    print('do func time :', ts)

def loop_monitor():

    while True:

        time_printer()

        time.sleep(5)  #  Pause  5  second 

if __name__ == "__main__":

    loop_monitor()
 Copy code 

Main drawback :

  • Only the interval can be set , You cannot specify a specific time , Like every morning 8:00
  • sleep It's a blocking function , in other words sleep This period of time , The program can't do anything .

2、 Use Timeloop The library runs scheduled tasks

Timeloop[2]  Is a library , Can be used to run multi cycle tasks . This is a simple library , It USES decorator Mode runs marker functions in threads .

Sample code :

import time

from timeloop import Timeloop

from datetime import timedelta

tl = Timeloop()

@tl.job(interval=timedelta(seconds=2))

def sample_job_every_2s():

    print "2s job current time : {}".format(time.ctime())

@tl.job(interval=timedelta(seconds=5))

def sample_job_every_5s():

    print "5s job current time : {}".format(time.ctime())

@tl.job(interval=timedelta(seconds=10))

def sample_job_every_10s():

    print "10s job current time : {}".format(time.ctime())
 Copy code 

3、 utilize threading.Timer Achieve timed tasks

threading Module Timer Is a non blocking function , Than sleep A little better ,timer The most basic understanding is timer , We can start multiple scheduled tasks , These timer tasks are executed asynchronously , So there is no waiting sequence execution problem .

Timer(interval, function, args=[ ], kwargs={ })

  • interval: Specified time
  • function: The method to be executed
  • args/kwargs: Method parameters

Code example :

import datetime

from threading import Timer

def time_printer():

    now = datetime.datetime.now()

    ts = now.strftime('%Y-%m-%d %H:%M:%S')

    print('do func time :', ts)

    loop_monitor()

def loop_monitor():

    t = Timer(5, time_printer)

    t.start()

if __name__ == "__main__":

    loop_monitor()
 Copy code 

remarks :Timer It can only be executed once , Here you need to call , Otherwise, it can only be executed once

4、 Use built-in modules sched Achieve timed tasks

sched The module implements a general event scheduler , In the scheduler class, a delay function is used to wait for a specific time , Perform tasks . Support multithreaded applications at the same time , The delay function is called immediately after each task is executed , To ensure that other threads can also execute .

class sched.scheduler(timefunc, delayfunc) This class defines a common interface for scheduling events , It needs to pass in two parameters externally ,timefunc Is a function that returns a time type number without parameters ( Commonly used, such as time In the module time),delayfunc It should be a call that requires a parameter 、 And timefunc The output of is compatible 、 And acts as a function of delaying multiple time units ( Commonly used as time Modular sleep).

Code example :

import datetime

import time

import sched

def time_printer():

    now = datetime.datetime.now()

    ts = now.strftime('%Y-%m-%d %H:%M:%S')

    print('do func time :', ts)

    loop_monitor()

def loop_monitor():

    s = sched.scheduler(time.time, time.sleep)  #  Generate scheduler 

    s.enter(5, 1, time_printer, ())

    s.run()

if __name__ == "__main__":

    loop_monitor()
 Copy code 

scheduler Object main method :

  • enter(delay, priority, action, argument), Schedule an event to delay delay Time units .
  • cancel(event): Delete event from queue . If the event is not in the current queue , Then the method will run a ValueError.
  • run(): Run all scheduled events . This function will wait ( Use the... Passed to the constructor delayfunc() function ), Then execute the event , Until there are no more scheduled events .

Personal comments : Than threading.Timer Better , There is no need to call... In a loop .\

Python It is a very diverse and well developed language , So there will certainly be many functions I didn't consider , If anyone knows , You can tell me in the comments section

copyright notice
author[jiejie],Please bring the original link to reprint, thank you.
https://en.pythonmana.com/2022/02/202202020525161903.html

Random recommended