oriolrius.cat

Des del 2000 compartiendo sobre…

Tag: queues

Server send push notifications to client browser without polling

Reading time: 5 – 8 minutes

Nowadays last version of browsers support websockets and it’s a good a idea to use them to connect to server a permanent channel and receive push notifications from server. In this case I’m going to use Mosquitto (MQTT) server behind lighttpd with mod_websocket as notifications server. Mosquitto is a lightweight MQTT server programmed in C and very easy to set up. The best advantage to use MQTT is the possibility to create publish/subscriber queues and it’s very useful when you want to have more than one notification channel. As is usual in pub/sub services we can subscribe the client to a well-defined topic or we can use a pattern to subscribe to more than one topic. If you’re not familiarized with MQTT now it’s the best moment to read a little bit about because that interesting protocol. It’s not the purpose of this post to explain MQTT basics.

A few weeks ago I set up the next architecture just for testing that idea:

mqtt_schema

weboscket gateway to mosquitto mqtt server with javascrit mqtt client

The browser

Now it’s time to explain this proof of concept. HTML page will contain a simple Javascript code which calls mqttws31.js library from Paho. This Javascript code will connect to the server using secure websockets. It doesn’t have any other security measure for a while may be in next posts I’ll explain some interesting ideas to authenticate the websocket. At the end of the post you can download all source code and configuration files. But now it’s time to understand the most important parts of the client code.

client = new Messaging.Client("ns.example.tld", 443, "unique_client_id");
client.onConnectionLost = onConnectionLost;
client.onMessageArrived = onMessageArrived;
client.connect({onSuccess:onConnect, onFailure:onFailure, useSSL:true});

Last part is very simple, the client connects to the server and links some callbacks to defined functions. Pay attention to ‘useSSL’ connect option is used to force SSL connection with the server.

There are two specially interesting functions linked to callbacks, the first one is:

function onConnect() {
  client.subscribe("/news/+/sport", {qos:1,onSuccess:onSubscribe,onFailure:onSubscribeFailure});
}

As you can imagine this callback will be called when the connections is established, when it happens the client subscribes to all channels called ‘/news/+/sports’, for example, ‘/news/europe/sports/’ or ‘/news/usa/sports/’, etc. We can also use, something like ‘/news/#’ and it will say we want to subscribe to all channels which starts with ‘/news/’. If only want to subscribe to one channel put the full name of the channel on that parameter. Next parameter are dictionary with quality of service which is going to use and links two more callbacks.

The second interesting function to understand is:

function onMessageArrived(message) {
  console.log("onMessageArrived:"+message.payloadString);
};

It’s called when new message is received from the server and in this example, the message is printed in console with log method.

The server

I used an Ubuntu 12.04 server with next extra repositories:

# lighttpd + mod_webserver
deb http://ppa.launchpad.net/roger.light/ppa/ubuntu precise main
deb-src http://ppa.launchpad.net/roger.light/ppa/ubuntu precise main

# mosquitto
deb http://ppa.launchpad.net/mosquitto-dev/mosquitto-ppa/ubuntu precise main
deb-src http://ppa.launchpad.net/mosquitto-dev/mosquitto-ppa/ubuntu precise main

With these new repositories you can install required packages:

apt-get install lighttpd lighttpd-mod-websocket mosquitto mosquitto-clients

After installation it’s very easy to run mosquitto in test mode, use a console for that and write the command: mosquitto, we have to see something like this:

# mosquitto
1379873664: mosquitto version 1.2.1 (build date 2013-09-19 22:18:02+0000) starting
1379873664: Using default config.
1379873664: Opening ipv4 listen socket on port 1883.
1379873664: Opening ipv6 listen socket on port 1883.

The configuration file for lighttpd in testing is:

server.modules = (
        "mod_websocket",
)

websocket.server = (
        "/mqtt" => ( 
                "host" => "127.0.0.1",
                "port" => "1883",
                "type" => "bin",
                "subproto" => "mqttv3.1"
        ),
)

server.document-root        = "/var/www"
server.upload-dirs          = ( "/var/cache/lighttpd/uploads" )
server.errorlog             = "/var/log/lighttpd/error.log"
server.pid-file             = "/var/run/lighttpd.pid"
server.username             = "www-data"
server.groupname            = "www-data"
server.port                 = 80

$SERVER["socket"] == ":443" {
    ssl.engine = "enable" 
    ssl.pemfile = "/etc/lighttpd/certs/sample-certificate.pem" 
    server.name = "ns.example.tld"
}

Remember to change ‘ssl.pemfile’ for your real certificate file and ‘server.name’ for your real server name. Then restart the lighttpd and validate SSL configuration using something like:

openssl s_client -host ns.example.tld -port 443

You have to see SSL negotiation and then you can try to send HTTP commands, for example: “GET / HTTP/1.0” or something like this. Now the server is ready.

