
Celery workflows

Celery is a great asynchronous task/job queue framework. It allows you to create distributed systems in which tasks (execution units) are executed concurrently on multiple workers using multiprocessing. It also supports scheduling and scales well, since you can add workers horizontally.

Celery is great at firing both synchronous and, which is one of its main strengths, asynchronous tasks such as sending emails, processing credit cards, or writing transactions to a general ledger.

However, Celery offers much more. One of its most useful features is the ability to chain multiple tasks into workflows.

Task Callbacks

Let’s create a few simple tasks for demonstration purposes:

from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def multiply(x, y):
    return x * y

@shared_task
def tsum(numbers):
    return sum(numbers)

A very simple example of linking two tasks would be:

add.apply_async((5, 5), link=add.s(35))

Which would result in:

(5 + 5) + 35

You can also define an error callback. Let’s create a simple error handling task:

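from celery.result import AsyncResult

@shared_task
def error_handler(uuid):
    # Celery 3.x style errback: it receives the id of the failed task.
    # Celery 4+ calls errbacks with (request, exc, traceback) instead.
    result = AsyncResult(uuid)
    exc = result.get(propagate=False)
    print('Task {0} raised exception: {1!r}\n{2!r}'.format(
          uuid, exc, result.traceback))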

You could then write:

add.apply_async((5, 5), link_error=error_handler.s())

This is useful for sending an email notification about a system error or for logging exceptions for later debugging.

Both callbacks and error callbacks can be expressed as a list:

add.apply_async((5, 5), link=[add.s(35), multiply.s(2)])

The result from the first task would then be passed to both callbacks, so you would get:

(5 + 5) + 35


(5 + 5) * 2

If you don’t want to pass the result from the first task to its callback, you can create an immutable callback. This can be useful when you have a piece of logic you want to execute after the task but do not need its return value.

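# .si() builds an immutable signature, so the parent's result is not passed on
# (callback task and arguments chosen for illustration)
add.apply_async((2, 2), link=add.si(4, 4))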
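To make the difference between a regular and an immutable callback concrete, here is a plain-Python model of the argument passing. It does not use Celery at all; the `run_callback` helper and its `immutable` flag are hypothetical names that only mimic how `.s()` and `.si()` signatures treat the parent's return value.

```python
def add(x, y):
    return x + y


def run_callback(func, args, parent_result, immutable=False):
    """Hypothetical helper: call func the way a linked callback would be called."""
    if immutable:
        # Like .si(): the parent's result is ignored, only the stored args are used.
        return func(*args)
    # Like .s(): the parent's result is prepended to the stored args.
    return func(parent_result, *args)


parent_result = add(5, 5)  # the first task returns 10

# Regular callback: behaves like add(10, 35)
mutable_result = run_callback(add, (35,), parent_result)

# Immutable callback: behaves like add(4, 4), the 10 is dropped
immutable_result = run_callback(add, (4, 4), parent_result, immutable=True)
```

This is why an immutable callback has to carry all of its own arguments: nothing is filled in for it from the parent task.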

Next, let’s look at some more complex workflow primitives Celery offers.

The Primitives

The first primitive I will show you is the group. Groups are used when you want to execute any number of tasks in parallel.

from celery import group
result = group(add.s(i, i) for i in range(10))()

Calling result.get() would then return a list of results:

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
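As a rough mental model of a group, the sketch below runs the same ten calls on a local thread pool and collects the results in submission order. It is not Celery code: the threads merely stand in for workers, and the pool size of 4 is an arbitrary assumption.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


# Submit ten independent calls; they may run concurrently on the pool.
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(add, i, i) for i in range(10)]
    # Collect results in submission order, so each result lines up with its task.
    group_results = [future.result() for future in futures]
```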

The next primitive is the chain. A chain defines a set of tasks to be executed one after another, with each task's result passed to the next.

result = (multiply.s(5, 5) | add.s(4) | multiply.s(8))()

Would give you the equivalent of:

((5 * 5) + 4) * 8 = 29 * 8 = 232
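The data flow of that chain can be modelled in plain Python as a fold over the steps, where each step receives the previous result as its first argument. This is only a sketch of the semantics, not how Celery executes a chain; the `steps` list is a hypothetical representation.

```python
from functools import reduce


def add(x, y):
    return x + y


def multiply(x, y):
    return x * y


# The first task has all of its arguments up front.
first_result = multiply(5, 5)

# Every later step is (function, extra_args); the previous result is prepended.
steps = [(add, (4,)), (multiply, (8,))]

chain_result = reduce(
    lambda previous, step: step[0](previous, *step[1]),
    steps,
    first_result,
)
```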

Another very useful primitive is the chord. A chord lets you define a header and a body. The header is a list of tasks to be executed in parallel; the body is a callback to be executed after all tasks in the header have run. The callback in the body receives a list containing the return values of all tasks in the header.

from celery import chord

result = chord((add.s(i, i) for i in range(10)), tsum.s())()

This would result in [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] being passed to the tsum task, which adds all the numbers together and returns 90. Basically:

sum([0, 2, 4, 6, 8, 10, 12, 14, 16, 18])
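The header-then-body behaviour can be sketched without Celery as "run everything in parallel, wait for all of it, then call the body once with the list of results". The `run_chord` helper below is a hypothetical name, and a local thread pool stands in for the workers.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


def tsum(numbers):
    return sum(numbers)


def run_chord(header_calls, body):
    """Hypothetical helper: run the header in parallel, then call the body once."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(func, *args) for func, args in header_calls]
        # Blocks until every header call has finished, keeping submission order.
        header_results = [future.result() for future in futures]
    # The body gets the whole list as a single argument.
    return body(header_results)


header = [(add, (i, i)) for i in range(10)]
chord_result = run_chord(header, tsum)
```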

There are a couple more primitives. Map and starmap work similarly to the built-in Python map function.

tsum.map([range(10), range(100)])

Will result in:

[45, 4950]

Starmap allows you to send arguments as *args:

add.starmap(zip(range(10), range(10)))

Will result in:

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
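For comparison, the same two results can be produced locally with the built-in `map` and `itertools.starmap`, which is the behaviour the Celery versions are modelled on. This sketch calls the plain functions directly and involves no workers.

```python
from itertools import starmap


def add(x, y):
    return x + y


def tsum(numbers):
    return sum(numbers)


# map: each element is passed to the function as a single argument.
map_results = list(map(tsum, [range(10), range(100)]))

# starmap: each element is a tuple that is unpacked into positional arguments.
starmap_results = list(starmap(add, zip(range(10), range(10))))
```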

Chunks let you split a long list of arguments into subsets, resulting in a task called multiple times with a smaller chunk of arguments.

# 1000 tuple pairs: (0, 0), (1, 1), ..., (999, 999)
items = zip(range(1000), range(1000))
# split the work into chunks of 10 argument tuples each
add.chunks(items, 10)
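To see what that split looks like, here is a plain-Python sketch that cuts the same 1000 argument pairs into chunks of 10 and processes each chunk as one unit. The `split_into_chunks` helper is a hypothetical name, and the sketch assumes the second argument to `chunks` is the number of items per chunk.

```python
from itertools import islice


def add(x, y):
    return x + y


def split_into_chunks(iterable, size):
    """Hypothetical helper: yield lists of at most `size` items."""
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


items = zip(range(1000), range(1000))
chunks = list(split_into_chunks(items, 10))

# One result list per chunk, mirroring one task call per chunk.
chunk_results = [[add(x, y) for x, y in chunk] for chunk in chunks]
```

With 1000 pairs and a chunk size of 10, this yields 100 units of work instead of 1000 individual calls.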

This was a very short introduction to workflows in Celery. There is much more flexibility in defining workflows than shown here; I have barely touched on error handling and the many options available.

There are some limitations as well, though. For instance, as far as I know, chaining chords together is not possible.

Finally, let me say that Celery is an essential part of every Python programmer’s repertoire. If you haven’t used it yet, you should definitely take a look.

It can be used for anything from simple use cases, such as charging credit cards asynchronously and sending emails in the background, to more sophisticated things like workflows, or even as middleware in service-oriented architectures.



Posted July 17th, 2014 in Zend and PHP.
