Sep 06

Celery logs through syslog

Reading time: 2 – 2 minutes

Celery logs are colorized by default, the first big idea is disable color logs. It’s as easy as setting ‘CELERYD_LOG_COLOR’ to ‘False’ in ‘celery.conf’. The code could be something like this:

celery.conf.update('CELERYD_LOG_COLOR' = False)

Secondly we need a function where we set up a new handler and other settings to celery logging system. For example, the code could be:

from __future__ import absolute_import
from logging import BASIC_FORMAT, Formatter
from logging.handlers import SysLogHandler
from celery.log import redirect_stdouts_to_logger

def setup_log(**args):
    # redirect stdout and stderr to logger
    redirect_stdouts_to_logger(args['logger'])
    # logs to local syslog
    hl = SysLogHandler('/dev/log')
    # setting log level
    hl.setLevel(args['loglevel'])
    # setting log format
    formatter = Formatter(BASIC_FORMAT)
    hl.setFormatter(formatter)
    # add new handler to logger
    args['logger'].addHandler(hl)

Pay attention to ‘redirect_stdouts_to_logger’ it’s used to send all outputs like print’s or something else to syslog.

Thirdly we want to use those settings in our celery tasks, then we have to connect ‘setup_log’ code to some celery signals. Those signals are launched when ‘task_logger’ and ‘logger’ are configured. To connect signals:

from celery.signals import after_setup_task_logger, after_setup_logger

after_setup_logger.connect(setup_log)
after_setup_task_logger.connect(setup_log)

Fourthly we have to get the ‘logger’, we can have more than one if we are interested in records with task context or without it. For example:

logger = get_logger('just_a_name_for_internal_use')
logger_with_task_context = get_task_logger('name_of_the_task_to_be_recorded_in_logs')

Finally we only have to use those loggers with common methods DEBUG, INFO, WARN, ERROR and CRITICAL:

@celery.task
def the_task():
    logger.info('this is a message without task context')
    logger_with_task_context.debug('this record will have the prefix "name_of_the_task_to_be_recorded_in_logs" in syslog')
Mar 23

Deep inside AMQP

This entry is part 3 of 4 in the series AMQP and RabbitMQ

Reading time: 5 – 8 minutes

In the next lines I’ll describe with more details the properties and features of AMQP elements. It won’t be an exhaustive description but in my opinion more than enough to start playing with AMQP queues.

Channels

When producers and consumers connects to the broker using a TCP socket after authenticating the connection they establish a channel where AMQP commands are sent. The channel is a virtual path inside a TCP connection between this is very useful because there can be multiple channels inside the TCP connection each channels is identified using an unique ID.

An interesting parameter of a channel is confirmation mode if this is set to true when messages delivered to a exchange finally gets their queues the producer receives an acknowledge message with an UID of the message. This kind of messages are asynchronous and permits to a producer send the next message when it is still waiting the ACK message. Of course if the message cannot be stored and it is lost the producer receives a NACK (not acknowledged) message.

Producers

Maybe this is the most simple part of the system. Producers only need to negotiate the authentication across a TCP connection create a channel and then publish all messages that want with its corresponding routing key. Of course, producers can create exchanges, queues and then bind them. But usually this is not a good idea is much more secure do this from consumers. Because when a producers try to send a message to a broker and doesn’t have the needed exchange then message will be lost. Usually consumers are connected all time and subscribed to queues and producers only connect to brokers when they need to send messages.

Consumers

When a consumer connects to a queue usually uses a command called basic.consume to subscribe the channel to a queue, then every time subscribed queue has a new message it is sent to consumer after last message is consumed, or rejected.

If consumer only want to receive one message without a subscription it can use the command basic.get.This is like a poll method. In fact, the consumer only gets a message each time it sends the command.

You can get the best throughput using basic.consume command because is more efficient than poll every time the consumer wants another message.

When more than one consumer was connected to a queue, messages are distributed in a round-robin. After the message is delivered to a consumer this send an acknowledge message and then queue send another message to next consumer. If the consumer sends a reject message the same message is sent to next consumer.

