If you need applications, scripts, or any other software to communicate with each other, RabbitMQ is a solid message broker for the job. It supports multiple queues, virtual hosts, users, and more within a single environment, which makes it well suited to most enterprise messaging needs. The script below assumes you already have a running RabbitMQ service. Deploying one is outside the scope of this article, so if you haven't yet set up your node or cluster, find a good tutorial or read the documentation on deploying a RabbitMQ instance first.
"The great enemy of communication, we find, is the illusion of it." William H. Whyte
This Python script defines a class that covers the most common RabbitMQ operations and logs its activity at DEBUG level only. The connection is opened when the object is created, and the class reconnects whenever opening a channel fails because the connection was lost. The pika library, a Python client for the AMQP 0-9-1 protocol that RabbitMQ speaks, handles connecting, disconnecting, and all queue operations. In this example, messages are Python objects: each one is serialized to a JSON string before being published to the queue and parsed back into a Python object after being received.
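The JSON round trip described above can be tried without a broker. The following is a minimal sketch using only the standard library. The `encode` and `decode` helper names are hypothetical and do not appear in the script, which calls `json.dumps` and `json.loads` directly.

```python
import json


def encode(message):
    """Serialize a Python object to the JSON string that would be published."""
    return json.dumps(message)


def decode(body):
    """Parse a received body (str or bytes) back into a Python object."""
    return json.loads(body)


payload = [{"foo": "bar"}, {"foo": "bar"}]
wire = encode(payload)

# pika hands message bodies to the consumer as bytes, so the round trip
# is checked against the UTF-8 encoded form of the string.
restored = decode(wire.encode("utf-8"))
print(restored == payload)  # prints: True
```

Anything that `json.dumps` cannot serialize (for example a `datetime` object) would need to be converted before sending.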
The library can be installed through pip with the following command:
pip3 install pika
#!/usr/bin/env python3
import json
import logging

import pika


class RabbitMQ:
    def __init__(self, host, username, password, virtual_host='/', durable=True):
        self.host = host
        self.username = username
        self.password = password
        self.virtual_host = virtual_host
        self.durable = durable
        self.connect()  # connect as soon as the object is created

    def connect(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=self.host,
            credentials=pika.credentials.PlainCredentials(self.username, self.password),
            virtual_host=self.virtual_host))

    def disconnect(self):
        self.connection.close()

    def receive(self, queue):
        """Return one decoded message from the queue, or None if it is empty."""
        message = None
        logging.debug(f"Checking messages on RabbitMQ for queue '{queue}'")
        # Open a channel, reconnecting first if the connection was lost
        while True:
            try:
                channel = self.connection.channel()
            except pika.exceptions.StreamLostError:
                logging.debug("RabbitMQ lost stream, reconnecting")
                self.connect()
            else:
                break
        # Declaring the queue also reports how many messages are waiting
        status = channel.queue_declare(queue=queue, durable=self.durable)
        if status.method.message_count == 0:
            logging.debug(f"Empty queue '{queue}', skipping")
            channel.close()
            return None
        try:
            for method_frame, properties, body in channel.consume(queue):
                message = json.loads(body)
                logging.debug(f"Message '{message}' received from queue '{queue}'")
                channel.basic_ack(method_frame.delivery_tag)
                break  # Needed so the consumer doesn't stay waiting for another message
        except pika.exceptions.ChannelClosedByBroker as e:
            logging.debug(f"RabbitMQ channel closed due to '{e}'")
        # A channel closed by the broker cannot be cancelled or closed again
        if channel.is_open:
            requeued_messages = channel.cancel()
            logging.debug(f"Requeued {requeued_messages} unacknowledged message(s)")
            channel.close()
        return message

    def send(self, queue, message):
        """Publish a JSON-serializable message to the queue."""
        # Open a channel, reconnecting first if the connection was lost
        while True:
            try:
                channel = self.connection.channel()
            except pika.exceptions.StreamLostError:
                logging.debug("RabbitMQ lost stream, reconnecting")
                self.connect()
            else:
                break
        channel.queue_declare(queue=queue, durable=self.durable)
        channel.basic_publish(
            exchange='',  # default exchange routes by queue name
            routing_key=queue,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
            ),
        )
        logging.debug(f"Message '{message}' sent to RabbitMQ queue '{queue}'")
        channel.close()


if __name__ == '__main__':
    # Without level=logging.DEBUG the logging.debug() calls print nothing;
    # note that this also enables pika's own debug output
    logging.basicConfig(level=logging.DEBUG)
    rabbitmq = RabbitMQ('localhost', 'username', 'somepassword')
    rabbitmq.send('test', [{"foo": "bar"}, {"foo": "bar"}])
    rabbitmq.receive('test')
    rabbitmq.disconnect()
To run the script, execute the following command in a terminal:
python3 /path/to/created_script.py
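The `receive` and `send` methods in the script retry opening a channel with no delay and no limit on attempts. If that is a concern, one possible variation is to cap the attempts and pause between them. The sketch below is broker-independent and is not part of the script: `retry`, `attempts`, `delay`, and `flaky_open_channel` are hypothetical names, and the built-in `ConnectionError` stands in for the pika exception so the example runs without a broker.

```python
import time


def retry(operation, attempts=3, delay=0.1, exceptions=(ConnectionError,)):
    """Call operation() until it succeeds or the attempts run out.

    Sleeps `delay` seconds after the first failure and doubles the pause
    after each further failure. Re-raises the last error if every attempt fails.
    """
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except exceptions:
            if attempt == attempts:
                raise
            time.sleep(delay)
            delay *= 2


# Stand-in for "open a channel": fails twice, then succeeds.
calls = {"count": 0}


def flaky_open_channel():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated lost stream")
    return "channel"


result = retry(flaky_open_channel, attempts=5, delay=0.01)
print(result, calls["count"])  # prints: channel 3
```

In the class, `operation` would presumably be a small function that reconnects when needed and then opens the channel, with `exceptions` set to the pika errors you want to treat as recoverable.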