Django定时任务四种实现方法总结

Django定时任务四种实现方法总结


背景: 在使用Django框架开发web项目时,很多时候需要设置定时任务或让用户手动在页面上设置定时任务。

一、使用django-crontab插件来实现定时任务

1.0、 基本介绍

要使用django-crontab插件,只需要下载一个django-crontab包就可以使用cron表达式在Django框架中设置定时任务。这种方法不支持windows系统,功能也相对简单。

1.1、安装插件

pip install django-crontab

1.2、注册app

(1)在settings.py中INSTALLED_APPS引入app,完成子应用注册。

INSTALLED_APPS = [
    ...
    'django_crontab'
]

(2)在settings.py中配置定时任务,在settings.py最后增加一下代码:


# 定时任务
'''
*    *    *    *    * :分别表示 分(0-59)、时(0-23)、天(1 - 31)、月(1 - 12) 、周(星期中星期几 (0 - 7) (0 7 均为周天))
crontab范例:
每五分钟执行    */5 * * * *
每小时执行     0 * * * *
每天执行       0 0 * * *
每周一执行       0 0 * * 1
每月执行       0 0 1 * *
每天23点执行   0 23 * * *
'''
CRONJOBS = [
    ('*/1 * * * *', 'base.crontabs.confdict_handle', ' >> /tmp/logs/confdict_handle.log'), # 注意:/tmp/base_api 目录要手动创建
]

或者:

CRONJOBS = [
    ('*/5 * * * *', 'appname.cron.test','>>/home/test.log')
]

'''
‘/5 * * *’ 遵循的是crontab 语法。
‘appname.cron.test’,这个appname就是你开发时加入到settings中的那个。因为你的cron.py文件就在这个下面,否则找不到路径。cron 就是你自己起的任务文件的名字。test就是执行的函数中的内容。
‘>>/home/test.log’,通常会输出信息到一个文件中,就使用这个方法,注意的是‘>>’表示追加写入,’>’表示覆盖写入。
'''

一、附件部分(Linux 中的定时任务crontab的语法如下)

*  *  *  *  * command
分钟(0-59) 小时(0-23) 每个月的哪一天(1-31) 月份(1-12) 周几(0-6) shell脚本或者命令

核心语法:

CRONJOBS = [('*/5 * * * *', '任务路径.任务函数名','>>/home/book.log')]

参数说明:
‘*/5 * * *’ 表示五分钟一次,而django-crontab是调用Linux的crontab.

常见的参数:

  1. "*"表示可选的所有取值范围内的数字
  2. "/"表示'每',比如若第一个参数为/5,就是五分钟一次,例如:*/5就是每5个单位
  3. - 代表从某个数字到某个数字
  4. , 分开几个离散的数字
    Django定时任务四种实现方法总结

应用示例:

有兴趣的小伙伴可以深入研究下 Linux 的crontab定时任务。参考链接:django 定时任务 django-crontab 的使用

在执行脚本中:
0 6 * * * commands >> /tmp/test.log # 每天早上6点执行, 并将信息追加到test.log中
0 */2 * * * commands # 每隔2小时执行一次

1.3、编写定时任务方法

在本例中是在apps/base/crontabs.py(一般,执行add new file under appname, named cron.py)中增加的定时任务

from .models import ConfDict # base内的一个model,定时任务多数用来操作数据库,因此给一个示例
import datetime

# 定时任务 
def confdict_handle():
    try:
    	objs = CondDict.objects.all()
    	print(obj)
        loca_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print('本地时间:'+str(loca_time))
    except Exception as e:
        print('发生错误,错误信息为:', e)

1.4、如何使用&运行

(1)开启定时器

python manage.py crontab add

(2)将任务添加并生效(查看开启的定时器)

  • 显示当前的定时任务
python manage.py crontab show
  • 删除所有定时任务
python manage.py crontab remove

(3)重启django服务,执行

corntab -e

(4)查看定时任务

crontab -l

此时,应该是可以看到系统中创建了该定时任务。

1.5、django-crontab插件优缺点:

  • 优点:
    简单、方便、易于管理
    和django服务是分离的,不会影响到django对外提供的web服务。
  • 缺点:
    无法在Windows平台上运行(django_crontab必须在Linux的crontab开启的情况下才可以使用,不然会出现不执行的情况);
    就算在Linux系统上,也可能出现运行了没有效果的消息,至今未知原因。