There are two types of acknowledgements:

  • basic.ack: this is the message that sends consumer to queue to acknowledge the reception of a message
  • auto_ack: this is a parameter we can set when consumer subscribes to a queue. The setting assumes ACK message from consumer and then queue sends next message without waiting the ACK message.

The message basic.reject is sent when the consumer wants to reject a received message. This message discards the message and it is lost. If we want to requeue the message we can set the parameter requeue=true when sent a reject message.

When the queue is created there can be a parameter called dead letter set to true, then consumer rejects a message with the parameter requeue=false the message is queued to a new queue called  dead letter. This is very useful because after all we can go tho that queue an inspect the message rejection reason.

Queues

Both consumers and producers can create a queue using queue.declare command. The most natural way is create queues from consumers and then bind it to an exchange. The consumers needs a free channel to create a queue, if a channel is subscribed to a queue, the channel is busy and cannot create new queues. When a queue is created usually we use a name to identify the queue, if the name is not specified it’s randomly generated. This is useful when create temporary and anonymous queues for RPC-over-AMQP.

Parameters we can set when create a new queue:

  • exclusive – this setting makes a queue private and is only accessible from your application. Only one consumer can connect to a queue.
  • auto-delete – when last consumer unsubscribes from queue the queue is removed.
  • passive – when create a queue that exists the server returns successfully or returns fail if parameters don’t match. If passive parameter is set and we create a queue that exists always returns success but if the queue doesn’t exist it is not created.
  • durable – the queue can persist when the services reboots.

Exchange and binding

In the first post of the serie we talked about different exchange types as you can remember these types are: direct, fanout and topic. And the most important parameter to set when producer sends a message is the routing key this is used to route the message to a queue.

Once we have declared an exchange this can be related with a queue using a binding command: queue_bind. The relation between them is made using the routing key or a pattern based in routing key. When exchange has type fanout the routing key or patterns are not needed.

Some pattern examples can be: log.*, message.* and #.

The most important exchange parameters are:

  • type: direct, fanout and topic.
  • durable: makes an exchange persistent to reboots.

Broker and virtual hosts

A broker is a container where exhanges, bindings and queues are created. Usually we can define more than one virtual brokers in the same server. Virtual brokers are also called virtual hosts. The users, permissions and something else related to a Broker cannot be used from another one. This is very useful because we can create multiple brokers in the same physical server like multi-domain web server and when some of this virtual hosts is too big it can be migrated to another physical server and it can be clustered if it is required.

Messages

An AMQP message is a binary without a fixed size and format. Each application can set it’s own messages. The AMQP broker only will add small headers to be routed among different queues as fast as possible.

Messages are not persistent inside a broker unless the producer sets the parameter persistent=true. In the other way the messages needs to be stored in durable exchanges and durable queues to persist in the broker when it is restarted. Of course when the messages are persistent these must be wrote to disk and the throughput will fall down. Then maybe sometimes create persistent messages is not a good idea.

 

 

Mar 15

What is AMQP? and the architecture

This entry is part 2 of 4 in the series AMQP and RabbitMQ

Reading time: 3 – 4 minutes

What is AMQP? (Advanced Message Queuing Protocol)

When two applications need to communicate there are a lot of solutions like IPC, if these applications are remote we can use RPC. When two or more applications communicate with each other we can use ESB. And there are many more solutions. But when more than two applications communicate and the systems need to be scalable the problem is a bit more complicated. In fact, when we need to send a call to a remote process or distribute object processing among different servers we start to think about queues.

Typical examples are rendering farms, massive mail sending, publish/subscriptions solutions like news systems. At that time we start to consider a queue-based solution. In my case the first approach to these types of solutions was Gearman; that is a very simple queue system where workers connect to a central service where producers have to call the methods published by workers; the messages are queued and delivered to workers in a simple queue.

Another interesting solution can be use Redis like a queue service using their features like publish/subscribe. Anyway always you can develop your own queue system. Maybe there a lot of solutions like that but when you are interested in develop in standard way and want a long-run solution with scalability and high availability then you need to think in use AMQP-based solutions.