The Test

Now you have to load the HTML test page in your browser and validate how the connections is getting the server and then how the mosquitto console says how it receives the connection. Of course, you can modify the Javascript code to print more log information and follow how the client is connected to MQTT server and how it is subscribed to the topic pattern.

If you want to publish something in MQTT server we could use the CLI, with a command mosquitto_pub:

mosquitto_pub -h ns.example.tld -t '/news/europe/sport' -m 'this is the message about european sports'

Take a look in your browser Javascript consle you have to see how the client prints the message on it. If it fails, review the steps and debug each one to solve the problem. If you need help leave me a message. Of course, you can use many different ways to publish messages, for example, you could use python code to publish messages in MQTT server. In the same way you could subscribe not only browsers to topics, for example, you could subscribe a python code:

import mosquitto

def on_connect(mosq, obj, rc):
    print("rc: "+str(rc))

def on_message(mosq, obj, msg):
    print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))

def on_publish(mosq, obj, mid):
    print("mid: "+str(mid))

def on_subscribe(mosq, obj, mid, granted_qos):
    print("Subscribed: "+str(mid)+" "+str(granted_qos))

def on_log(mosq, obj, level, string):
    print(string)

mqttc = mosquitto.Mosquitto("the_client_id")
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_publish = on_publish
mqttc.on_subscribe = on_subscribe

mqttc.connect("ns.example.tld", 1883, 60)
mqttc.subscribe("/news/+/sport", 0)

rc = 0
while rc == 0:
    rc = mqttc.loop()

Pay attention to server port, it isn’t the ‘https’ port (443/tcp) because now the code is using a real MQTT client. The websocket gateway isn’t needed.

The files

  • mqtt.tar.gz – inside this tar.gz you can find all referenced files

Deep inside AMQP

Reading time: 5 – 8 minutes

In the next lines I’ll describe with more details the properties and features of AMQP elements. It won’t be an exhaustive description but in my opinion more than enough to start playing with AMQP queues.

Channels

When producers and consumers connects to the broker using a TCP socket after authenticating the connection they establish a channel where AMQP commands are sent. The channel is a virtual path inside a TCP connection between this is very useful because there can be multiple channels inside the TCP connection each channels is identified using an unique ID.

An interesting parameter of a channel is confirmation mode if this is set to true when messages delivered to a exchange finally gets their queues the producer receives an acknowledge message with an UID of the message. This kind of messages are asynchronous and permits to a producer send the next message when it is still waiting the ACK message. Of course if the message cannot be stored and it is lost the producer receives a NACK (not acknowledged) message.

Producers

Maybe this is the most simple part of the system. Producers only need to negotiate the authentication across a TCP connection create a channel and then publish all messages that want with its corresponding routing key. Of course, producers can create exchanges, queues and then bind them. But usually this is not a good idea is much more secure do this from consumers. Because when a producers try to send a message to a broker and doesn’t have the needed exchange then message will be lost. Usually consumers are connected all time and subscribed to queues and producers only connect to brokers when they need to send messages.

Consumers

When a consumer connects to a queue usually uses a command called basic.consume to subscribe the channel to a queue, then every time subscribed queue has a new message it is sent to consumer after last message is consumed, or rejected.

If consumer only want to receive one message without a subscription it can use the command basic.get.This is like a poll method. In fact, the consumer only gets a message each time it sends the command.

You can get the best throughput using basic.consume command because is more efficient than poll every time the consumer wants another message.

When more than one consumer was connected to a queue, messages are distributed in a round-robin. After the message is delivered to a consumer this send an acknowledge message and then queue send another message to next consumer. If the consumer sends a reject message the same message is sent to next consumer.

There are two types of acknowledgements:

  • basic.ack: this is the message that sends consumer to queue to acknowledge the reception of a message
  • auto_ack: this is a parameter we can set when consumer subscribes to a queue. The setting assumes ACK message from consumer and then queue sends next message without waiting the ACK message.

The message basic.reject is sent when the consumer wants to reject a received message. This message discards the message and it is lost. If we want to requeue the message we can set the parameter requeue=true when sent a reject message.

When the queue is created there can be a parameter called dead letter set to true, then consumer rejects a message with the parameter requeue=false the message is queued to a new queue called  dead letter. This is very useful because after all we can go tho that queue an inspect the message rejection reason.

Queues

Both consumers and producers can create a queue using queue.declare command. The most natural way is create queues from consumers and then bind it to an exchange. The consumers needs a free channel to create a queue, if a channel is subscribed to a queue, the channel is busy and cannot create new queues. When a queue is created usually we use a name to identify the queue, if the name is not specified it’s randomly generated. This is useful when create temporary and anonymous queues for RPC-over-AMQP.

