This article describes my experience designing an alert system for my IoT platform project.
The system's main goal is to monitor incoming data and trigger an alert whenever a value exceeds a predefined threshold.
My research focused on finding an alerting solution that requires minimal administration and is easy to use.
RabbitMQ, a powerful means of communication between applications and connected objects, facilitates the efficient
transfer of messages, but it does not provide a built-in mechanism for long-term storage. To overcome this limitation,
integration with a robust database such as PostgreSQL is essential.
For instance, when leveraging AMQP or MQTT protocols for data transmission from connected objects, such as monitoring
room temperature or collecting energy consumption from a device, there is often a pressing need to retain this
information for future analysis. RabbitMQ does not inherently provide this data persistence feature, but by intelligently
synchronizing your message flow with PostgreSQL, you can create a comprehensive solution that enables not only
real-time communication but also durable data storage.
In this tutorial, we will show you how to save RabbitMQ messages in PostgreSQL using Python.
In our example, we will use Docker to launch RabbitMQ and PostgreSQL. Below is the docker-compose.yml file that
defines the RabbitMQ and PostgreSQL services.
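A sketch of that compose file, reconstructed from the service descriptions that follow (the image tags, credentials, vhost, and volume path come from elsewhere in the article; the container and host names are assumptions):

```yaml
services:
  rabbitmq:
    image: rabbitmq:3.12-management
    container_name: rabbitmq
    environment:
      RABBITMQ_DEFAULT_USER: user
      RABBITMQ_DEFAULT_PASS: password
      RABBITMQ_DEFAULT_VHOST: default_vhost
    ports:
      - "5672:5672"    # AMQP
      - "15672:15672"  # management UI

  postgres:
    image: postgres:16-alpine
    container_name: postgres
    hostname: postgres
    restart: always
    environment:
      POSTGRES_DB: postgres_db
      POSTGRES_USER: postgres_user
      POSTGRES_PASSWORD: password
    volumes:
      # SQL scripts in this directory run on the first container start
      - ./docker-entrypoint-initdb.d/:/docker-entrypoint-initdb.d/
    ports:
      - "5432:5432"
```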
This Docker Compose file defines two services, RabbitMQ and PostgreSQL, along with their respective configurations
to create and orchestrate Docker containers that interact with each other. Here is a detailed explanation of the content:
RabbitMQ Service:
rabbitmq: Service name.
image: Uses RabbitMQ version 3.12 image with the management plugin enabled, allowing access to the RabbitMQ admin interface at http://localhost:15672. Connect using credentials user and password.
container_name: Name of the container created for this service.
environment: Defines environment variables for RabbitMQ, including username, password, and virtual host (vhost).
ports: Maps ports 5672 and 15672 of the RabbitMQ container to the same ports on the host.
PostgreSQL Service:
postgres: Service name.
image: Uses PostgreSQL version 16 image with Alpine Linux.
container_name: Name of the container created for this service.
hostname: Hostname of the PostgreSQL container.
restart: Container's automatic restart policy (set to "always").
environment: Defines environment variables for PostgreSQL, including the database name, username, and password.
volumes: Mounts the local directory ./docker-entrypoint-initdb.d/ into the PostgreSQL container's database initialization directory, allowing execution of SQL initialization scripts.
ports: Maps port 5432 of the PostgreSQL container to the same port on the host.
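Because the consumer later inserts rows into a temperature table, the mounted initialization directory presumably contains a script that creates it. A minimal sketch of such a script (the column names and types are assumptions; only a text value column is strictly required by the consumer code):

```sql
-- docker-entrypoint-initdb.d/init.sql
-- Runs automatically the first time the PostgreSQL container starts.
CREATE TABLE IF NOT EXISTS temperature (
    id         SERIAL PRIMARY KEY,
    value      TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
```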
To launch RabbitMQ and PostgreSQL, execute the following command:
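```shell
docker compose up -d
```

The -d flag starts both containers in the background; on older Docker installations, the standalone docker-compose binary is used instead of the compose subcommand.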
The publisher is a Python script that connects to RabbitMQ and sends messages to a queue.
To connect to RabbitMQ, we will use the pika library. To install it, execute the following command:
```shell
pip install pika
```
The following code is the publisher:

```python
import random
import pika
from time import sleep

# Connection to RabbitMQ
url = 'amqp://user:password@localhost:5672/default_vhost'
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Creation of the 'temperature' queue
channel.queue_declare('temperature')

# Creation of the 'temperature_routing_key' route that links the
# 'temperature' queue to the 'amq.direct' exchange
channel.queue_bind('temperature', 'amq.direct', 'temperature_routing_key')

while True:
    sleep(3)
    # Send a message to the 'temperature' queue
    channel.basic_publish('amq.direct', 'temperature_routing_key',
                          body=str(random.uniform(0, 100)))
```
The RabbitMQ connection URL is amqp://user:password@localhost:5672/default_vhost where:
user is the username
password is the password
localhost is the hostname (or IP address) of the RabbitMQ server
5672 is the RabbitMQ port
default_vhost is the RabbitMQ vhost (virtual host)
The consumer is a Python script that connects to RabbitMQ, reads messages from a queue, and saves them in PostgreSQL.
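The consumer also needs a PostgreSQL driver; the article's code uses psycopg2. One way to install it (the psycopg2-binary package avoids needing local build tools; plain psycopg2 also works if they are available):

```shell
pip install psycopg2-binary
```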
Here is a simple example of a consumer that reads messages from the temperature queue and saves them in the temperature
table in PostgreSQL:
```python
import pika
import psycopg2

# Connection to the PostgreSQL database
connection_sql = psycopg2.connect(
    database="postgres_db",
    user="postgres_user",
    password="password",
    host="localhost",
    port="5432",
)
cursor = connection_sql.cursor()

# Callback function called whenever a message is received
# in the 'temperature' queue
def callback(ch, method, properties, body):
    # Conversion of the message bytes to a string
    body = body.decode()
    # Insertion of the message into the 'temperature' table
    cursor.execute("INSERT INTO temperature (value) VALUES (%s)", (body,))
    connection_sql.commit()

# Connection to RabbitMQ
url = 'amqp://user:password@localhost:5672/default_vhost'
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()

channel.basic_consume('temperature', callback, auto_ack=True)
channel.start_consuming()  # blocks until consuming stops

# Reached only once consuming has stopped
channel.close()
connection.close()
```
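With both scripts running, you can verify that messages are being persisted by querying the table through the container's psql client (assuming the PostgreSQL container is named postgres, as in the compose setup above):

```shell
docker exec -it postgres psql -U postgres_user -d postgres_db \
  -c "SELECT * FROM temperature;"
```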