Celery First Step

Celery: First Step

Celery是一个功能完备的任务队列, 可以方便的与其他语言集成, 可以放心在生成环境中使用.

  • 选择安装一个消息队列
  • 安装Celery并创建任务
  • 启动worker调用tasks
  • 追踪监视任务的不同状态,监测返回信息

参考链接

选择Broker

Celery需要接收和发送消息,通常情况下使用独立的消息队列

RabbitMQ

  • 安装
# 使用 docker 环境
docker pull rabbitmq
  • 配置
docker run \
-tid \
--hostname celery \
--name rabbit \
-p 15672:15672 \
-p 5672:5672 \
-p 5671:5671 \
-p 15671:15671 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
-e RABBITMQ_ERLANG_COOKIE='youcanneverseethis' \
-v <YOUR_HOME_DIR>/rabbitmq/:/var/lib/rabbitmq \
rabbitmq:3-management

Redis

可以使用, 但是在异常情况下可能会导致数据丢失

其他

安装Celery

pip install celery

程序

tasks.py

# 一个简单的任务, add, 并且返回两个数字的和
from celery import Celery
app = Celery('tasks',
backend='redis://:123456@localhost:6380/0',
broker='amqp://admin:admin@localhost:5672')
@app.task
def add(x, y):
return x + y
@app.task
def d0(x, y):
time.sleep(100)
return x / y
if __name__ == '__main__':
for i in range(5):
res = d0.delay(16, 0)
print(res.ready())

运行Celery worker 服务端

celery -A tasks worker --loglevel=info

执行任务

task端

可以使用 delay() 来调用方法, 这是简便的apply_async方法

  • 返回的是 AsyncResult 实例
  • 可以检测任务状态, 等待任务结束, 获取返回值
  • 如果失败可以获取失败详情
>>> from tasks import add
>>> add.delay(16, 4)
# 或者直接调用
python3 tasks.py

worker log输出

[2017-12-19 18:31:34,401: INFO/MainProcess] Received task: tasks.add[16ae3321-a3e1-4498-9c92-cd1680d82fae]
[2017-12-19 18:31:34,443: INFO/ForkPoolWorker-2] Task tasks.add[16ae3321-a3e1-4498-9c92-cd1680d82fae] succeeded in 0.03918749799777288s: 20

保存结果

使用RPC

app = Celery('tasks',
backend='rpc://admin:admin@localhost/cc',
broker='amqp://admin:admin@localhost:5672/cc')

使用redis

app = Celery('tasks',
backend='redis://:123456@localhost:6380/0',
broker='amqp://admin:admin@localhost:5672')

配置

Celery并不需要太多配置, 通常情况下, 它需要一个输入输出, 输入需要连接到broker, 而输出可以连接到一个backend, 不过如果你需要进行更进一步的操作, 你可以使用配置功能

启用方法

  • 直接在app中使用配置
  • 通过专用的配置模块

实例app

单个赋值

app.conf.task_serializer = 'json'

多个

app.conf.update(
task_serializer='json',
accept_content=['json'], # Ignore other content
result_serializer='json',
timezone='Europe/Oslo',
enable_utc=True,
)

模块

启用

# assume you have a `celeryconfig.py`
app.config_from_object('celeryconfig') # celeryconfig -> celeryconfig.py

celeryconfig.py

broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

验证配置参数 python3 -m celeryconfig

应用

  • 优先级
task_routes = {
'tasks.add': 'low-priority',
}
  • rate/运行速度
task_annotations = {
'tasks.add': {'rate_limit': '10/m'}
}
  • 当前如果你使用 RabbitMQ/Redis 作为broker, 你可以直接传递参数到任务
$ celery -A tasks control rate_limit tasks.add 10/m
文章目录
  1. Celery: First Step
    1. 参考链接
    2. 选择Broker
      1. RabbitMQ
      2. Redis
      3. 其他
    3. 安装Celery
    4. 程序
      1. tasks.py
    5. 运行Celery worker 服务端
    6. 执行任务
      1. task端
      2. worker log输出
    7. 保存结果
      1. 使用RPC
      2. 使用redis
    8. 配置
      1. 启用方法
      2. 实例app
      3. 模块
      4. 应用