摘要.
# 简介
Celery 是一个由 Python 实现的分布式任务队列,任务队列通常有 3 个方面的功能。
- 1. 减缓高并发压力,先将任务写入队列,有空余资源再运行
- 2. 执行定时任务,先将任务写入队列,指定时间下再执行
- 3. 异步任务,web 中存在耗时任务可以先将其写入队列,然后后台任务进程去执行
已经有很多文章来描述 Celery 的用法与简单原理,本篇文章也会简单提及,但不会费太多笔墨。
本篇重点在于,利用 Python 动手实现一个简单的 Celery,并使用自己实现的 Celery 实现异步任务,与上一篇「Python Web:Flask 异步执行任务」一样,通过 Flask 构建一个简单的 web,然后执行耗时任务,希望前端可以通过进度条显示任务的进度。
需注意,这里不会去解读 Celery 的源码,其源码具有很多工程细节,比较复杂,这里只是从其本质出发,简单的实现一个玩具 Celery,这个玩具 Celery 在稳定性、效率等方面当然不能与 Celery 相比,但可以很好的理解 Celery 大体是怎么实现的。
本文讲究的是「形离神合」,与 Celery 实现细节不同,但本质原理相同。
那我们开始吧!
# Celery 的概念与原理
Celery 5 个关键的概念,弄明白,就大致理解 Celery 了。
- 1.Task(任务) 简单而言就是你要做的事情,如用户注册流程中的发送邮件
- 2.Worker(工作者) 在后台处理 Task 的人
- 3.Broker(经纪人) 本质是一种队列,Task 会交给 Broker ,Worker 会从 Broker 中取 Task ,并处理
- 4.Beat 定时任务调度器,根据定的时间,向 Broker 中添加数据,然后等待 Worker 去处理
- 5.Backend 用于保存 Worker 执行结果的对象,每个 Task 都要有返回值,这些返回值,就在 Backend 中
这里我们抛开这里的各种概念,从更本质的角度来看 Celery,发现它就一个任务序列化存储与反序列化获取的过程。
以 Web 异步任务为例,使用方式通常为:
- 1. 有一个要长时间处理 I/O 的函数,如果不将其异步执行就会产生的阻塞,这通常是不被允许的
- 2. 启动一个后台任务执行进程
- 3. 当要执行耗时函数时,不会立刻同步运行,而是提取函数的关键数据,将其序列化存储到队列中,队列可以使用 Redis 或其他方式实现
- 4. 后台任务执行进程会从队列中获取数据,并将其反序列化还原
- 5. 后台任务执行进程会使用原来的函数以及还原的数据完成函数的执行,从而实现异步执行的效果。
流程并不复杂,Celery 中不同的概念分别负责上面流程中的不同部分。
# 实现一个简单的 Celery
接着我们来实现一个 Celery,这里 Celery 选择 Redis 作为后端。
先来整理一个大体的框架。
首先我们需要定义一个 Task 类来表示要执行的任务,不同的任务要执行的具体逻辑由使用者自身编写。
接着要定义一个任务队列,即 Celery 中的 Broker,用于存储要执行的任务。
随后要定义执行进程 Worker,Worker 要从 Broker 中获取任务并去执行。
最后还需要定义一个用于存储任务返回数据的类,即 Celery 中的 Backend。
看上去有点复杂,不慌,其实很简单。
# 实现任务类
首先来实现 task.py,用于定义任务相关的一些逻辑
import abc
import json
import uuid
import traceback
import pickle
from broker import Broker
from backend import Backend
class BaseTask(abc.ABC):
"""
Example Usage:
class AdderTask(BaseTask):
task_name = "AdderTask"
def run(self, a, b):
result = a + b
return result
adder = AdderTask()
adder.delay(9, 34)
"""
task_name = None
def __init__(self):
if not self.task_name:
raise ValueError("task_name should be set")
self.broker = Broker()
@abc.abstractmethod # abstractmethod 派生类必须重写实现逻辑
def run(self, *args, **kwargs):
raise NotImplementedError("Task `run` method must be implemented.")
def update_state(self, task_id, state, meta={}):
_task = {"state": state, "meta": meta}
serialized_task = json.dumps(_task)
backend = Backend()
backend.enqueue(queue_name=task_id, item=serialized_task)
print(f"task info: {task_id} succesfully queued")
def delay(self, *args, **kwargs):
try:
self.task_id = str(uuid.uuid4())
_task = {"task_id": self.task_id, "args": args, "kwargs": kwargs}
serialized_task = json.dumps(_task)
self.broker.enqueue(queue_name=self.task_name, item=serialized_task)
print(f"task: {self.task_id} succesfully queued")
except Exception:
raise Exception("Unable to publish task to the broker.")
return self.task_id
def async_result(task_id):
backend = Backend()
_dequeued_item = backend.dequeue(queue_name=task_id)
dequeued_item = json.loads(_dequeued_item)
state = dequeued_item["state"]
meta = dequeued_item["meta"]
class Info():
def __init__(self, state, meta):
self.state = state
self.meta = meta
info = Info(state, meta)
return info
上述代码中,定义了 BaseTask 类,它继承自 python 的 abc.ABC 成为一个抽象基类,其中一开始便要求必须定义 task_name,这是因为后面我们需要通过 task_name 去找对应的任务队列。
BaseTask 类的 run() 方法被 abc.abstractmethod 装饰,该装饰器会要求 BaseTask 的派生类必须重写 run() 方法,这是为了让使用者可以自定义自己的任务逻辑。
BaseTask 类的 update_state() 方法用于更新任务的状态,其逻辑很简单,先将参数进行 JSON 序列化,然后调用 Backend 的 enqueue() 方法将数据存入,这里的 Backend 其实是 Redis 实例,enqueue() 方法会将数据写入 Redis 的 list 中,需要注意,这里 list 的 key 为 task_id,即当前任务的 id。
BaseTask 类的 delay() 方法用于异步执行任务,首先通过 uuid 为任务创建一个唯一 id,然后将方法的参数通过 JSON 序列化,然后调用 Broker 的 enqueue() 将数据存入,这里的 Broker 其实也是一个 Redis 实例,enqueue() 方法同样是将数据写入到 Redis 的 list 中,只是 list 的 key 为 task_name,即当前任务的名称。
此外还实现了 async_result() 方法,该方法用于异步获取任务的数据,通过该方法可以获得任务的执行结果,或任务执行中的各种数据,数据的结构是有简单约定的,必须要有 state 表示当然任务的状态,必须要有 meta 表示当前任务的一些信息。
# 实现 Broker 与 Backend
在 task.py 中使用了 Broker 与 Backend,那接着就来实现一下这两个,先实现 Broker。
import redis
class Broker:
"""
use redis as our broker.
This implements a basic FIFO queue using redis.
"""
def __init__(self):
host = "localhost"
port = 6379
password = None
self.redis_instance = redis.StrictRedis(
host=host, port=port, password=password, db=0, socket_timeout=8.0
)
def enqueue(self, item, queue_name):
self.redis_instance.lpush(queue_name, item)
def dequeue(self, queue_name):
dequed_item = self.redis_instance.brpop(queue_name, timeout=3)
if not dequed_item:
return None
dequed_item = dequed_item[1]
return dequed_item
没什么可讲的,就是定了两个方法用于数据的存储与读取,存储使用 lpush 方法,它会将数据从左边插入到 Redis 的 list 中,读取数据使用 brpop 方法,它会从 list 的右边取出第一个元素,返回该元素值并从 list 删除,左进右出就构成了一个队列。
为了简便,Backend 的代码与 Broker 一模一样,只是用来存储任务的信息而已,代码就不贴了。
# 后台任务执行进程 Worker
接着来实现后台任务执行进程 Worker
import json
class Worker:
"""
Example Usage:
task = AdderTask()
worker = Worker(task=task)
worker.start()
"""
def __init__(self, task) -> None:
self.task = task
def start(self,):
while True:
try:
_dequeued_item = self.task.broker.dequeue(queue_name=self.task.task_name)
dequeued_item = json.loads(_dequeued_item)
task_id = dequeued_item["task_id"]
task_args = dequeued_item["args"]
task_kwargs = dequeued_item["kwargs"]
task_kwargs['task_id'] = task_id
self.task.run(*task_args, **task_kwargs)
print("succesful run of task: {0}".format(task_id))
except Exception:
print("Unable to execute task.")
continue
上述代码中,定义了 Worker 类,Worker 类在初始化时需要指定具体的任务实例,然后从 broker 中获取任务相关的数据,接着调用其中的 run() 方法完成任务的执行,比较简单。
# 使用玩具 Celery
玩具 Celery 的关键结构都定义好了,接着就来使用一下它,这里依旧会使用「Python Web:Flask 异步执行任务」文章中的部分代码,如前端代码,这里也不再讨论其前端代码,没有阅读可以先阅读一下,方便理解下面的内容。
首先定义出一个耗时任务
class LongTask(BaseTask):
task_name = "LongTask"
def run(self, task_id):
"""Background task that runs a long function with progress reports."""
verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
message = ''
total = random.randint(10, 50)
for i in range(total):
if not message or random.random() < 0.25:
message = '{0} {1} {2}...'.format(random.choice(verb),
random.choice(adjective),
random.choice(noun))
self.update_state(task_id=task_id, state='PROGRESS',
meta={'current': i, 'total': total,
'status': message})
time.sleep(1)
self.update_state(task_id=task_id, state='FINISH', meta={'current':100, 'total': 100,'status': 'Task completed!', 'result':32})
return
每个耗时任务都要继承在 BaseTask 并且重写其 run() 方法,run() 方法中的逻辑就是当前这个耗时任务要处理的具体逻辑。
这里逻辑其实很简单,就是随机的从几个列表中抽取词汇而已。
在 for 迭代中,想要前端知道当前任务 for 迭代的具体情况,就需要将相应的数据通过 BaseTask 的 update_state() 方法将其更新到 backend 中,使用 task_id 作为 Redis 中 list 的 key。
当逻辑全部执行完后,将状态为 FINISH 的信息存入 backend 中。
写一个接口来触发这个耗时任务
@app.route('/longtask', methods=['POST'])
def longtask():
long_task = LongTask()
task_id = long_task.delay()
return jsonify({}), 202, {'Location': url_for('taskstatus',
task_id=task_id)}
逻辑非常简单,实例化 LongTask(),并调用其中的 delay() 方法,该方法会将当前任务存入认为队列中,当前的请求会将当前任务的 task_id 通过响应包头的中的 taskstatus 字段传递给前端。
前端获取到后,就可以通过 task_id 去获取当前任务执行状态等信息,从而实现前端的可视化。
接着定义相应的接口来获取当前任务的信息,调用用 async_result() 方法,将 task_id 传入则可。
# app.py
@app.route('/status/<task_id>')
def taskstatus(task_id):
info = async_result(task_id)
print(info)
if info.state == 'PENDING':
response = {
'state': info.state,
'current': 0,
'total': 1,
'status': 'Pending...'
}
elif info.state != 'FAILURE':
response = {
'state': info.state,
'current': info.meta.get('current', 0),
'total': info.meta.get('total', 1),
'status': info.meta.get('status', '')
}
if 'result' in info.meta:
response['result'] = info.meta['result']
else:
# something went wrong in the background job
response = {
'state': info.state,
'current': 1,
'total': 1,
'status': str(info.meta), # this is the exception raised
}
return jsonify(response)
最后,需要定义一个启动后台任务执行进程的逻辑
from worker import Worker
from app import LongTask
if __name__ == "__main__":
long_task = LongTask()
worker = Worker(task=long_task)
worker.start()
至此,整体结构就构建完了,使用一下。
首先运行 redis。
redis-server
然后运行 Flask。
python app.py
最后启动一下后台任务执行进程,它相当于 Celery 的celery -A xxx worker --loglevel=info
命令。
python run_worker.py
同时执行多个任务,效果如下
对应的一些打印如下:
python run_worker.py
Unable to execute task.
Unable to execute task.
Unable to execute task.
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
python app.py
* Serving Flask app "app" (lazy loading)
* Environment: production
WARNING: Do not use the development server in a production environment.
Use a production WSGI server instead.
* Debug mode: on
* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
* Restarting with stat
* Debugger is active!
* Debugger PIN: 145-285-706
127.0.0.1 - - [25/Sep/2019 11:14:07] "GET / HTTP/1.1" 200 -
task: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
127.0.0.1 - - [25/Sep/2019 11:14:11] "POST /longtask HTTP/1.1" 202 -
<task.async_result.<locals>.Info object at 0x107f50780>
127.0.0.1 - - [25/Sep/2019 11:14:11] "GET /status/3c7cd8ac-7482-467b-b17c-dba2649b70ee HTTP/1.1" 200 -
<task.async_result.<locals>.Info object at 0x107f50a20>
127.0.0.1 - - [25/Sep/2019 11:14:13] "GET /status/3c7cd8ac-7482-467b-b17c-dba2649b70ee HTTP/1.1" 200 -
# 尾
需要注意一些,上面的代码中,使用 Worker 需要实例化具体的任务,此时任务实例与 app.py 中通过接口创建的任务实例是不同的,Worker 利用不同的实例,使用相同的参数,从而实现执行效果相同的目的。
代码已上传 Githu:github.com/ayuLiao/toy…
如果你觉得文章有帮助,请按一下右下角的「在看」小星星,那是可以按的,谢谢。 https://juejin.cn/post/6844903957312045064 https://juejin.cn/post/6844903957312045064