Using Celery as middleware in SOA

When creating a service-oriented architecture, one of the most important decisions is usually which protocol to use for inter-service communication. Let’s say an architecture consists of two layers:

  • edge
  • application

The edge is just a thin, publicly accessible HTTP layer exposing RESTful endpoints. This is usually just a server running an Nginx reverse proxy (or multiple servers behind a load balancer) plus something to route URL requests and call the services that handle the business logic.

The application layer contains all business logic. It usually consists of several services (login service, wallet service, payment service, etc.) deployed in a VPC (virtual private cloud).

The question is: how should the edge communicate with the different services? And how should the services communicate with each other?

A simple and very common solution is for the services to expose their own RESTful APIs so the edge server can trigger service calls via standard HTTP.

One drawback of this solution is that the edge server and all application servers now need additional configuration (DNS names, IP addresses, etc.). Also, to scale, the services need to be deployed behind their own load balancers, and each needs to run its own web server (Nginx plus an application server such as uWSGI or Gunicorn).

I prefer to use the AMQP protocol instead, which solves the problems mentioned above. For example, I am using RabbitMQ and Celery as middleware in my latest project. I have two RabbitMQ clusters:

  • one for synchronous, blocking tasks
  • one for asynchronous, non-blocking tasks

All my services run as Celery daemons on application servers. There is no need for complex configuration (only the URL of the message broker is needed) and no need to run a web server. Load balancers are no longer needed either, as RabbitMQ distributes messages round-robin to the workers consuming from a queue.
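
As a rough illustration of the round-robin distribution mentioned above, here is a toy model in plain Python. It is only a sketch: the function name `dispatch_round_robin` and the worker and task names are made up for this example, and a real RabbitMQ queue also takes prefetch limits and acknowledgements into account.

```python
from itertools import cycle


def dispatch_round_robin(messages, workers):
    """Assign each message to the next worker in turn (simplified model)."""
    if not workers:
        raise ValueError("at least one worker is required")
    assignments = {worker: [] for worker in workers}
    next_worker = cycle(workers)  # endlessly repeats the workers in order
    for message in messages:
        assignments[next(next_worker)].append(message)
    return assignments


if __name__ == "__main__":
    # Hypothetical task and worker names, for illustration only
    print(dispatch_round_robin(
        ["task-1", "task-2", "task-3", "task-4", "task-5"],
        ["worker-a", "worker-b"],
    ))
```

Adding a worker to the list spreads the same messages over more consumers, which is what makes a separate load balancer unnecessary in this setup.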

The edge server only publishes messages to the correct queue. To make this work, I wrote a Celery routing configuration that routes each task to the correct queue.

I am using a separate exchange for each service. Each service runs as two Celery daemons, one handling its synchronous tasks and the other its asynchronous tasks. Here is how it’s done:

from celery import Celery
import re

class Router(object):
    def route_for_task(self, task, args=None, kwargs=None):
        # Task names look like '<service>.<sync|async>.<method>'
        parts = task.split('.')
        if re.match(r'^mp[a-z_]+\.sync\.[a-z_]+$', task) is not None:
            return {
                'routing_key': task,
                'queue': parts[0] + '.sync',
            }
        elif re.match(r'^mp[a-z_]+\.async\.[a-z_]+$', task) is not None:
            return {
                'routing_key': task,
                'queue': parts[0] + '.async',
            }
        # Anything else falls back to Celery's default routing
        return None

def _get_celery_queues():
    # One entry per service; 'mplogin' is the example login service from this post
    services = [
        'mplogin',
    ]

    queues = {}
    for service in services:
        queues[service + '.sync'] = {
            'binding_key': service + '.sync.#',
            'exchange': service,
            'exchange_type': 'topic',
            'delivery_mode': 1, # transient messages, not written to disk
        }
        queues[service + '.async'] = {
            'binding_key': service + '.async.#',
            'exchange': service,
            'exchange_type': 'topic'
        }

    return queues

class CeleryConfig(object):
    CELERY_ROUTES = (Router(),)

    #: Only add pickle to this list if your broker is secured
    #: from unwanted access (see userguide/security.html)
    CELERY_ACCEPT_CONTENT = ['pickle', 'json']
    CELERY_BACKEND = 'amqp'

    # Replicate queues to all nodes in the cluster


    CELERY_QUEUES = _get_celery_queues()

def celery_apps_factory(app_type, sync_broker_url, async_broker_url, service_name):
    protocol = 'pyamqp' if app_type == 'SUBSCRIBER' else 'librabbitmq'

    broker_url_sync = protocol + '://' + sync_broker_url
    broker_url_async = protocol + '://' + async_broker_url

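    sync_app = Celery(service_name + '.sync_app', broker=broker_url_sync)
    # Assumed step: apply the routing and queue settings defined above
    sync_app.config_from_object(CeleryConfig)

    async_app = Celery(service_name + '.async_app', broker=broker_url_async)
    async_app.config_from_object(CeleryConfig)
    # Asynchronous tasks are fire-and-forget, so their results are not stored
    async_app.conf.CELERY_IGNORE_RESULT = True

    return sync_app, async_app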
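
The binding keys in the queue definitions use AMQP topic wildcards: `*` stands for exactly one dot-separated word and `#` for zero or more words. The sketch below is a small stand-alone matcher in plain Python that shows which routing keys a binding such as `mplogin.sync.#` would accept. The function `topic_matches` is a hypothetical helper written for this illustration; it is not part of Celery or RabbitMQ, which do this matching inside the broker.

```python
def topic_matches(binding_key, routing_key):
    """Return True if an AMQP-style topic binding key matches a routing key."""
    pattern = binding_key.split('.')
    words = routing_key.split('.')

    def match(p, w):
        # p indexes into the pattern, w indexes into the routing key words
        if p == len(pattern):
            return w == len(words)
        if pattern[p] == '#':
            # '#' may absorb zero or more words
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w < len(words) and pattern[p] in ('*', words[w]):
            # '*' matches exactly one word; otherwise the words must be equal
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


if __name__ == "__main__":
    print(topic_matches('mplogin.sync.#', 'mplogin.sync.register'))
    print(topic_matches('mplogin.async.#', 'mplogin.sync.register'))
```

Because the routing key is the full task name, every sync task of a service lands in that service's sync queue and never in its async queue.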

In the routing configuration above, the SOA platform has a common prefix, mp (short for “my platform”; it is just an example). Every service is therefore prefixed with mp (e.g. mplogin is the name of the login service).

A task name consists of the service name (the same as the exchange name), the word “sync” or “async”, and the method name, all separated by full stops. For example:

  • mplogin.sync.register

This is a task to register a new user. If you split the name on the full stops, you can say that:

  • mplogin is the name of the exchange
  • mplogin.sync is the name of the queue
  • register is the name of the task (service method)
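
The naming convention can be checked with a few lines of plain Python. This is a minimal sketch, assuming the `mp` prefix and lowercase names used in this post; `parse_task_name` and `TASK_NAME` are hypothetical names introduced only for this example.

```python
import re

# <service>.<sync|async>.<method>, with the dots escaped so they match literally
TASK_NAME = re.compile(
    r'^(?P<service>mp[a-z_]+)\.(?P<mode>sync|async)\.(?P<method>[a-z_]+)$'
)


def parse_task_name(task):
    """Split a task name into exchange, queue and method, or return None."""
    found = TASK_NAME.match(task)
    if found is None:
        return None
    service = found.group('service')
    return {
        'exchange': service,
        'queue': service + '.' + found.group('mode'),
        'method': found.group('method'),
    }


if __name__ == "__main__":
    print(parse_task_name('mplogin.sync.register'))
```

Names that do not follow the convention return `None`, which mirrors the router falling back to default routing for unknown tasks.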

Here is an example definition of the register task:

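# SYNC_BROKER_URL and ASYNC_BROKER_URL are placeholders for your own broker addresses
sync_app, async_app = celery_apps_factory(
    'SUBSCRIBER', SYNC_BROKER_URL, ASYNC_BROKER_URL, 'mplogin')

@sync_app.task(name='mplogin.sync.register')
def register(user_obj):
    response = user_service.register(user_obj)
    return response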

To register a new user, you could then call the task from the edge server:

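login_sync_app, login_async_app = celery_apps_factory(
    'PUBLISHER', SYNC_BROKER_URL, ASYNC_BROKER_URL, 'mplogin')  # placeholder values
result = login_sync_app.send_task(
    'mplogin.sync.register', kwargs={
        'user_obj': json_obj,
    })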

I hope at least somebody will find this useful :)


Posted May 13th, 2014 in Zend and PHP.