二、使用django-apscheduler插件实现定时任务

2.0、基本介绍

django-apscheduler支持三种调度任务:固定时间间隔,固定时间点(日期),Crontab 命令。同时,它还支持异步执行、后台执行调度任务 配置简单、功能齐全、使用灵活、支持windows和linux,适合中小型项目。django-apscheduler中相关的概念和python的定时任务框架apscheduler中的概念是一样的

APScheduler的使用场景?

redis持久化存储时,使用APScheduler,使数据同步。
用户下单后使用,规定30min内必须支付,否则取消订单。

APScheduler 与 crontab 同为定时任务工具,有什么区别?

(1)crontab:

(2)APScheduler:

2.1、安装插件

pip install django-apscheduler

或者

pip install apscheduler

2.2、使用插件

修改settings.py增加以下代码:

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'django_apscheduler',     # 新加入的定时任务插件django-apscheduler
    'UserManger.apps.UsermangerConfig',
]

2.3、迁移数据库

因为django-apscheduler会创建表来存储定时任务的一些信息,所以将app加入之后需要迁移数据

python manage.py migrate

去数据库中看一看,生成了两个表格,大部分都顾名思义。

  1. django_apscheduler_djangojob——用于存储任务的表格
    Django定时任务四种实现方法总结
  2. django_apscheduler_djangojobexecution——用于存储任务执行状态的表格
    Django定时任务四种实现方法总结

参数说明:

Note:
这两个表用来管理你所需要的定时任务,然后就开始在任一view.py下写你需要实现的任务:

2.4、完整示例 在views.py中增加你的定时任务代码

注意:如果在其他文件中添加代码是没有效果的

from apscheduler.schedulers.background import BackgroundScheduler # 使用它可以使你的定时任务在后台运行
from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job
import time
'''
date:在您希望在某个特定时间仅运行一次作业时使用
interval:当您要以固定的时间间隔运行作业时使用
cron:以crontab的方式运行定时任务
minutes:设置以分钟为单位的定时器
seconds:设置以秒为单位的定时器
'''

try:
    scheduler = BackgroundScheduler() # 创建定时任务的调度器对象——实例化调度器
    # 调度器使用DjangoJobStore()
    scheduler.add_jobstore(DjangoJobStore(), "default")
	# 'cron'方式循环,周一到周五,每天9:30:10执行,id为工作ID作为标记  
    # ('scheduler',"interval", seconds=1)  #用interval方式循环,每一秒执行一次  
    @register_job(scheduler, 'cron', day_of_week='mon-fri', hour='9', minute='30', second='10',id='task_time')  
    #@register_job(scheduler, "interval", seconds=5)
    
    def my_job(param1, param2):  # 定义定时任务
        # 定时每5秒执行一次
        #t_now = time.localtime()
        print(time.strftime('%Y-%m-%d %H:%M:%S'))
    
    # 监控任务 
    register_events(scheduler)
    
	# 向调度器中添加定时任务
	scheduler.add_job(my_job, 'date', args=[100, 'python'])
    # 启动定时任务调度器工作——调度器开始 
    scheduler.start()
except Exception as e:
    print('定时任务异常:%s' % str(e))

2.6、如何使用&运行

apscheduler定时任务会跟随django项目一起运行因此直接启动django即可。

python manage.py runserver

2.7、django-apscheduler插件优缺点:

二、附件部分(django-apscheduler功能详解)

创建任务:
Django定时任务四种实现方法总结
有两种创建任务的方法:装饰器和add_job函数

1. 装饰器
在任意view.py中实现代码(我习惯新开一个app专门实现定时任务):

典型范例:

import time
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_job, register_events

print('django-apscheduler')

def job2(name):
    # 具体要执行的代码
    print('{} 任务运行成功!{}'.format(name,time.strftime("%Y-%m-%d %H:%M:%S")))

# 实例化调度器
scheduler = BackgroundScheduler()
# 调度器使用DjangoJobStore()
scheduler.add_jobstore(DjangoJobStore(), "default")
# 添加任务1
# 每隔5s执行这个任务
@register_job(scheduler,"interval", seconds=5,args=['王路'],id='job1')
# 每天8点半执行这个任务
#@register_job(scheduler, 'cron', id='test', hour=8, minute=30,args=['test'])
def job1(name):
    # 具体要执行的代码
    print('{} 任务运行成功!{}'.format(name,time.strftime("%Y-%m-%d %H:%M:%S")))

