Celery Next Step

Celery Next Steps

在应用中使用Celery

项目布局

(.venv) <HOME_DIR>/code/distr_q
.
├── __init__.py
└── celery_next
├── __init__.py
├── app.py
├── client.py
└── tasks.py

app.py

worker app任务执行端, 指定需要运行的 tasks 模块, 指明 broker/backend

from celery import Celery
app = Celery(
'tasks', # app 名字
broker='amqp://admin:admin@localhost/cc',
backend='rpc://admin:admin@localhost/cc',
include=['distr_q.celery_next.tasks'] # tasks 模块文件
)
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()

tasks.py

任务模块内容, 具体任务

import os
import sys
app_root = '/'.join(os.path.abspath(__file__).split('/')[:-3])
sys.path.append(app_root)
from distr_q.celery_next.app import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)

client.py

任务发起端

import os
import sys
app_root = '/'.join(os.path.abspath(__file__).split('/')[:-3])
sys.path.append(app_root)
from distr_q.celery_next import tasks
if __name__ == '__main__':
tasks.add.apply_async((2, 2), queue='lopri', countdown=10)

启动 worker

# 不指定queue, 默认指向 queue-> celery
celery -A celery_next.app worker -l info
# 指定queue lopri
celery -A celery_next.app worker -l info -Q lopri

停止 worker

Control-c

后台运行

主要依据celery multi命令来在后台启动一个或者多个workers

# celery multi <CMD> w1 -A celery_next.app worker -l info
# CMD in [start, restart, stop, stopwait]
celery multi start w1 -A celery_next.app worker -l info
  • stop: 是异步执行的, 并不等待worker关闭, 会直接退出
  • stopwait: 同步等待所有执行结束才退出

注意

celery multi doesn’t store information about workers so you need to use the same command-line arguments when restarting. Only the same pidfile and logfile arguments must be used when stopping.

celery不会存储workers信息, 所以在重启的时候,你必须提供相同的参数. 在停止的时候, 只有 pidfile和logfile参数必须得提供.

默认情况下会在当前目录创建pid,log文件, 为了防止多个workers相互覆盖, 推荐指定独立的目录

mkdir -p /var/run/celery
mkdir -p /var/log/celery
celery multi start w1 -A proj -l info \
--pidfile=/var/run/celery/%n.pid \
--logfile=/var/log/celery/%n%I.log

可以在一条命令中指定多个 workers

celery multi start 10 -A proj -l info\
-Q:1-3 images,video \
-Q:4,5 data \
-Q default -L:4,5 debug

--app参数

--app指定了使用哪一个Celery app实例, 格式为 module.path:attribute

当前也同样支持只指定 package name, 会以如下顺序进行识别

参数 —-app=proj

  • 查找 proj.app
  • 查找 proj.celery
  • 查找 模块proj 中值为Celery Application的属性

如果都没有, 会向子模块 proj.celery 中查找

  • proj.celery.app
  • proj.celery.celery
  • 任意值为Celery application的属性

调用任务

delay

apply_async的简写

add.delay(2, 2)
# 等同于
add.apply_async((2, 2))

apply_async

apply_async支持提供执行参数, 例如: 倒计时(countdown), 接受任务的队列queue

# 发送任务到 teenager, 并于10s后运行
add.apply_async((2, 2), queue='teenager', countdown=10)

如果直接运行会在当前进程运行, 并不会发送消息到worker

>>> add(2, 2)
4

作为celery的调用api, delay(), apply_async() 和 applying(__call__)同样也用于signature

delay, apply_async会返回AsyncResult实例, 依据此来查看执行结果或者异常原因.

由于没有一个后台backend可以适用任何情况, 因此你需要手动指定一个后台, 如果你配置好了后台, 你可以获取任务的返回信息

>>> res = add.delay(2, 2)
# 查看结果返回
>>> res.get(timeout=1)
4
# 查看 id
>>> res.id
# 查看错误信息
>>> res = add.delay(2)
>>> res.get(timeout=1)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/devel/celery/celery/result.py", line 113, in get
interval=interval)
File "/opt/devel/celery/celery/backends/rpc.py", line 138, in wait_for
raise meta['result']
TypeError: add() takes exactly 2 arguments (1 given)
# 同样你可以禁止错误抛出, 这样只获取错误信息, 而不中断程序运行
>>> res.get(propagate=False)
TypeError('add() takes exactly 2 arguments (1 given)',)
# 查看是否运行成功
>>> res.failed()
True
>>> res.successful()
False
# 只查看运行状态
>>> res.state
'FAILURE'

任务有多个状态, 但同一时刻只有一个状态, 典型状态变化如下:

