Celery: First Step
Celery是一个功能完备的任务队列, 可以方便的与其他语言集成, 可以放心在生成环境中使用.
- 选择安装一个消息队列
- 安装Celery并创建任务
- 启动
worker调用tasks
- 追踪监视任务的不同状态,监测返回信息
参考链接
选择Broker
Celery需要接收和发送消息,通常情况下使用独立的消息队列
RabbitMQ
docker run \ -tid \ --hostname celery \ --name rabbit \ -p 15672:15672 \ -p 5672:5672 \ -p 5671:5671 \ -p 15671:15671 \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=admin \ -e RABBITMQ_ERLANG_COOKIE='youcanneverseethis' \ -v <YOUR_HOME_DIR>/rabbitmq/:/var/lib/rabbitmq \ rabbitmq:3-management
|
Redis
可以使用, 但是在异常情况下可能会导致数据丢失
安装Celery
程序
tasks.py
from celery import Celery app = Celery('tasks', backend='redis://:123456@localhost:6380/0', broker='amqp://admin:admin@localhost:5672') @app.task def add(x, y): return x + y @app.task def d0(x, y): time.sleep(100) return x / y if __name__ == '__main__': for i in range(5): res = d0.delay(16, 0) print(res.ready())
|
运行Celery worker 服务端
celery -A tasks worker --loglevel=info
|
执行任务
task端
可以使用 delay() 来调用方法, 这是简便的apply_async方法
- 返回的是
AsyncResult 实例
- 可以检测任务状态, 等待任务结束, 获取返回值
- 如果失败可以获取失败详情
>>> from tasks import add >>> add.delay(16, 4) python3 tasks.py
|
worker log输出
[2017-12-19 18:31:34,401: INFO/MainProcess] Received task: tasks.add[16ae3321-a3e1-4498-9c92-cd1680d82fae] [2017-12-19 18:31:34,443: INFO/ForkPoolWorker-2] Task tasks.add[16ae3321-a3e1-4498-9c92-cd1680d82fae] succeeded in 0.03918749799777288s: 20
|
保存结果
使用RPC
app = Celery('tasks', backend='rpc://admin:admin@localhost/cc', broker='amqp://admin:admin@localhost:5672/cc')
|
使用redis
app = Celery('tasks', backend='redis://:123456@localhost:6380/0', broker='amqp://admin:admin@localhost:5672')
|
配置
Celery并不需要太多配置, 通常情况下, 它需要一个输入和输出, 输入需要连接到broker, 而输出可以连接到一个backend, 不过如果你需要进行更进一步的操作, 你可以使用配置功能
启用方法
实例app
单个赋值
app.conf.task_serializer = 'json'
|
多个
app.conf.update( task_serializer='json', accept_content=['json'], result_serializer='json', timezone='Europe/Oslo', enable_utc=True, )
|
模块
启用
app.config_from_object('celeryconfig')
|
celeryconfig.py
broker_url = 'pyamqp://' result_backend = 'rpc://' task_serializer = 'json' result_serializer = 'json' accept_content = ['json'] timezone = 'Europe/Oslo' enable_utc = True
|
验证配置参数 python3 -m celeryconfig
应用
task_routes = { 'tasks.add': 'low-priority', }
|
task_annotations = { 'tasks.add': {'rate_limit': '10/m'} }
|
- 当前如果你使用
RabbitMQ/Redis 作为broker, 你可以直接传递参数到任务
$ celery -A tasks control rate_limit tasks.add 10/m
|