最新消息:XAMPP默认安装之后是很不安全的,我们只需要点击左方菜单的 "安全"选项,按照向导操作即可完成安全设置。

什么是自动化任务?Python7种定时任务实现方式

XAMPP案例 admin 51浏览 0评论

什么是自动化任务

自动化任务就是将一系列可以手动去实现的操作的集合,在特定的条件下自动去执行,比较常见的场景:
  • 运行监控报警,监测要素的变化,触发阈值条件时执行一系列通知邮件、短信的发送,也可以是更复杂的触发
  • 自动化田间灌溉控制,对土壤含水量、天气预报和植物生长周期分析需水量来调节灌溉水量
  • 自动驾驶,根据行驶路线、实时路况信息分析进行自动的驾驶
  • 自动化测试,对目标系统有已知的预期,按照配置的流程去验证每一个功能的结果
  • 定时爬虫,比如我们需要长期的爬取招聘信息,来进行数据分析,就需要定时的去爬取某段时间的增量信息
  • 还有些统计类问题,比如我们需要对请求的IP进行判断是否是一个坏的IP(同一个IP下有一定比例的用户从事违规操作时,判定为非法IP),我们可以在用户请求是实时去计算坏IP的列表,但当用户量非常大的时候就不是一个很好的选择了,我们可以选择定期执行这个坏IP列表的计算任务,因为对于不同用户的坏IP列表是同一个
以上这些任务都可以认为是自动化任务,我们今天主要针对其中的定时任务进行讨论。

定时任务

定时任务的实现方式又有N多种,我就了解的几种实现举例说明:
  • Windows操作系统的任务计划程序
  • Linux操作系统的 Crontab 定时任务
  • Python的 循环+sleep
  • Python的 Timer
  • Python的 schedule
  • Python的 APScheduler
  • Python的 Celery
我以两个场景来设计实例代码
  • 每30秒打印一下当前时间
  • 在2020-06-21 00:00:00 (具体时刻)打印一下当前时间

1. Windows任务计划程序

当前版本win10
  • 右键“计算机”->选择“管理”;
  • 然后就可以打开 “计算机管理”界面;
  • 在界面的左侧有点击“系统工具”->任务计划程序 ;
  • 可以看到右侧有“操作”“任务计划程序”;
    有创建基本任务、创建任务、导入任务;
AAA000080
我以两个场景来设计实例代码
  • 每30秒打印一下当前时间
  • 在2020-06-21 00:00:00 (具体时刻)打印一下当前时间
任务1:由于任务计划程序最小执行周期是“5分钟”,是在高级设置中设置重复任务间隔,并选择持续时间“无限制” 无法实现30秒的打印
任务2:设置中选择一次,开始时间设置2020-06-21 00:00:00 (具体时刻)可以满足需求

2. Linux Crontab定时任务

Crontab定时任务,在每个任务周期中执行一次特定任务,固无法控制执行次数
时间格式如下:
*    *    *    *    *  program
-    -    -    -    -    -
|    |    |    |    |    |----- 要执行的程序
|    |    |    |    +----- 星期中星期几 (0 - 7) (星期天 为0) 默认*
|    |    |    +---------- 月份 (1 - 12) 默认*
|    |    +--------------- 一个月中的第几天 (1 - 31) 默认*
|    +-------------------- 小时 (0 - 23) 默认*
+------------------------- 分钟 (0 - 59) 默认*
我以两个场景来设计实例代码
  • 每30秒打印一下当前时间;
  • 在2020-06-21 00:00:00 (具体时刻)打印一下当前时间 ;
任务1:* * * * * echo $(date +%F%n%T)含义每分钟打印当前时间,最小执行周期每分钟 无法实现30秒的打印
任务2:* * 21 6 * echo $(date +%F%n%T)含义每年的6月21日打印当前时间 无法实现只执行一次的打印

3. Python 循环+sleep

以下代码使用的 python3.7

我以两个场景来设计实例代码
  • 每30秒打印一下当前时间;
  • 在2020-06-21 00:00:00 (具体时刻)打印一下当前时间 ;