PENDING -> STARTED -> SUCCESS

STARTED状态只有在配置了task_track_started或者使用@task(track_started=True)时, 才会被记录

PENDING状态并非是记录的状态, 而是任何一个未知id的默认状态.

>>> from proj.celery import app
>>> res = app.AsyncResult('this-id-does-not-exist')
>>> res.state
'PENDING'

Canvas: 定义工作流

通常情况下, 可以使用delay方法来执行任务. 不过如果你需要把任务签名传递给其他进程或者作为参数传递给其他函数, 那么你就需要使用signatures签名方法

signature可以简单理解为python偏函数

>>> add.signature((2, 2), countdown=10)
tasks.add(2, 2)
# 简便写法
>>> add.s(2, 2)
tasks.add(2, 2)

调用API

signature同样支持调用api

# 完全参数
>>> s1 = add.s(2, 2)
>>> res = s1.delay()
>>> res.get()
4
# 部分参数, 类似于python的偏函数
>>> s2 = add.s(2)
>>> res = s2.delay(1)
>>> res.get()
3
# 已设定的参数, 可以在调用时覆盖原有参数
>>> s3 = add.s(2, 2, debug=True)
>>> s3.delay(debug=False)

原语

group
chain
chord
map
starmap
chunks

Groups

group可以并发调用多个任务, 并按照任务顺序返回每个任务的值.

>>> from celery import group
>>> from proj.tasks import add
>>> group(add.s(i, i) for i in range(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
# 偏函数
>>> g = group(add.s(i) for in range(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

Chains

可以使用chain来将多个功能链接

>>> from celery import chain
>>> from proj.tasks import add, mul
# (4 + 4) * 8, 可以使用管道操作符 |
>>> chain(add.s(4, 4) | mul.s(8))().get()
64
# 偏函数 (? + 4) * 8
>>> g = chain(add.s(4) | mul.s(8))
>>> g(4).get()
64
# 不使用 chain 关键字
>>> (add.s(4, 4) | mul.s(8))().get()
64

Chords

翻译 <直: 弦>,<意: 协作回调>

>>> from celery import chord
>>> from proj.tasks import add, xsum
>>> chord((add.s(i, i) for i in range(10)), xsum.s())().get()
90

链接到其他任务的组group会自动转换成chord

>>> (group(add.s(i, i) for in range(10)) | xsum.s())().get()
90

所有的原语都可以任意组合

# 下载照片, 并过滤符合条件的结果
download.s(picture_urls) | group(apply_filter.s() for filter in filters)

路由

Celery支持所有的AMQP路由功能, 同样也可以使用简单的路由来把消息发送到指定的队列中

配置项task_routes, 让你可以在同一位置使用名字来实现路由任务

app.conf.update(
task_routes = {
'proj.tasks.add': {'queue': 'hipri'},
},
)

当然你可以在运行时, 覆盖掉配置选项

>>> from proj.tasks import add
>>> add.apply_async((2, 2), queue='hipri')

通过指定queue名字来消费指定队列 celery worker -Q <queue-name>

# 消费单个队列
$ celery -A proj worker -Q hipri
# 同时消费多个队列, 使用,分隔即可, 与顺序无关, 会自动调整权重
# celery 是默认的队列名
$ celery -A proj worker -Q hipri,celery

远程控制

如果使用了 RabbitMQ, Redis, Qpid 作为broker, 就可以控制和监测worker

查看workers的当前任务

$ celery -A proj inspect active

这是通过广播方式实现的, 所有cluster下的所有worker都会收到指令

可以通过指定 --destination 来实现只监听某个/些workers

$ celery -A proj inspect active --destination=celery@exaple.com

查看更多信息

时区

默认情况使用UTC时区, 可以通过 app.conf.timezone = 'Asia/Shanghai' 来修改

优化

默认情况下并无优化, 只是简单的在大量短任务少数耗时任务切换, 只是在吞吐和公平调度的折衷

可以查看优化向导来获取更多信息

同样, 如果你使用了RabbitMQ, 则可以使用 librabbitmq 模块(c语言实现)

$ pip install librabbitmq

更多

文章目录
  1. Celery Next Steps
    1. 在应用中使用Celery
      1. 项目布局
        1. app.py
        2. tasks.py
        3. client.py
      2. 启动 worker
      3. 停止 worker
      4. 后台运行
      5. 注意
      6. --app参数
    2. 调用任务
      1. delay
      2. apply_async
    3. Canvas: 定义工作流
      1. 调用API
      2. 原语
        1. Groups
        2. Chains
        3. Chords
    4. 路由
    5. 远程控制
    6. 时区
    7. 优化
    8. 更多