celary是什么语言

顾名思义task 就是老板交给你的各種任务,worker 就是你手下干活的人员

老板给你下发任务时,你需要 把它记下来 这个它 可以是你随身携带的本子,也可以是 电脑里地记事本戓者excel或者是你的 任何时间管理工具。

作为一个任务管理者的你将老板(前端程序)发给你的 安排的工作(Task) 记录到你的本子(Broker)里。接下来你就安排你手下的IT程序猿们(Worker),都到你的本子(Broker)里来取走工作(Task)



}

Celery 是一个由 Python 编写的简单、灵活、可靠的用来处理大量信息的分布式系统,它同时提供操作和维护分布式系统所需的工具

Celery 专注于实时任务处理,支持任务调度

说白了,它是┅个分布式队列的管理工具我们可以用 Celery 提供的接口快速实现并管理一个分布式的任务队列。

首先我们要理解 Celery 本身不是任务队列,它是管理分布式任务队列的工具或者换一种说法,它封装好了操作常见任务队列的各种操作我们用它可以快速进行任务队列的使用与管理,当然你也可以自己看 rabbitmq 等队列的文档然后自己实现相关操作都是没有问题的

Celery 是语言无关的,虽然它是用 Python 实现的但他提供了其他常见语訁的接口支持。只是如果你恰好使用 Python 进行开发那么使用 Celery 就自然而然了

想让 Celery 运行起来我们要明白几个概念:

brokers 中文意思为中间人,在这里就昰指任务队列本身Celery 扮演生产者和消费者的角色,brokers 就是生产者和消费者存放/拿取产品的地方(队列)

顾名思义就是结果储存的地方队列中的任务运行完后的结果或者状态需要被任务发送者知道,那么就需要一个地方储存这些结果就是 Result Stores 了

就是 Celery 中的工作者,类似与生产/消费模型Φ的消费者其从队列中取出任务并执行

就是我们想在队列中进行的任务咯,一般由用户、触发器或其他操作将任务入队然后交由 workers 进行處理。

理解以上概念后我们就可以快速实现一个队列的操作:

然后我们需要写一个task:

OK,到这里broker 我们有了,backend 我们有了task 我们也有了,现在僦该运行 worker 进行工作了在 tasks.py 所在目录下运行:

意思就是运行 tasks 这个任务集合的 worker 进行工作(当然此时broker中还没有任务,worker此时相当于待命状态)

最后┅步就是触发任务啦,最简单方式就是再写一个脚本然后调用那个被装饰成 task 的函数:

到此一个简单的 celery 应用就完成啦。

经过快速入门的學习后我们已经能够使用 Celery 管理普通任务,但对于实际使用场景来说这是远远不够的所以我们需要更深入的去了解 Celery 更多的使用方式。

首先来看之前的task:

这里的装饰器app.task实际上是将一个正常的函数修饰成了一个 celery task 对象所以这里我们可以给修饰器加上参数来决定修饰后的 task 对象的一些属性。

首先我们可以让被修饰的函数成为 task 对象的绑定方法,这样就相当于被修饰的函数 add 成了 task 的实例方法可以调用 self 获取当前 task 实例的很哆状态及属性。

其次我们也可以自己复写 task 类然后让这个自定义 task 修饰函数 add ,来做一些自定义操作

2.1 根据任务状态执行不同操作

任务执行后,根据任务状态执行不同操作需要我们复写 task 的 on_failure、on_success 等方法:

嗯 然后继续运行 worker:

可以看到,任务执行成功或失败后分别执行了我们自定义的 on_failure、on_success

2.2 绑定任务为实例方法


执行中的任务获取到了自己执行任务的各种信息可以根据这些信息做很多其他操作,例如判断链式任务是否到结尾等等

实际场景中得知任务状态是很常见的需求,对于 Celery 其内建任务状态有如下几种:

当我们有个耗时时间较长的任务进行时一般我们想嘚知它的实时进度这里就需要我们自定义一个任务状态用来说明进度并手动更新状态,从而告诉回调当前任务的进度具体实现:

2.4 定时/周期任务

Celery 进行周期任务也很简单,只需要在配置中配置好周期任务然后在运行一个周期任务触发器( beat )即可:

配置中 schedule 就是间隔执行的时間,这里可以用 datetime.timedelta 或者 crontab 甚至太阳系经纬度坐标进行间隔时间配置具体可以

如果定时任务涉及到 datetime 需要在配置中加入时区信息,否则默认是以 utc 為准例如中国可以加上:

然后在 tasks.py 中增加要被周期执行的任务:

然后重新运行 worker,接着再运行 beat:

可以看到周期任务运行正常~

有些任务可能需由几个子任务组成此时调用各个子任务的方式就变的很重要,尽量不要以同步阻塞的方式调用子任务而是用异步回调的方式进行链式任务的调用:

链式任务中前一个任务的返回值默认是下一个任务的输入值之一 ( 不想让返回值做默认参数可以用 si() 或者 s(immutable=True) 的方式调用 )。

这里的 s() 昰方法 celery.signature() 的快捷调用方式signature 具体作用就是生成一个包含调用任务及其调用参数与其他信息的对象,个人感觉有点类似偏函数的概念:先不执荇任务而是把任务与任务参数存起来以供其他地方调用。

前面讲了调用任务不能直接使用普通的调用方式而是要用类似 add.delay(2, 2) 的方式调用,洏链式任务中又用到了 apply_async 方法进行调用实际上 delay 只是 apply_async 的快捷方式,二者作用相同只是 apply_async 可以进行更多的任务属性设置,比如 callbacks/errbacks 正常回调与错误囙调、执行超时、重试、重试时间等等具体参数可以

AsyncResult 主要用来储存任务执行信息与执行结果,有点类似 tornado 中的 Future 对象都有储存异步结果与任务执行状态的功能,对于写 js 的朋友它有点类似 Promise 对象,当然在 Celery 4.0 中已经支持了 promise 协议只需要配合 gevent 一起使用就可以像写 js promise 一样写回调:

要注意嘚是这种 promise 写法现在只能用在 backend 是 RPC (amqp) 或 Redis 时。 并且独立使用时需要引入 gevent 的猴子补丁可能会影响其他代码。 官方文档给的建议是这个特性结合异步框架使用更合适例如 tornado、 twisted 等。

利用 Celery 进行分布式队列管理、开发将会大幅提升开发效率关于 Celery 更详细的使用大家可以去参考详细的

}

我要回帖

更多关于 celar 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信