scheduler.add_job(job2,"interval",seconds=10,args=['王飞'],id="job2")

# 监控任务——注册定时任务
register_events(scheduler)
# 调度器开始运行
scheduler.start()

启动服务 python manage.py runserver 这个任务就会被存储到django_apscheduler_djangojob表中,并按照设置定时的执行程序。

装饰器@register_job()参数说明:

  1. scheduler: 指定调度器
  2. trigger: 任务执行的方式,共有三种: ‘date’、‘interval’、‘cron’。
    • ‘date’ + ‘run_date’ 的参数组合, 能实现单次任务。
      例子: 2019-07-07 22:49:00 执行任务
      @register_job(scheduler, ‘date’, id=‘test’, run_date=‘2019-07-07 22:49:00’)
      注:在亲测时,执行完任务会报错,原因时执行完任务后会去mysql中删除djangojob表中的任务。但是djangojobexecution表记录着执行结果,有外键关联着djangojob表,所以删除时显示有外键约束错误。但是任务会正常执行,执行之后也会正常删除。
    • ‘interval’ + ‘hours’ + ‘minutes’ + … 的参数组合,能实现间隔性任务。
      例子:每隔3个半小时执行任务
      @register_job(scheduler, ‘interval’, id=‘test’, hours=3, minutes=30)
      还有seconds,days参数可以选择
      注:如果任务需要执行10秒,而间隔设置为1秒,它是不会给你开10个线程同时去执行10个任务的。它会错过其他任务直到当前任务完成。
    • ‘cron’ + ‘hour’ + ‘minute’+…的参数组合,能实现cron类的任务。
      例子:每天的8点半执行任务
      @register_job(scheduler, ‘cron’, id=‘test’, hour=8, minute=30)
      还有day,second,month等参数可以选择。
  3. id: 任务的名字,不传的话会自动生成。不过为了之后对任务进行暂停、开启、删除等操作,建议给一个名字。并且是唯一的,如果多个任务取一个名字,之前的任务就会被覆盖。
  4. args: list类型。执行代码所需要的参数。
  5. next_run_time:datetime类型。开始执行时间。如果你现在创建一个定时任务,想3天后凌晨三点半自动给你女朋友发微信,那就需要这个参数了。

还有些其他的参数感兴趣的同学可以查看源代码来了解。

2. add_job函数

装饰器的方法适合于写代码的人自己创建任务,如果想让用户通过页面输入参数,并提交来手动创建定时任务,就需要使用add_job函数

下面这个小例子,前端传递json数据给后端,触发test_add_task函数,来添加任务:

import json
from django.http import JsonResponse
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job


scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), 'default')

# 与前端的接口
def test_add_task(request):
    if request.method == 'POST':
        content = json.loads(request.body.decode())  # 接收参数
        try:
            start_time = content['start_time']  # 用户输入的任务开始时间, '10:00:00'
            start_time = start_time.split(':')
            hour = int(start_time)[0]
            minute = int(start_time)[1]
            second = int(start_time)[2]
            s = content['s']  # 接收执行任务的各种参数
            # 创建任务
            scheduler.add_job(test, 'cron', hour=hour, minute=minute, second=second, args=[s])
            code = '200'
            message = 'success'
        except Exception as e:
            code = '400'
            message = e
            
        back = {
            'code': code,
            'message': message
        }
        return JsonResponse(json.dumps(data, ensure_ascii=False), safe=False)
        
# 具体要执行的代码
def test(s):
    pass
    

register_events(scheduler)
scheduler.start()

这样就可以由前端用户来手动设置定时任务了。

add_job函数参数说明:
和装饰器的参数大同小异,只是第一个参数不同。
如果具体要执行的函数和调用它的函数在一个文件中,那么只需要传递这个函数名就可以了(如上面的例子)。
但是我习惯将具体的业务代码写到另外一个文件中,view.py中只写前后端交互的接口函数,这种情况下传递的参数为一个字符串,格式为: ‘package.module:some.object’,即 包名.模块:函数名

参考链接:详解django-apscheduler的使用方法

