Celery Next Steps
在应用中使用Celery 项目布局 (.venv) <HOME_DIR>/code/distr_q
.
├── __init__.py
└── celery_next
├── __init__.py
├── app.py
├── client.py
└── tasks.py
app.py worker app任务执行端, 指定需要运行的 tasks 模块, 指明 broker/backend
from celery import Celery
app = Celery(
'tasks' ,
broker='amqp://admin:admin@localhost/cc' ,
backend='rpc://admin:admin@localhost/cc' ,
include=['distr_q.celery_next.tasks' ]
)
app.conf.update(
result_expires=3600 ,
)
if __name__ == '__main__' :
app.start()
tasks.py 任务模块内容, 具体任务
import os
import sys
app_root = '/' .join(os.path.abspath(__file__).split('/' )[:-3 ])
sys.path.append(app_root)
from distr_q.celery_next.app import app
@app.task
def add (x, y) :
return x + y
@app.task
def mul (x, y) :
return x * y
@app.task
def xsum (numbers) :
return sum(numbers)
client.py 任务发起端
import os
import sys
app_root = '/' .join(os.path.abspath(__file__).split('/' )[:-3 ])
sys.path.append(app_root)
from distr_q.celery_next import tasks
if __name__ == '__main__' :
tasks.add.apply_async((2 , 2 ), queue='lopri' , countdown=10 )
启动 worker
celery -A celery_next.app worker -l info
celery -A celery_next.app worker -l info -Q lopri
停止 worker
Control-c
后台运行 主要依据celery multi命令来在后台启动一个或者多个workers
celery multi start w1 -A celery_next.app worker -l info
stop: 是异步执行的, 并不等待worker关闭, 会直接退出
stopwait: 同步等待所有执行结束才退出
注意celery multi doesn’t store information about workers so you need to use the same command-line arguments when restarting. Only the same pidfile and logfile arguments must be used when stopping.
celery不会存储workers信息, 所以在重启的时候,你必须提供相同的参数. 在停止的时候, 只有 pidfile和logfile参数必须得提供.
默认情况下会在当前目录创建pid,log文件, 为了防止多个workers相互覆盖, 推荐指定独立的目录
mkdir -p /var/run/celery
mkdir -p /var/log /celery
celery multi start w1 -A proj -l info \
--pidfile=/var/run/celery/%n.pid \
--logfile=/var/log /celery/%n%I.log
可以在一条命令中指定多个 workers
celery multi start 10 -A proj -l info\
-Q:1-3 images,video \
-Q:4,5 data \
-Q default -L:4,5 debug
--app参数--app指定了使用哪一个Celery app实例, 格式为 module.path:attribute
当前也同样支持只指定 package name, 会以如下顺序进行识别
参数 —-app=proj
查找 proj.app
查找 proj.celery
查找 模块proj 中值为Celery Application的属性
如果都没有, 会向子模块 proj.celery 中查找
proj.celery.app
proj.celery.celery
任意值为Celery application的属性
调用任务 delay 是apply_async的简写
add.delay(2 , 2 )
add.apply_async((2 , 2 ))
apply_async apply_async支持提供执行参数, 例如: 倒计时(countdown), 接受任务的队列queue等
add.apply_async((2, 2), queue='teenager' , countdown=10)
如果直接运行会在当前进程运行, 并不会发送消息到worker
作为celery的调用api, delay(), apply_async() 和 applying(__call__)同样也用于signature
delay, apply_async会返回AsyncResult实例, 依据此来查看执行结果或者异常原因.
由于没有一个后台backend可以适用任何情况, 因此你需要手动指定一个后台, 如果你配置好了后台, 你可以获取任务的返回信息
>>> res = add.delay(2, 2)
>>> res.get(timeout=1)
4
>>> res.id
>>> res = add.delay(2)
>>> res.get(timeout=1)
Traceback (most recent call last):
File "<stdin>" , line 1, in <module>
File "/opt/devel/celery/celery/result.py" , line 113, in get
interval=interval)
File "/opt/devel/celery/celery/backends/rpc.py" , line 138, in wait_for
raise meta['result' ]
TypeError: add() takes exactly 2 arguments (1 given)
>>> res.get(propagate=False)
TypeError('add() takes exactly 2 arguments (1 given)' ,)
>>> res.failed()
True
>>> res.successful()
False
>>> res.state
'FAILURE'
任务有多个状态, 但同一时刻只有一个状态, 典型状态变化如下:
PENDING -> STARTED -> SUCCESS
STARTED状态只有在配置了task_track_started或者使用@task(track_started=True)时, 才会被记录
PENDING状态并非是记录的状态, 而是任何一个未知id的默认状态.
>>> from proj.celery import app
>>> res = app.AsyncResult('this-id-does-not-exist' )
>>> res.state
'PENDING'
Canvas: 定义工作流 通常情况下, 可以使用delay方法来执行任务. 不过如果你需要把任务签名传递给其他进程或者作为参数传递给其他函数, 那么你就需要使用signatures签名方法了
signature可以简单理解为python偏函数
>>> add.signature((2 , 2 ), countdown=10 )
tasks.add(2 , 2 )
>>> add.s(2 , 2 )
tasks.add(2 , 2 )
调用API signature同样支持调用api
>>> s1 = add.s(2, 2)
>>> res = s1.delay()
>>> res.get()
4
>>> s2 = add.s(2)
>>> res = s2.delay(1)
>>> res.get()
3
>>> s3 = add.s(2, 2, debug=True)
>>> s3.delay(debug=False)
原语 group
chain
chord
map
starmap
chunks
Groups group可以并发调用多个任务, 并按照任务顺序返回每个任务的值.
>>> from celery import group
>>> from proj.tasks import add
>>> group(add.s(i, i) for i in range(10 ))().get()
[0 , 2 , 4 , 6 , 8 , 10 , 12 , 14 , 16 , 18 ]
>>> g = group(add.s(i) for in range(10 ))
>>> g(10 ).get()
[10 , 11 , 12 , 13 , 14 , 15 , 16 , 17 , 18 , 19 ]
Chains 可以使用chain来将多个功能链接
>>> from celery import chain
>>> from proj.tasks import add, mul
>>> chain(add.s(4 , 4 ) | mul.s(8 ))().get()
64
>>> g = chain(add.s(4 ) | mul.s(8 ))
>>> g(4 ).get()
64
>>> (add.s(4 , 4 ) | mul.s(8 ))().get()
64
Chords
翻译 <直: 弦>,<意: 协作回调>
>>> from celery import chord
>>> from proj.tasks import add, xsum
>>> chord((add.s(i, i) for i in range(10 )), xsum.s())().get()
90
链接到其他任务的组group会自动转换成chord
>>> (group(add.s(i, i) for in range(10 )) | xsum.s())().get()
90
所有的原语都可以任意组合
download.s(picture_urls) | group(apply_filter.s() for filter in filters)
路由 Celery支持所有的AMQP路由功能, 同样也可以使用简单的路由来把消息发送到指定的队列中
配置项task_routes, 让你可以在同一位置使用名字来实现路由任务
app.conf.update(
task_routes = {
'proj.tasks.add' : {'queue' : 'hipri' },
},
)
当然你可以在运行时, 覆盖掉配置选项
>>> from proj.tasks import add
>>> add.apply_async((2 , 2 ), queue='hipri' )
通过指定queue名字来消费指定队列 celery worker -Q <queue-name>
$ celery -A proj worker -Q hipri
$ celery -A proj worker -Q hipri,celery
远程控制 如果使用了 RabbitMQ, Redis, Qpid 作为broker, 就可以控制和监测worker
查看workers的当前任务
$ celery -A proj inspect active
这是通过广播方式实现的, 所有cluster下的所有worker都会收到指令
可以通过指定 --destination 来实现只监听某个/些workers
$ celery -A proj inspect active --destination=celery@exaple.com
查看更多信息
时区 默认情况使用UTC时区, 可以通过 app.conf.timezone = 'Asia/Shanghai' 来修改
优化 默认情况下并无优化, 只是简单的在大量短任务和少数耗时任务切换, 只是在吞吐和公平调度的折衷
可以查看优化向导 来获取更多信息
同样, 如果你使用了RabbitMQ, 则可以使用 librabbitmq 模块(c语言实现)
$ pip install librabbitmq
更多