任务1: 可以实现30秒的打印
借助循环和等待时间来实现任务执行,但是在一个线程组阻塞时不能进行其他操作,可以使用协程优化
import time
import datetime# 每30秒执行
while 1:
time.sleep(30)
print(datetime.datetime.now())
任务2: 可以实现特定时间的打印
每次循环判断当前时刻和目标是否一致,需要注意执行一次就需要跳出点前时刻的判断,不然在同一时刻会重复打印很多次
import time
import datetim# 特定时间执行
while 1:
now = datetime.datetime.now()
if now.year == 2020 and now.month == 6 and now.day == 21:
print(now)
break
time.sleep(1)

4. 线程Timer

以下代码使用的 python3.7

看下源码的描述
class Timer(Thread):
“”"Call a function after a specified number of seconds:t = Timer(30.0, f, args=None, kwargs=None)
t.start()
t.cancel() # stop the timer’s action if it’s still waiting”"”
在指定的秒数后调用函数
因为创建的定时是异步执行,所以不存在等待顺序执行问题。
  • 创建定时器 Timer(interval, function, args=None, kwargs=None);
  • 取消定时器cancel();
  • 使用线程方式执行start();
  • 等待线程执行结束join(self, timeout=None);
  • 定时器只能执行一次,如果需要重复执行,需要重新添加任务;
我以两个场景来设计实例代码
  • 每30秒打印一下当前时间
  • 在2020-06-21 00:00:00 (具体时刻)打印一下当前时间
任务1: 可以实现30秒的打印
虽然是可以实现每30秒一次的打印,但是定时器只能执行一次,如果需要重复添加任务,创建线程又比较浪费系统资源
from threading import Timer
import datetime
import timedef run_time(seconds, flag=False):
t = Timer(seconds, run_time, [seconds, True])
t.start()
if flag:
print(datetime.datetime.now())
time.sleep(3)
t.join()run_time(30)
任务2: 实现特定时间的打印
需要计算当前时间到目标时间间的秒数来实现
from threading import Timer
import datetime
import timedef run_time(seconds, flag=False):
if seconds > 0:
t = Timer(seconds, run_time, [seconds, True])
t.start()
if flag:
print(datetime.datetime.now())
time.sleep(3)
t.join()def get_seconds(run_date):
start = datetime.datetime.now()
end = datetime.datetime.strptime(run_date, “%Y-%m-%d %H:%M:%S”)
if end > start:
return (end-start).seconds
else:
return -1seconds = get_seconds(“2020-06-21 00:00:00″)
run_time(seconds)

5. schedule模块

以下代码使用的 python3.7

首先看下源码的描述
Python job scheduling for humans.github.com/dbader/scheduleAn in-process scheduler for periodic jobs that uses the builder pattern
for configuration. Schedule lets you run Python functions (or any other
callable) periodically at pre-determined intervals using a simple,
human-friendly syntax.Inspired by Addam Wiggins’ article “Rethinking Cron” [1] and the
“clockwork” Ruby module [2][3].

Features:
- A simple to use API for scheduling jobs.
- Very lightweight and no external dependencies.
- Excellent test coverage.
- Tested on Python 2.7, 3.5 and 3.6

Usage:
>>> import schedule
>>> import time

>>> def job(message=’stuff’):
>>> print(“I’m working on:”, message)

>>> schedule.every(10).minutes.do(job)
>>> schedule.every(5).to(10).days.do(job)
>>> schedule.every().hour.do(job, message=’things’)
>>> schedule.every().day.at(“10:30″).do(job)

>>> while True:
>>> schedule.run_pending()
>>> time.sleep(1)

任务1: 可以实现30秒的打印
可以实现每30秒一次的打印,还可以同时创建多个定时任务同时去执行,由于执行过程是顺序执行,pn休眠2秒,循环任务查询休眠1秒,会存在实际间隔时间并不是设定的30秒,当工作任务回非常耗时就会影响其他任务的触发时间
import datetime
import schedule
import timedef pn():
print(datetime.datetime.now())
time.sleep(2)schedule.clear()
schedule.every(30).seconds.do(pn)while 1:
schedule.run_pending()
time.sleep(1)