Parameters we can set when create a new queue:

  • exclusive – this setting makes a queue private and is only accessible from your application. Only one consumer can connect to a queue.
  • auto-delete – when last consumer unsubscribes from queue the queue is removed.
  • passive – when create a queue that exists the server returns successfully or returns fail if parameters don’t match. If passive parameter is set and we create a queue that exists always returns success but if the queue doesn’t exist it is not created.
  • durable – the queue can persist when the services reboots.

Exchange and binding

In the first post of the serie we talked about different exchange types as you can remember these types are: direct, fanout and topic. And the most important parameter to set when producer sends a message is the routing key this is used to route the message to a queue.

Once we have declared an exchange this can be related with a queue using a binding command: queue_bind. The relation between them is made using the routing key or a pattern based in routing key. When exchange has type fanout the routing key or patterns are not needed.

Some pattern examples can be: log.*, message.* and #.

The most important exchange parameters are:

  • type: direct, fanout and topic.
  • durable: makes an exchange persistent to reboots.

Broker and virtual hosts

A broker is a container where exhanges, bindings and queues are created. Usually we can define more than one virtual brokers in the same server. Virtual brokers are also called virtual hosts. The users, permissions and something else related to a Broker cannot be used from another one. This is very useful because we can create multiple brokers in the same physical server like multi-domain web server and when some of this virtual hosts is too big it can be migrated to another physical server and it can be clustered if it is required.

Messages

An AMQP message is a binary without a fixed size and format. Each application can set it’s own messages. The AMQP broker only will add small headers to be routed among different queues as fast as possible.

Messages are not persistent inside a broker unless the producer sets the parameter persistent=true. In the other way the messages needs to be stored in durable exchanges and durable queues to persist in the broker when it is restarted. Of course when the messages are persistent these must be wrote to disk and the throughput will fall down. Then maybe sometimes create persistent messages is not a good idea.

 

 

What is AMQP? and the architecture

Reading time: 3 – 4 minutes

What is AMQP? (Advanced Message Queuing Protocol)

When two applications need to communicate there are a lot of solutions like IPC, if these applications are remote we can use RPC. When two or more applications communicate with each other we can use ESB. And there are many more solutions. But when more than two applications communicate and the systems need to be scalable the problem is a bit more complicated. In fact, when we need to send a call to a remote process or distribute object processing among different servers we start to think about queues.

Typical examples are rendering farms, massive mail sending, publish/subscriptions solutions like news systems. At that time we start to consider a queue-based solution. In my case the first approach to these types of solutions was Gearman; that is a very simple queue system where workers connect to a central service where producers have to call the methods published by workers; the messages are queued and delivered to workers in a simple queue.

Another interesting solution can be use Redis like a queue service using their features like publish/subscribe. Anyway always you can develop your own queue system. Maybe there a lot of solutions like that but when you are interested in develop in standard way and want a long-run solution with scalability and high availability then you need to think in use AMQP-based solutions.

The most simple definition of AMQP is: “message-oriented middleware”. Behind this simple definition there are a lot of features available. Before AMQP there was some message-oriented middlewares, for example, JMS. But AMQP is the standard protocol to keep when you choice a queue-based solution.

AMQP have features like queuing, routing, reliability and security. And most of the implementations of AMQP have a really scalable architectures and high availability solutions.

The architecture

The basic architecture is simple, there are a client applications called producers that create messages and deliver it to a AMQP server also called broker. Inside the broker the messages are routed and filtered until arrive to queues where another applications called consumers are connected and get the messages to be processed.

When we have understood this maybe is the time to deep inside the broker where there are AMQP magic. The broker has three parts:

  1. Exchange: where the producer applications delivers the messages,  messages have a routing key and exchange uses it to route messages.
  2. Queues: where messages are stored and then consumers get the messages from queues.
  3. Bindings: makes relations between exchanges and queues.

When exchange have a message uses their routing key and three different exchange methods to choose where the message goes:

    1. Direct Exchange:  routing key matches the queue name.
    2. Fanout Exchange: the message is cloned and sent to all queues connected to this exchange.
    3. Topic Exchange: using wildcards the message can be routed to some of connected queues.

This is the internal schema of a broker:

AMQP and RabbitMQ [TOC]

Reading time: 1 – 2 minutes

After reading the book ‘RabbitMQ in action‘ I’m working on series of posts  that will include the following subjects:

  1. What is AMQP? and the architecure
  2. Deep inside AMQP
  3. RabbitMQ CLI quick reference
  4. Hello World using ‘kombu’ library and python
  5. Parallel programming
  6. Events example
  7. RPC
  8. Clustering fundamentals
  9. Managing RabbitMQ from administration web interface
  10. Managing RabbitMQ from REST API

Please let me know if you are interested in this series of posts. Because in my opinion this is very interesting and it always comes in handy to know if someone has been working on those subjects.