基础组件:
Django定时任务四种实现方法总结
APScheduler 有四种组件,分别是:调度器(scheduler),作业存储(job store),触发器(trigger),执行器(executor)

  • schedulers(调度器)
    它是任务调度器,属于控制器角色。它配置作业存储器和执行器可以在调度器中完成,例如添加、修改和移除作业。
  • triggers(触发器)
    描述调度任务被触发的条件。不过触发器完全是无状态的。
  • job stores(作业存储器)
    任务持久化仓库,默认保存任务在内存中,也可将任务保存都各种数据库中。
  • executors(执行器)
    负责处理作业的运行,它们通常通过在作业中提交指定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。

schedulers(调度器):
它提供 7 种调度器,能够满足我们各种场景的需要。例如:后台执行某个操作,异步执行操作等。调度器分别是:

  • BlockingScheduler : 调度器在当前进程的主线程中运行,也就是会阻塞当前线程。
  • BackgroundScheduler : 调度器在后台线程中运行,不会阻塞当前线程。(Django框架使用)
  • AsyncIOScheduler : 结合 asyncio 模块(一个异步框架)一起使用。
  • GeventScheduler : 程序中使用 gevent(高性能的Python并发框架)作为IO模型,和 – GeventExecutor 配合使用。
  • TornadoScheduler : 程序中使用 Tornado(一个web框架)的IO模型,用 ioloop.add_timeout 完成定时唤醒。
  • TwistedScheduler : 配合 TwistedExecutor,用 reactor.callLater 完成定时唤醒。scrapy爬虫框架
  • QtScheduler : 你的应用是一个 Qt 应用,需使用QTimer完成定时唤醒。

triggers(触发器):

它提供 3种内建的 trigger:

  1. date 触发器 作业任务只会执行一次。它表示特定的时间点触发。它的参数如下:
参数说明
run_date (datetime 或 str)作业的运行日期或时间
timezone (datetime.tzinfo 或 str)指定时区

典型范例1:

# 在 2017-12-13 时刻运行一次 job_func 方法
scheduler .add_job(job_func, 'date', run_date=date(2017, 12, 13), args=['text'])
# 在 2017-12-13 14:00:00 时刻运行一次 job_func 方法
scheduler .add_job(job_func, 'date', run_date=datetime(2017, 12, 13, 14, 0, 0), args=['text'])
# 在 2020-12-13 14:00:01 时刻运行一次 job_func 方法
scheduler.add_job(job3,"date",run_date='2020-12-13 14:00:01',args=['王飞'],id="job3")

典型范例2:

 # 每天0点执行函数的代码,0点的话,hour可以不用写
app.scheduler.add_job(函数名, "cron", hour=0, args=[函数需要传的参数]) 

#每天凌晨3点执行代码
app.scheduler.add_job(函数名, "cron", hour=3, args=[app])

#如果date后面没有参数的话,就是立刻执行代码,一般测试的时候用
app.scheduler.add_job(函数名, "date", args=[app])
  1. interval 触发器 固定时间间隔触发。interval 间隔调度,参数如下:
参数说明
weeks (int)间隔几周
days (int)间隔几天
hours (int)间隔几小时
minutes (int)间隔几分钟
seconds (int)间隔多少秒
start_date (datetime 或 str)开始日期
end_date (datetime 或 str)结束日期
timezone (datetime.tzinfo 或str)时区

典型范例:

# 每隔两分钟执行一次 job_func 方法
scheduler .add_job(job_func, 'interval', minutes=2)
# 在 2017-12-13 14:00:01 ~ 2017-12-13 14:00:10 之间, 每隔两分钟执行一次 job_func 方法
scheduler .add_job(job_func, 'interval', minutes=2, start_date='2017-12-13 14:00:01' , end_date='2017-12-13 14:00:10')
  1. cron 触发器 在特定时间周期性地触发,和Linux crontab格式兼容。
