RabbitMQ for Inter-Process Communication

Often, the solution to a particular problem involves scripts: one script or several, organized with a specific structure and design. Most likely those scripts will need a way to talk to each other. We call this Inter-Process Communication, or IPC for short. In general, it is common for applications to be built as a set of components that work together by sending and receiving messages, since this decouples their logic and allows components to be changed independently without amending the others. As a result, the application design becomes more coherent, and individual components become much easier to test, since they are independent and their dependencies can be stubbed relatively easily.

Depending on the particular IPC implementation, the intercommunication mechanism can be organized in quite a few ways: files, channels (or pipes), sockets, remote procedure calls (RPC), shared memory, etc. Each way has its own pros and cons. For instance, when we use a pipe, the script must hold a connection open the whole time to be able to send data. RPC, on the other hand, is appropriate only when each script in a chain of calls executes sequentially, so that the result of the previous call is passed as input to the next one; it is hard to adapt to asynchronous intercommunication. In fact, most implementations of these methods also vary across operating systems. For asynchronous communication, the simplest option is to use files or a database as a temporary store for messages. But the most reliable way is to use a Message Broker, or Message Queue.

The most popular services that provide Message Queues are AWS SQS, Microsoft Azure Service Bus, IBM MQ, Apache Kafka, RabbitMQ, and others. The most interesting for us are Amazon MQ and RabbitMQ: two very similar tools, Amazon MQ being a managed service provided by AWS, and RabbitMQ being open-source software that you can install on your own server.

RabbitMQ is probably the most widely deployed Message Broker. It is based on the Advanced Message Queuing Protocol (AMQP) standard and, as we have said, it is open source. In short, RabbitMQ is a powerful and flexible Message Broker despite its compact design.

RabbitMQ: basic usage

We are not going to cover RabbitMQ installation, since there are plenty of good resources online. Once you have RabbitMQ up and running, install the pika library to work with the message broker from Python. Installation is as simple as running the following command:

pip install pika

Since we will be working with messages in the queue, we first need a message producer to fill it. Here is a simple script that does that:

producer.py

import pika
from datetime import datetime

def main():
    # Creating connection for RabbitMQ
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))

    # Getting channel descriptor
    channel = connection.channel()

    # Declare new queue if it doesn't exist
    channel.queue_declare(queue='test_queue')

    # Create new message with timestamp
    data = "Hi, consumer! [{}]".format(datetime.strftime(datetime.now(), '%Y/%m/%d %H:%M:%S'))

    # Publish message in queue
    channel.basic_publish(exchange='',
                          routing_key='test_queue',
                          body=data)
    # Close connection
    connection.close()


if __name__ == '__main__':
    main()

To write to the queue we get a channel descriptor, declare the queue, and then call the basic_publish method. Once this is done, we close the connection.

Here is a simple script that will read from the queue:

consumer.py

import pika

def main():
    # Creating connection for RabbitMQ
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))

    # Getting channel descriptor
    channel = connection.channel()

    # Declare new queue if it doesn't exist
    channel.queue_declare(queue='test_queue')

    # Fetch messages until the queue is empty
    while True:
        method_frame, header_frame, body = channel.basic_get(queue='test_queue', auto_ack=True)
        if method_frame is None:
            break
        print(body.decode('utf-8'))

    # Close connection
    connection.close()


if __name__ == '__main__':
    main()

We run a similar set of basic commands at the beginning. Please note that even though the consumer only reads from the queue, we still declare the queue, in case the consumer is executed before the producer. Since queue_declare is idempotent, this guarantees the queue exists no matter which script connects first.

To read from the queue we use the basic_get method, which provides direct access to the queue. The auto_ack flag (called no_ack in pika releases before 1.0) instructs the library to delete the message from the queue without waiting for confirmation. In our simple example this is not that important, but to make production-grade scripts robust, you need to handle messages gracefully when an error occurs; otherwise a message that was removed without acknowledgment is simply lost. Be sure to handle this condition as well.

Here is an example reading from the queue with confirmation:

# Start fetching from queue
while True:
    method_frame, header_frame, body = channel.basic_get(queue='test_queue', auto_ack=False)
    if method_frame is None:
        break
    print(body.decode("utf-8"))
    # Confirm that the message was processed so the broker can remove it
    channel.basic_ack(method_frame.delivery_tag)

In the example above, if you leave out the confirmation after processing a message, the message stays in the queue in an unacknowledged state, can block other messages from being processed, and you may end up with a hanging script.

The other way to read from the queue is the basic_consume method, which listens on a queue and calls a callback whenever a new message arrives:

def callback(channel, method, properties, body):
    print(body.decode("utf-8"))
    # Confirm the message once it has been processed
    channel.basic_ack(method.delivery_tag)

channel.basic_consume(queue='test_queue', on_message_callback=callback)

channel.start_consuming()

The main downside of this method is that it keeps running and waiting for new messages until it is stopped manually. This approach will not work if you have a function that must return a proper status code upon completion.
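If you need consumption that stops on its own, one option is pika's consume generator on a blocking channel, which yields (None, None, None) once the queue has been idle for the given inactivity_timeout. A minimal sketch (the five-second timeout is just an illustrative choice):

# Stop once no message has arrived for five seconds
for method, properties, body in channel.consume(queue='test_queue', inactivity_timeout=5):
    if method is None:
        break  # the queue went quiet, exit the loop
    print(body.decode("utf-8"))
    channel.basic_ack(method.delivery_tag)

# Cancel the consumer created by consume()
channel.cancel()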

Processing “dead” messages

Sometimes you may face a situation where messages in the queue cannot be processed. For instance, processing a message may fail and you would like to retry it later. Such messages can block your main queue from processing the others, which can lead to queue overflow, message loss, and data-integrity issues.

A queue is FIFO by design, which makes adding and consuming messages easy and efficient, but moving individual messages around within the queue is not something it supports transparently.

Here are a few options to solve such an issue:

  1. If a message in the queue is malformed, with an error in its structure, it can simply be discarded after reading, with a request to the sender to resend it. Remove it using the basic_ack method, which stands for basic acknowledgement:
method, header, body = channel.basic_get(queue='test', auto_ack=False)
channel.basic_ack(method.delivery_tag)

Or use the basic_nack method with the requeue flag set to False:

method, header, body = channel.basic_get(queue='test', auto_ack=False)
channel.basic_nack(method.delivery_tag, requeue=False)

Here is a catch. Before RabbitMQ version 2.7, calling basic_nack with the requeue flag set to True would put the message at the end of the queue, which would solve our issue. In newer versions, however, RabbitMQ requeues messages in their original order, so a rejected message returns to its old position (see the sketch after this list for a way to move a message to the back by hand).

  2. If the message cannot be processed right now, you may delay its processing and retry later. The best practice is to use "Dead Letter Exchanges". The basic idea is to temporarily put problematic, unconfirmed messages (marked with basic_nack or basic_reject) into a separate queue that serves as temporary storage, and once you have processed all other messages, move them back into the main queue and retry. Here is an example:
...
channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='rejected')
channel.queue_declare(queue='main', arguments={
    'x-dead-letter-exchange': 'dlx',
})
channel.queue_bind(exchange='dlx', routing_key='main', queue='rejected')
...

# random.random() simulates a processing failure for half of the messages
method, header, body = channel.basic_get(queue='main', auto_ack=False)
if random.random() < 0.5:
    channel.basic_nack(method.delivery_tag, requeue=False)  # dead-lettered into 'rejected'
else:
    channel.basic_ack(method.delivery_tag)
...
# Count the dead-lettered messages and move them back into the main queue
rejected_cnt = channel.queue_declare(queue='rejected', passive=True).method.message_count

for _ in range(rejected_cnt):
    method, header, body = channel.basic_get(queue='rejected', auto_ack=False)
    channel.basic_publish(exchange='', routing_key='main', body=body)
    channel.basic_ack(method.delivery_tag)
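As noted in the first option above, basic_nack with requeue=True no longer sends a message to the back of the queue. You can emulate that behaviour by hand: acknowledge the original delivery and re-publish a copy of the body to the same queue. A minimal sketch:

method, header, body = channel.basic_get(queue='main', auto_ack=False)
if method:
    # Remove the original message, then append a copy to the back of the queue
    channel.basic_ack(method.delivery_tag)
    channel.basic_publish(exchange='', routing_key='main', body=body)

Keep in mind that this is not atomic: if the script dies between the acknowledgement and the re-publish, the message is lost, which is one more reason to prefer the dead-letter approach above.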

Data Consistency and Data Integrity in RabbitMQ

Another important issue that you as a developer may face is data integrity. In the real world, your scripts will most likely run over slow and unreliable connections, so interruptions will happen all the time, and sometimes servers go down unpredictably.

By default, RabbitMQ keeps messages in memory and does not persist them to the file system unless you explicitly ask it to.

According to the best practices, you should initialize your queues with the durable flag set to True:

channel.queue_declare(queue='queue', durable=True)

Also, make sure you send your messages with the delivery mode set to 2, which marks a message as persistent:

channel.basic_publish(exchange='', routing_key='queue', body=data,
                      properties=pika.BasicProperties(
                          delivery_mode=2  # persistent mode
                      ))

This writes your messages to the file system, so they survive interruptions. Keep in mind that this change may impact your system's performance.
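Putting the two settings together, a minimal durable producer might look like this (the broker address matches the earlier examples; the queue name and payload are just illustrative):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))
channel = connection.channel()

# A durable queue survives a broker restart
channel.queue_declare(queue='task_queue', durable=True)

# delivery_mode=2 asks the broker to write the message to disk
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body='important payload',
                      properties=pika.BasicProperties(delivery_mode=2))

connection.close()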

Conclusion

RabbitMQ is quite an easy and handy tool for working with queues and transmitting messages between systems in order to decouple the logical pieces of your platform.

We showed some basic functionality and explained the basic workflow for writing to the queue, reading from the queue, and processing messages. We also covered some best practices for using RabbitMQ to make your system robust and predictable. At Konstankino we use RabbitMQ for inter-process communication to make complex systems simpler and easier to work with. It also lets us scale what we build, so our customers are prepared to handle a growing number of users.

Contact Info
Alex

Team Lead - Konstankino LLC


At Konstankino we are focused on efficiency and security, streamlining business processes, custom software development, and deployment.