任务2: 实现特定时间的打印
需要计算当前时间到目标时间间的秒数来实现
import datetime
import schedule
import timedef pn():
print(datetime.datetime.now())
time.sleep(2)def get_seconds(run_date):
start = datetime.datetime.now()
end = datetime.datetime.strptime(run_date, “%Y-%m-%d %H:%M:%S”)
if end > start:
return (end – start).seconds
else:
return -1schedule.clear()
seconds = get_seconds(“2020-06-21 00:00:00″)
if seconds > 0:
schedule.every(seconds).seconds.do(pn)

while 1:
schedule.run_pending()
time.sleep(1)

6. APScheduler定时任务框架

以下代码使用的 python3.7

APScheduler是Python的一个定时任务框架,用于执行周期或者定时任务,
可以基于日期、时间间隔,及类似于Linux上的定时任务crontab类型的定时任务;
该该框架不仅可以添加、删除定时任务,还可以将任务存储到数据库中,实现任务的持久化,使用起来非常方便。
  • triggers(触发器):触发器包含调度逻辑,每一个作业有它自己的触发器;
  • job stores(作业存储):用来存储被调度的作业,默认的作业存储器是简单地把作业任务保存在内存中,支持存储到MongoDB,Redis数据库中;
  • executors(执行器):执行器用来执行定时任务,只是将需要执行的任务放在新的线程或者线程池中运行;
  • schedulers(调度器):调度器是将其它部分联系在一起,对使用者提供接口,进行任务添加,设置,删除。
    add_job 创建任务
创建任务支持三种类型触发器dateintervalcron
date触发器
  • run_date 具体的日期执行,时间参数run_date,可以是dete/time格式的字符串,当不写时分秒是 默认是 00:00:00
interval触发器
  • weeks 参数weeks 每n周后执行,int类型
  • days 参数days每n天后执行,int类型
  • hours: 参数hours每n小时后执行,int类型
  • minutes:参数minutes每n分钟后执行,int类型
  • seconds: 参数seconds每n秒后执行,int类型
  • start_date 参数start_date,可以是dete/time格式的字符串,控制执行循环的时间范围
  • end_date 参数 end_date,可以是dete/time格式的字符串,控制执行循环的时间范围
cron触发器
使用方式与 Crontab定时任务类似
  • year int类型 取值范围1970~9999, 默认*
  • month int类型 取值范围1~12, 默认1
  • day int类型 取值范围1~31, 默认1
  • week 一年中的第几周,int类型 取值范围1~53, 默认*
  • day_of_week一周中的第几天,int类型 取值范围0~6, 默认*
  • hour int类型 取值范围0~23, 默认0
  • minute int类型 取值范围0~59, 默认0
  • second int类型 取值范围0~59, 默认0
任务1: 可以实现30s的打印
使用interval类型触发器添加定时任务
可以实现每30秒一次的打印,还可以同时创建多个定时任务同时去执行,由于执行过程是顺序执行,pn休眠3秒,会存在实际间隔时间并不是设定的30秒,当工作任务回非常耗时就会影响其他任务的触发时间
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import timedef pn():
print(datetime.datetime.now())
time.sleep(3)def run_time(seconds):
scheduler = BlockingScheduler()
scheduler.add_job(pn, ‘interval’, seconds=seconds, id=str(time.time()))
scheduler.start()run_time(30)

任务2: 可以实现特定时间的打印
使用date类型或cron类型触发器添加定时任务
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import timedef pn():
print(datetime.datetime.now())
time.sleep(3)def run_time():
scheduler = BlockingScheduler()
scheduler.add_job(pn, ‘cron’, year=2020, month=6, day=21, id=str(time.time()))
scheduler.add_job(pn, ‘date’, run_date=”2020-06-21″, id=str(time.time()))
scheduler.start()run_time()

7. Celery分布式任务调度的框架