参数说明
year (int 或 str)年,4位数字
month (int 或 str)月 (范围1-12)
day (int 或 str)日 (范围1-31
week (int 或 str)周 (范围1-53)
day_of_week (int 或 str)周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
hour (int 或 str)时 (范围0-23)
minute (int 或 str)分 (范围0-59)
second (int 或 str)秒 (范围0-59)
start_date (datetime 或 str)最早开始日期(包含)
end_date (datetime 或 str)最晚结束时间(包含)
timezone (datetime.tzinfo 或str)指定时区

这些参数是支持算数表达式,取值格式有如下:

Django定时任务四种实现方法总结

典型范例:

# 在每天的2点35分36分37分 执行 job_func 任务
scheduler.add_job(job4,"cron",hour='2', minute='35-37',args=['王涛'],id="job4")

作业存储(job store):
Django定时任务四种实现方法总结
有两种添加方法,其中一种是 add_job(), 另一种则是@register_job()修饰器来修饰函数。
Django定时任务四种实现方法总结
这个两种办法的区别是:第一种方法返回一个 Job 的实例,可以用来改变或者移除 job。第二种方法只适用于应用运行期间不会改变的 job。

典型范例:

@register_job(scheduler, 'cron', day_of_week='mon-fri', hour='9', minute='30', second='10',id='task_time')·
        def test_job():
           t_now = time.localtime()
           print(t_now)

其他功能:

django-apscheduler框架还提供了很多操作定时任务的函数。比如:

  • 删除任务:scheduler.remove_job(job_name)
  • 暂停任务:scheduler.pause_job(job_name)
  • 开启任务:scheduler.resume_job(job_name)
  • 获取所有任务:scheduler.get_jobs()
  • 修改任务:scheduler.modify_job(job_name)

注:修改任务只能修改参数,如果要修改执行时间的话,有3种方法
第一就把任务删了重新创建,
第二直接操作数据库,
第三用到下面重设任务。

可以在页面上做一个这样的表格,再加上简单的前后端交互就可以让用户自行管理定时任务:
Django定时任务四种实现方法总结

  • 重设任务:scheduler.reschedule_job(job_name)
scheduler.reschedule_job(job_id="job1", trigger='interval', minutes=1)

执行器(executor):
Django定时任务四种实现方法总结
执行调度任务的模块。最常用的 executor 是 ThreadPoolExecutor,存储路径是在Django数据库中。

执行器 executors 可以使用线程池(ThreadPoolExecutor)和进程池(ProcessPoolExecutor)

  1. 线程池:
from apscheduler.executors.pool import ThreadPoolExecutor
ThreadPoolExecutor(max_workers)  
ThreadPoolExecutor(20) # 最多20个线程同时执行

使用方法:

executors = {
      'default': ThreadPoolExecutor(20)
  }
  scheduler = BackgroundScheduler(executors=executors)
  1. 进程池:
from apscheduler.executors.pool import ProcessPoolExecutor
ProcessPoolExecutor(max_workers)
ProcessPoolExecutor(5) # 最多5个进程同时执行

使用方法:

 executors = {
      'default': ProcessPoolExecutor(3)
  }
  scheduler = BackgroundScheduler(executors=executors)

参考链接:定时任务APScheduler,随时可以保持数据同步

总结:
django-apscheduler使用起来十分方便。提供了基于日期、固定时间间隔以及crontab 类型的任务,我们可以在主程序的运行过程中快速增加新作业或删除旧作业,并且作业会存储在数据库中,当调度器重启时,不必重新添加作业,作业会恢复原状态继续执行。django-apscheduler可以当作一个跨平台的调度工具来使用,可以做为 linux 系统crontab 工具或 windows 计划任务程序的替换。需要注意的是,apscheduler不是一个守护进程或服务,它自身不带有任何命令行工具。它主要是要在现有的应用程序中运行。

对job的操作:

  • add_job():会返回一个apscheduler.job.Job的实例,可以用来改变或者移除job。
  • scheduler_job():只适应于应用运行期间不会改变的job

移除job:

  • remove_job():使用jobID移除
  • job.remove():使用add_job()返回实例

参考链接:Django高级特性:django-apscheduler定时任务

【以下内容未经验证,选择性参考】:摘自定时任务APScheduler,随时可以保持数据同步

  • 监控任务:使用django_apscheduler.jobstores提供的register_events监控任务:register_events(scheduler)

  • 程序运行(开启调度器):

scheduler.start()
  • 停止APScheduler运行(如果报错,调度器就立即停止执行):
scheduler.shutdown()

参考范例【未验证】01:

在任意一个app内的views.py中写好定时任务

from apscheduler.scheduler import Scheduler
from time import sleep


def task_Fun():
    '''
    这里写定时任务
    '''
    sleep(1)



sched = Scheduler()


@sched.interval_schedule(seconds=6)
def my_task1():
    print('定时任务1开始\n')
    task_Fun()
    print('定时任务1结束\n')

@sched.interval_schedule(hours=4)
def my_task2():
    print('定时任务2开始\n')
    sleep(1)
    print('定时任务2结束\n')


sched.start()

ok。启动django 项目,定时任务就会在你设定的时间执行了

参考链接:Django使用apscheduler完成定时任务

参考范例【未验证】02:

from django.utils import timezone
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
#from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
 
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}