The most simple definition of AMQP is: “message-oriented middleware”. Behind this simple definition there are a lot of features available. Before AMQP there was some message-oriented middlewares, for example, JMS. But AMQP is the standard protocol to keep when you choice a queue-based solution.

AMQP have features like queuing, routing, reliability and security. And most of the implementations of AMQP have a really scalable architectures and high availability solutions.

The architecture

The basic architecture is simple, there are a client applications called producers that create messages and deliver it to a AMQP server also called broker. Inside the broker the messages are routed and filtered until arrive to queues where another applications called consumers are connected and get the messages to be processed.

When we have understood this maybe is the time to deep inside the broker where there are AMQP magic. The broker has three parts:

  1. Exchange: where the producer applications delivers the messages,  messages have a routing key and exchange uses it to route messages.
  2. Queues: where messages are stored and then consumers get the messages from queues.
  3. Bindings: makes relations between exchanges and queues.

When exchange have a message uses their routing key and three different exchange methods to choose where the message goes:

    1. Direct Exchange:  routing key matches the queue name.
    2. Fanout Exchange: the message is cloned and sent to all queues connected to this exchange.
    3. Topic Exchange: using wildcards the message can be routed to some of connected queues.

This is the internal schema of a broker:

Mar 09

AMQP and RabbitMQ [TOC]

This entry is part 1 of 4 in the series AMQP and RabbitMQ

Reading time: 1 – 2 minutes

After reading the book ‘RabbitMQ in action‘ I’m working on series of posts  that will include the following subjects:

  1. What is AMQP? and the architecure
  2. Deep inside AMQP
  3. RabbitMQ CLI quick reference
  4. Hello World using ‘kombu’ library and python
  5. Parallel programming
  6. Events example
  7. RPC
  8. Clustering fundamentals
  9. Managing RabbitMQ from administration web interface
  10. Managing RabbitMQ from REST API

Please let me know if you are interested in this series of posts. Because in my opinion this is very interesting and it always comes in handy to know if someone has been working on those subjects.

Dec 28

Introducció a RestMS

Reading time: 2 – 4 minutes

RestMS schemaXMPP és un protocol de missatgeria no només orientat a mantenir converses entre usuaris sinó també a ser usat com a sistema RPC entre diferents aplicacions. Malgrat això res és perfecte i AMQP més orientat a la segona funció que no pas a la primera es perfila com estàndard  corporatiu molt més potent i eficient que XMPP en diversos aspectes. Però AMQP peca per ser força inaccessible degut a que no és trivial d’entendre i usar.

En tot aquest món de la missatgeria entre aplicacions hi ha un altre protocol no tan conegut però que preten tenir el millor d’XMPP i d’AMQP: RestMS. De fet, RestMS és realment una simplifiació d’AMQP que usa com a sistema de transport REST.

Es tracta d’un estàndard obert, amb una implementació Open Source força sòlida. Aquest estàndard ens aporta:

  • sistema d’enrutat de missatges
  • models de cues
  • fàcil d’extendre la seva semàntica
  • simple, perquè és precís i petit
  • segur, usa els sistemes de seguretat estàndards d’HTTP
  • escalable, perquè usa servidors, caches, proxies, etc. del món HTTP
  • resol la dicotomia ‘polling vs events’ usant long-polling, igual que BOSH
  • en principi no disposa de sistema de presència, tot i que es podria montar fàcilment
  • pot codificar durant el transportar les dades en XML o JSON
  • portable en diferents sistemes operatius
  • ofereix interoperatibilitat entre diferents llenguatges de programació

Per tot plegat RestMS es perfila com una alternativa interessant per alguns entorns, aportant una solució simple i potent en molts aspectes. Tot i que teoria en mà, el trobo força més lent que no pas AMQP. De fet, des de que vaig veure com s’usava un sistema AMQP per fer balanceix de càrrega en aplicacions de video usant GStreamer per fer una prova ‘fast and dirty’ em vaig quedar impresionat.