How to Save RabbitMQ Messages in PostgreSQL with Python?

RabbitMQ is a message broker that transfers messages efficiently between applications and connected objects, but it is not designed for long-term storage: a message leaves its queue once it has been consumed and acknowledged. To keep messages for later use, you can pair RabbitMQ with a database such as PostgreSQL.

For instance, when connected objects send data over AMQP or MQTT, such as room temperature readings or a device's energy consumption, you often need to retain that data for later analysis. RabbitMQ does not keep messages after they are consumed, but by writing each message to PostgreSQL as it arrives, you get both real-time communication and durable data storage.

In this tutorial, we will show you how to save RabbitMQ messages in PostgreSQL using Python.

Prerequisites for Saving RabbitMQ Messages in PostgreSQL

To follow this tutorial, you need the following:

  • a RabbitMQ server
  • a PostgreSQL server

Launching RabbitMQ Server and PostgreSQL Server

In our example, we will use Docker to launch RabbitMQ and PostgreSQL. Below is the docker-compose.yml file that defines the RabbitMQ and PostgreSQL services.

version: "3"

services:
  rabbitmq:
    image: rabbitmq:3.12-management-alpine
    container_name: rabbit_mq_to_db
    environment:
      - RABBITMQ_DEFAULT_USER=user
      - RABBITMQ_DEFAULT_PASS=password
      - RABBITMQ_DEFAULT_VHOST=default_vhost
    ports:
      - 5672:5672
      - 15672:15672

  postgres:
    image: postgres:16-alpine
    container_name: postgres_db
    hostname: postgres
    restart: always
    environment:
      - POSTGRES_DB=postgres_db
      - POSTGRES_USER=postgres_user
      - POSTGRES_PASSWORD=password
    volumes:
      - ./docker-entrypoint-initdb.d/:/docker-entrypoint-initdb.d
    ports:
      - "5432:5432"

Description of the docker-compose.yml file:

This Docker Compose file defines two services, RabbitMQ and PostgreSQL, along with their respective configurations to create and orchestrate Docker containers that interact with each other. Here is a detailed explanation of the content:

RabbitMQ Service:

  • rabbitmq: Service name.
  • image: Uses RabbitMQ version 3.12 image with the management plugin enabled, allowing access to the RabbitMQ admin interface at http://localhost:15672. Connect using credentials user and password.
  • container_name: Name of the container created for this service.
  • environment: Defines environment variables for RabbitMQ, including username, password, and virtual host (vhost).
  • ports: Maps ports 5672 (AMQP) and 15672 (management interface) of the RabbitMQ container to the same ports on the host.

PostgreSQL Service:

  • postgres: Service name.
  • image: Uses PostgreSQL version 16 image with Alpine Linux.
  • container_name: Name of the container created for this service.
  • hostname: Hostname of the PostgreSQL container.
  • restart: Container's automatic restart policy (set to "always").
  • environment: Defines environment variables for PostgreSQL, including the database name, username, and password.
  • volumes: Mounts the local directory ./docker-entrypoint-initdb.d/ into the PostgreSQL container's database initialization directory, allowing execution of SQL initialization scripts.
  • ports: Maps port 5432 of the PostgreSQL container to the same port on the host.

To launch RabbitMQ and PostgreSQL, execute the following command:

docker-compose up
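
Before running the scripts, it can help to confirm that both containers accept connections. The following is a minimal sketch that uses only the Python standard library. The helper names wait_for_port and check_services and the timeout values are illustrative assumptions, not part of the tutorial's source code. An open port only shows that the server is listening, not that it has finished initializing.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=1.0):
    """Return True once a TCP connection to host:port succeeds, False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError while nothing is listening yet
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


def check_services(host="localhost"):
    """Check the two ports published by the docker-compose.yml file."""
    services = {"RabbitMQ": 5672, "PostgreSQL": 5432}
    return {name: wait_for_port(host, port) for name, port in services.items()}
```

Calling check_services() returns a dictionary such as {"RabbitMQ": True, "PostgreSQL": True} when both ports are reachable.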

Create Python Scripts to Save RabbitMQ Messages in PostgreSQL

Now that RabbitMQ and PostgreSQL are launched, we will create the Python scripts. We will need two scripts:

  • A RabbitMQ publisher to send messages with Python
  • A RabbitMQ consumer to receive messages with Python and save them in PostgreSQL

We will also need the pika library, a Python implementation of the AMQP 0-9-1 protocol, to connect to RabbitMQ.

Finally, we will need the psycopg2 library to connect to PostgreSQL and interact with the database. It can be installed with pip install psycopg2-binary, which provides a prebuilt package.

RabbitMQ Publisher to Send Messages with Python

The publisher is a Python script that connects to RabbitMQ and sends messages to a queue.

To connect to RabbitMQ, we will use the pika library. To install it, execute the following command:

pip install pika

The following code is the publisher:

import random
from time import sleep

import pika

# Connection to RabbitMQ
url = 'amqp://user:password@localhost:5672/default_vhost'
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Creation of the 'temperature' queue
channel.queue_declare('temperature')

# Binding of the 'temperature' queue to the 'amq.direct' exchange with the 'temperature_routing_key' routing key
channel.queue_bind('temperature', 'amq.direct', 'temperature_routing_key')

try:
    while True:
        sleep(3)
        # Publish a random temperature to 'amq.direct'; the binding routes it to the 'temperature' queue
        channel.basic_publish('amq.direct', 'temperature_routing_key', body=str(random.uniform(0, 100)))
except KeyboardInterrupt:
    # Close the connection cleanly when the script is stopped with Ctrl+C
    connection.close()

The RabbitMQ connection URL is amqp://user:password@localhost:5672/default_vhost where:

  • user is the username
  • password is the password
  • localhost is the RabbitMQ server's hostname (or IP address)
  • 5672 is the RabbitMQ port
  • default_vhost is the RabbitMQ vhost (virtual host)
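
To make the structure of the URL concrete, the parts listed above can be extracted with the Python standard library. This is only an illustrative sketch: the function name describe_amqp_url is an assumption, and pika parses the URL itself through pika.URLParameters, so this step is not needed in the scripts.

```python
from urllib.parse import unquote, urlparse


def describe_amqp_url(url):
    """Split an AMQP URL into user, password, host, port and vhost."""
    parts = urlparse(url)
    return {
        "user": unquote(parts.username or ""),
        "password": unquote(parts.password or ""),
        "host": parts.hostname,
        "port": parts.port,
        # The vhost is the URL path without its leading slash
        "vhost": unquote(parts.path[1:]),
    }


print(describe_amqp_url("amqp://user:password@localhost:5672/default_vhost"))
```

Because the vhost is taken from the URL path, a vhost named / (RabbitMQ's default when none is configured) has to be written percent-encoded, as in amqp://user:password@localhost:5672/%2F.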

RabbitMQ Consumer to Receive Messages with Python and Save them in PostgreSQL

The consumer is a Python script that connects to RabbitMQ, reads messages from a queue, and saves them in PostgreSQL.

Here is a simple example of a consumer that reads messages from the temperature queue and saves them in the temperature table in PostgreSQL:

import pika
import psycopg2

# Connection to PostgreSQL database
connection_sql = psycopg2.connect(database="postgres_db", user="postgres_user", password="password", host="localhost", port="5432")
cursor = connection_sql.cursor()

# Definition of the callback function that will be called when a message is received in the 'temperature' queue
def callback(ch, method, properties, body):
    # Conversion of the message to string
    body = body.decode()
    # Insertion of the message in the 'temperature' table
    cursor.execute("INSERT INTO temperature (value) VALUES (%s)", (body,))
    connection_sql.commit()

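# Connection to RabbitMQ
url = 'amqp://user:password@localhost:5672/default_vhost'
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Declare the queue so the consumer also works when it is started before the publisher
channel.queue_declare('temperature')

# Register the callback; auto_ack=True acknowledges each message as soon as it is delivered
channel.basic_consume('temperature', callback, auto_ack=True)

try:
    # Blocks and passes each message to the callback until the script is interrupted
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
finally:
    # Release the RabbitMQ and PostgreSQL resources
    connection.close()
    cursor.close()
    connection_sql.close()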
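
With auto_ack=True, RabbitMQ considers a message handled as soon as it is delivered, so a message is lost if the INSERT fails. One possible variant, sketched below, acknowledges a message only after the row has been committed. The names make_callback and run_consumer are assumptions introduced for this example and do not come from the tutorial's source code. The calls basic_ack and basic_nack are pika channel methods.

```python
def make_callback(connection_sql):
    """Build a pika callback that acknowledges a message only after its row is committed."""

    def callback(ch, method, properties, body):
        cursor = connection_sql.cursor()
        try:
            cursor.execute(
                "INSERT INTO temperature (value) VALUES (%s)", (body.decode(),)
            )
            connection_sql.commit()
        except Exception:
            # Undo the failed transaction and hand the message back to RabbitMQ
            connection_sql.rollback()
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        else:
            # The row is stored, so RabbitMQ can now discard the message
            ch.basic_ack(delivery_tag=method.delivery_tag)
        finally:
            cursor.close()

    return callback


def run_consumer():
    """Wire the callback to the 'temperature' queue (requires pika and psycopg2)."""
    import pika
    import psycopg2

    connection_sql = psycopg2.connect(
        database="postgres_db", user="postgres_user",
        password="password", host="localhost", port="5432",
    )
    params = pika.URLParameters("amqp://user:password@localhost:5672/default_vhost")
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    # auto_ack=False: a message stays unacknowledged until the callback acks it
    channel.basic_consume("temperature", make_callback(connection_sql), auto_ack=False)
    channel.start_consuming()
```

Requeueing with requeue=True means a message that always fails, for example one with a malformed body, is redelivered indefinitely. In that case, rejecting it with requeue=False may be a better choice.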

Complete Python Source Code

You can find the complete source code for this tutorial on GitHub.

It differs slightly from the code presented in this tutorial: it includes the file that creates the temperature table in PostgreSQL, and it uses classes for the PostgreSQL connection and the RabbitMQ consumer.
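
The consumer shown in this tutorial expects the temperature table to exist before the first message arrives. The table creation file is not reproduced here, so the following is only a sketch of what creating the table from Python might look like. Only the table name and the value column come from the tutorial. The id and created_at columns, the column types, and the function names are assumptions.

```python
# Assumed schema: only 'temperature' and 'value' appear in the tutorial
CREATE_TEMPERATURE_TABLE = """
CREATE TABLE IF NOT EXISTS temperature (
    id SERIAL PRIMARY KEY,
    value DOUBLE PRECISION NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
)
"""


def create_temperature_table(connection_sql):
    """Create the 'temperature' table if it does not exist yet."""
    cursor = connection_sql.cursor()
    try:
        cursor.execute(CREATE_TEMPERATURE_TABLE)
        connection_sql.commit()
    finally:
        cursor.close()


def main():
    # Imported here so the helper above can be used without psycopg2 installed
    import psycopg2

    connection_sql = psycopg2.connect(
        database="postgres_db", user="postgres_user",
        password="password", host="localhost", port="5432",
    )
    try:
        create_temperature_table(connection_sql)
    finally:
        connection_sql.close()
```

Alternatively, the same CREATE TABLE statement can be saved as a .sql file in the ./docker-entrypoint-initdb.d/ directory mounted in the docker-compose.yml file. The official PostgreSQL image runs the scripts in that directory when the database is initialized for the first time.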