#异步式的
scheduler = BackgroundScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults,
    timezone=timezone.get_current_timezone())
 
#阻塞的,适用于scheduler是独立服务的场景。
#scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=timezone.get_current_timezone())
#scheduler = BlockingScheduler(executors=executors, job_defaults=job_defaults, timezone=timezone.get_current_timezone())
 
def myjob():
    pass
 
try:
    scheduler.start()
    # 5s后执行myjob
    # 传入时间去除毫秒
    deadline = datetime.datetime.now().replace(microsecond=0) + datetime.timedelta(seconds=5)
    scheduler.add_job(myjob, 'date', run_date=deadline)
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()

参考链接:django apscheduler在特定时间执行一次任务(run at a specify time only once)

三、使用Celery插件实现定时任务

3.0、基本介绍

Celery分布式任务队列。侧重实时操作,可用于生产系统处理数以百万计的任务,都用于大型项目,配置和使用较为复杂。Django的分布式主要由Celery框架实现,这是python开发的分布式任务队列。由于它本身不支持消息存储服务,所以需要第三方消息服务来传递任务,一般使用Redis。

3.1、安装依赖
本例建立在认为你已经知道如何使用Celery实现异步任务的基础上,需要学习的请移步
Django使用Celery

本例使用redis作为Borker和backend

pip install -U "celery[redis]"

3.2、使用插件

修改settings.py 增加以下代码

....
# Celery配置
from kombu import Exchange, Queue
# 设置任务接受的类型,默认是{'json'}
CELERY_ACCEPT_CONTENT = ['application/json']
# 设置task任务序列列化为json
CELERY_TASK_SERIALIZER = 'json'
# 请任务接受后存储时的类型
CELERY_RESULT_SERIALIZER = 'json'
# 时间格式化为中国时间
CELERY_TIMEZONE = 'Asia/Shanghai'
# 是否使用UTC时间
CELERY_ENABLE_UTC = False
# 指定borker为redis 如果指定rabbitmq CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
# 指定存储结果的地方,支持使用rpc、数据库、redis等等,具体可参考文档 # CELERY_RESULT_BACKEND = 'db+mysql://scott:tiger@localhost/foo' # mysql 作为后端数据库
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
# 设置任务过期时间 默认是一天,为None或0 表示永不过期
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
# 设置worker并发数,默认是cpu核心数
# CELERYD_CONCURRENCY = 12
# 设置每个worker最大任务数
CELERYD_MAX_TASKS_PER_CHILD = 100
# 指定任务的位置
CELERY_IMPORTS = (
    'base.tasks',
)
# 使用beat启动Celery定时任务
# schedule时间的具体设定参考:https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html
CELERYBEAT_SCHEDULE = {
    'add-every-10-seconds': {
        'task': 'base.tasks.cheduler_task',
        'schedule': 10,
        'args': ('hello', )
    },
}
...

3.3、编写定时任务代码

在本例中是在apps/base/tasks.py中增加的定时任务

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time

@shared_task
def cheduler_task(name):
    print(name)
    print(time.strftime('%X'))

3.4、如何使用&运行

启动定时任务beat

celery -A dase_django_api beat -l info

启动Celery worker 用来执行定时任务

celery -A dase_django_api worker -l -l info 

注意: 这里的 dase_django_api 要换成你的Celery APP的名称

3.5、Celery插件的优缺点:

  • 优点:
    稳定可靠、管理方便
    高性能、支持分布式
    Celery侧重于实时操作,可用于生产系统每天处理数以百万计的任务,可用于大型项目。
    可在分布的机器、进程、线程上执行任务调度。
  • 缺点:
    实现复杂
    部署复杂
    非轻量级
    配置和使用较为复杂,需要Redis数据库和多个python第三方库。

四、自建代码实现定时任务

4.1、创建定时任务
在apps/base下创建一个文件名为schedules.py;键入一下内容

import os, sys, time, datetime
import threading
import django
base_apth = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# print(base_apth)
# 将项目路径加入到系统path中,这样在导入模型等模块时就不会报模块找不到了
sys.path.append(base_apth)
os.environ['DJANGO_SETTINGS_MODULE'] ='base_django_api.settings' # 注意:base_django_api 是我的模块名,你在使用时需要跟换为你的模块
django.setup()
from base.models import ConfDict