Celery是实时处理的异步任务调度的框架,需要借助中间件完成调度任务,也可以很好的用来做定时任务服务。
AAA0000080
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,由三部分组成:
  • 消息中间件(message broker):Celery本身不提供消息服务,依赖于中间件RabbitMQ, Redis等
  • 任务执行单元(worker): 是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中
  • 任务执行结果存储(task result store): 用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis等
以下代码使用版本 python2.7 Celery 3.2

目录结构

AAA79

celery_app_init_.py
# -*- coding: utf-8 -*-
# 拒绝隐式引入,如果celery.py和celery模块名字一样,避免冲突,需要加上这条语句
# 该代码中,名字是不一样的,最好也要不一样
from __future__ import absolute_import
from celery import Celeryapp = Celery(‘tasks’)
app.config_from_object(‘celery_app.celeryconfig’)
celery_app\celeryconfig.py
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from celery.schedules import crontab
from datetime import timedelta# 使用redis存储任务队列
BROKER_URL = ‘redis://127.0.0.1:6379/1′
# 使用redis存储结果
CELERY_RESULT_BACKEND = ‘redis://127.0.0.1:6379/2′# 时区设置
CELERY_TIMEZONE = ‘Asia/Shanghai’# 导入任务所在文件
CELERY_IMPORTS = [
'celery_app.celery_task.tasks',
]

# 需要执行任务的配置
CELERYBEAT_SCHEDULE = {
‘np-seconds-10′: {
# 具体需要执行的函数
# 该函数必须要使用@app.task装饰
‘task’: ‘celery_app.celery_task.tasks.pn’,
# 定时时间
# 每30秒分钟执行一次
‘schedule’: timedelta(seconds=30),
‘args’: ()
},
‘np-month-day’: {
‘task’: ‘celery_app.celery_task.tasks.pn’,
# 每年6月21日执行一次
‘schedule’: crontab(day_of_month=21, month_of_year=6),
‘args’: ()
},
}

celery_app\celery_task\tasks.py
# -*- coding: utf-8 -*-
from .. import app
import datetime
import time@app.task
def pn():
print(datetime.datetime.now())
time.sleep(3)
我以两个场景来设计实例代码
  • 每30秒打印一下当前时间
  • 在2020-06-21 00:00:00 (具体时刻)打印一下当前时间
任务1: 可以实现30秒的打印
‘np-seconds-10′: {
# 具体需要执行的函数
# 该函数必须要使用@app.task装饰
‘task’: ‘celery_app.celery_task.tasks.pn’,
# 定时时间
# 每30秒分钟执行一次
‘schedule’: timedelta(seconds=30),
‘args’: ()
},
任务2: 不可以实现指定时间的打印
一般指定时间也没有意义,每年的某个月日的任务比较多
使用方式与 Crontab定时任务类似
‘np-month-day’: {
‘task’: ‘celery_app.celery_task.tasks.pn’,
# 每年6月21日执行一次
‘schedule’: crontab(day_of_month=21, month_of_year=6),
‘args’: ()
},

启动定时任务

由于是分布式服务,我们需要启动 中间件,触发器和处理模块
启动Redis

AAA079

celery_app 统计目录下执行
命令行执行命令来启动 发布任务
celery -A celery_app beat

AAA0079

命令行执行命令来启动 执行任务
celery worker -A celery_app -l info

AAA00079

AAA000079

总结

简单总结上面七种定时任务实现:
  • Windows任务计划程序合适重启服务,定时执行bat脚本,清理日志等操作;
  • Linux Crontab定时任务适合执行shell脚本或者一系列Linux命令,大部分任务都可以使用,但对于部分项目需要在用户界面添加任务的情况支持不友善;
  • 循环+sleep方式适合简单测试;
  • Timer可以实现定时任务,但是对定点任务来说,需要检查当前时间点;
  • schedule可以定点定时执行,但是需要在循环中检测任务,而且存在阻塞,执行时间不能预期;
  • APScheduler框架功能相对完善,可以满足大多数定时任务;
  • Celery框架分布式调度,可与Django、Flask等Web框架完美结合使用,对于大任务量的服务支持比较好。

转载请注明:XAMPP中文组官网 » 什么是自动化任务?Python7种定时任务实现方式