def confdict_handle():
    while True:
        try:
            loca_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print('本地时间:'+str(loca_time))
            time.sleep(10)
        except Exception as e:
            print('发生错误,错误信息为:', e)
            continue


def main():
    '''
    主函数,用于启动所有定时任务,因为当前定时任务是手动实现,因此可以自由发挥
    '''
    try:
        # 启动定时任务,多个任务时,使用多线程
        task1 = threading.Thread(target=confdict_handle)
        task1.start()
    except Exception as e:
        print('发生异常:%s' % str(e))

if __name__ == '__main__':
    main()

4.2、如何使用&运行

直接运行脚本即可

python apps/base/schedules.py

4.3、自建代码的优缺点:
优点:
自定义
高度自由
缺点:
过于简单
当任务过多时,占用资源也会增加

参考链接:Django用定时任务比较不同姿势,使用,的,多种,对比
Crontab和APScheduler
django 定时任务 django-crontab & APScheduler 使用

Python 定时任务(schedule, Apscheduler, celery, python-crontab)-爱代码爱编程

定时任务简介

定时任务, linux 自带的 crontab ,windows 自带的任务计划,都可以实现守时任务。没错,操作系统基本都会提供定时任务的实现,但是如果你想要更加精细化的控制,或者说任务程序需要跨平台运行,最好还是自己实现定时任务框架,Python 的 apscheduler 提供了非常丰富而且方便易用的定时任务接口。本文两种方式实现定时任务。可以直接参考目录三Django中使用django-apscheduler,目录二内容测试已通过,相关数据库配置暂时无空尝试。

APScheduler简介

APscheduler全称Advanced Python Scheduler :作用为在指定的时间规则执行指定的作业。

apscheduler的四大组件,分别是Triggers,Job stores,Executors,Schedulers

Scheduler添加job流程:

Django定时任务四种实现方法总结

Django定时任务四种实现方法总结

触发器(triggers):触发器包含调度逻辑,描述一个任务何时被触发,按日期或按时间间隔或按 cronjob 表达式三种方式触发。每个作业都有它自己的触发器,除了初始配置之外,触发器是完全无状态的。

执行器(executors):执行器是将指定的作业(调用函数)提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件。

作业(任务)存储器(job stores):作业存储器指定了作业被存放的位置,默认情况下作业保存在内存,也可将作业保存在各种数据库中,当作业被存放在数据库中时,它会被序列化,当被重新加载时会反序列化。作业存储器充当保存、加载、更新和查找作业的中间商。在调度器之间不能共享作业存储。

注意:一个任务储存器不要共享给多个调度器,否则会导致状态混乱

调度器(schedulers):任务调度器,属于控制角色,通过它配置作业存储器、执行器和触发器,添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行,通常只有一个调度程序运行在应用程序中,开发人员通常不需要直接处理作业存储器、执行器或触发器,配置作业存储器和执行器是通过调度器来完成的。

一个调度器由上方三个组件构成,一般来说,一个程序只要有一个调度器就可以了。开发者也不必直接操作任务储存器、执行器以及触发器,因为调度器提供了统一的接口,通过调度器就可以操作组件,比如任务的增删改查。

调度器工作流程】:

Django定时任务四种实现方法总结

调度器组件详解:

根据开发需求选择相应的组件,下面是不同的调度器组件:

  1. BlockingScheduler 阻塞式调度器:适用于只跑调度器的程序。

  2. BackgroundScheduler 后台调度器:适用于非阻塞的情况,调度器会在后台独立运行。

  3. AsyncIOScheduler AsyncIO调度器,适用于应用使用AsnycIO的情况。

  4. GeventScheduler Gevent调度器,适用于应用通过Gevent的情况。

  5. TornadoScheduler Tornado调度器,适用于构建Tornado应用。

  6. TwistedScheduler Twisted调度器,适用于构建Twisted应用。

  7. QtScheduler Qt调度器,适用于构建Qt应用。

(1)任务储存器的选择:

要看任务是否需要持久化。如果你运行的任务是无状态的,选择默认任务储存器MemoryJobStore就可以应付。但是,如果你需要在程序关闭或重启时,保存任务的状态,那么就要选择持久化的任务储存器。如果,作者推荐使用SQLAlchemyJobStore并搭配PostgreSQL作为后台数据库。这个方案可以提供强大的数据整合与保护功能。

(2)执行器的选择:

同样要看你的实际需求。默认的ThreadPoolExecutor线程池执行器方案可以满足大部分需求。如果,你的程序是计算密集型的,那么最好用ProcessPoolExecutor进程池执行器方案来充分利用多核算力。也可以将ProcessPoolExecutor作为第二执行器,混合使用两种不同的执行器。

配置一个任务,就要设置一个任务触发器。触发器可以设定任务运行的周期、次数和时间。

(3)APScheduler有三种内置的触发器:

一个任务也可以设定多种触发器,比如,可以设定同时满足所有触发器条件而触发,或者满足一项即触发。

触发器代码示例:

date 是最基本的一种调度,作业任务只会执行一次。它表示特定的时间点触发。它的参数如下:

【参数说明】

from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

def my_job(text):
    print(text)

# 在2019年4月15日执行
scheduler.add_job(my_job, 'date', run_date=date(2019, 4, 15), args=['测试任务'])

scheduler.start()

###########################################################################################
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

def my_job(text):
    print(text)
    
# datetime类型(用于精确时间)
scheduler.add_job(my_job, 'date', run_date=datetime(2019, 4, 15, 17, 30, 5), args=['测试任务'])

scheduler.start()

注意:run_date参数可以是date类型、datetime类型或文本类型。

scheduler.add_job(my_job, 'date', run_date='2009-11-06 16:30:05', args=['测试任务'])

【更多示例代码,参考链接】:python 定时任务APScheduler 使用介绍

【针对上述4个模块的详细介绍,参考博文】Python定时库APScheduler&Django使用django-apscheduler实现定时任务

有价值的实战项目参考

teprunner测试平台定时任务这次终于稳了

本文开发内容
作为测试平台而言,定时任务算是必备要素了,只有跑起来的自动化,才能算是真正的自动化。本文将给测试计划添加定时任务功能,具体如下:

  1. 前端添加测试计划的定时任务开关
  2. 采用crontab表达式设置计划时间
  3. 后端集成django-apschedule,在数据库中记录任务明细和执行详情。
  4. 定时清理执行记录。

前端效果图:
Django定时任务四种实现方法总结

Django – 定时任务模块设计与实践

页面静态化介绍

1. 页面静态化介绍、

1.为什么要做页面静态化

2.什么是页面静态化

2. 首页页面静态化实现

1.首页页面静态化实现步骤

2.首页页面静态化实现

import os
import time
from django.conf import settings
from django.template import loader
from apps.contents.models import ContentCategory
from apps.contents.utils import get_categories

def generate_static_index_html():
    """
    生成静态的主页html文件
    """
    print('%s: generate_static_index_html' % time.ctime())

    # 获取商品频道和分类
    categories = get_categories()

    # 广告内容
    contents = {}
    content_categories = ContentCategory.objects.all()
    for cat in content_categories:
        contents[cat.key] = cat.content_set.filter(status=True).order_by('sequence')

    # 渲染模板
    context = {
        'categories': categories,
        'contents': contents
    }

    # 获取首页模板文件
    template = loader.get_template('index.html')
    # 渲染首页html字符串
    html_text = template.render(context)
    # 将首页html字符串写入到指定目录,命名'index.html'
    file_path = os.path.join(settings.STATICFILES_DIRS[0], 'index.html')
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(html_text)

3. 定时任务crontab静态化首页

对于首页的静态化,考虑到页面的数据可能由多名运营人员维护,并且经常变动,所以将其做成定时任务,即定时执行静态化。

在Django执行定时任务,可以通过 django-crontab扩展来实现。

参考链接:定时器任务django-crontab的使用【静态化高频率页面,增加用户体验】【系统的定时器,独立于项目执行】【刘新宇】

APScheduler常见错误

报错No module named ‘apscheduler.scheduler’

BlockingScheduler算是会实行block阻塞程序运行(会阻塞主线程的运行)

APScheduler中两种调度器的区别及使用过程中要注意的问题

apscheduelr整体实现方式和扩展方式解读

参考链接:【Django】定时任务APScheduler

文章出处登录后可见!

已经登录?立即刷新

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
乘风的头像乘风管理团队
上一篇 2023年3月11日
下一篇 2023年3月11日

相关推荐