RabbitMQ - Pika
Short Tutorial on RabbitMQ & Pika
In short, RabbitMQ is somewhat like a mail server. If that does not help, think of it as a friend who safely keeps a courier package sent to you while you are away from the office.
Technically speaking, RabbitMQ is (1) open source and (2) a message queue broker.
Message queue definitions:
Producer: one who creates a job request.
Consumer: one who processes a job request.
Queue: the place where requests are stored until a consumer picks them up.
Exchange: the gate/door to a queue.
RoutingKey: the key for reaching a queue via an exchange.
Generally, this is how people or processes send work:
[code lang=text]
producer -> consumer
[/code]
In reality, consumers are not always standing by for your instructions, so having a queue system helps:
[code lang=text]
producer -> stack/queue -> consumer
[/code]
A producer usually does not stick to one type of customer; it starts creating multiple kinds of products for different purposes and consumers:
[code lang=text]
producer -> exchange_server -> Queue -> consumer
[/code]
An exchange server understands the producer's request and sends it to the relevant queues.
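To make the routing idea concrete, here is a toy in-memory model of an exchange that matches routing keys exactly. It is only a sketch and does not talk to RabbitMQ at all; the class name `DirectExchange` and the key names are illustrative assumptions, not part of any library.

```python
from collections import defaultdict, deque


class DirectExchange:
    """Toy exchange: a message goes to every queue bound with its exact routing key."""

    def __init__(self):
        # routing key -> list of queues bound with that key
        self.bindings = defaultdict(list)

    def bind(self, queue, routing_key):
        self.bindings[routing_key].append(queue)

    def publish(self, routing_key, body):
        """Deliver body to all matching queues and return how many received it."""
        targets = self.bindings.get(routing_key, [])
        for queue in targets:
            queue.append(body)
        return len(targets)


# Two queues, each bound to the exchange with its own key (hypothetical names).
queue1, queue2 = deque(), deque()
exchange = DirectExchange()
exchange.bind(queue1, "redpill")
exchange.bind(queue2, "bluepill")

exchange.publish("redpill", "job for consumer 1")
exchange.publish("bluepill", "job for consumer 2")
# No queue is bound with this key, so the message is delivered nowhere.
dropped = exchange.publish("greenpill", "nobody is bound to this key")

print(queue1.popleft())
print(queue2.popleft())
print(dropped)
```

The producer only knows the exchange and a routing key; which queue (and therefore which consumer) gets the message is decided by the bindings.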
I'm using a MacBook (for better or worse), so here are my installation instructions:
[code lang=bash]
# Installing RabbitMQ
brew install rabbitmq
[/code]
RabbitMQ uses Erlang, so brew will install it as a dependency; no special instructions are needed.
Now that we have a message broker (message queue manager), we need somebody who will actually pick up the tasks and do them.
[code lang=bash]
# pip install celery
[/code]
We will use the celery module itself to create and send work.
The plan: create a simple example of a slow-running job, create a queue, and create an exchange.
1. Sample Working Python Program
[code lang=Python]
import random
import time


def fibo(t):
    """Return the largest Fibonacci number less than the given value."""
    a = 0
    b = 1
    while b < t:
        a, b = b, a + b
    return a


def main(value):
    # Simulate a slow-running job by sleeping for up to 3 seconds.
    time.sleep(random.random() * 3)
    return fibo(value)
[/code]
Testing fibo
[code lang=python]
>>> ## testing
>>> fibo(12), fibo(21), fibo(1112)
(8, 13, 987)
[/code]
Testing Producer
[code lang=python]
>>> ## testing
>>> main(12), main(21), main(1112)
(8, 13, 987)
[/code]
2. Working with RabbitMQ
Experimental Commands
# RabbitMQ is a message queue broker.
# It has queues, exchanges for queues, and
# routing keys that connect them.
# Here are some commands to reset your broker if you wish to experiment.
# starting the server
# web page (http://localhost:15672/#/)
# credentials (guest/guest)
# to stop - stopping the server above does not always kill the background process,
# so use this command.
kill `ps -eaf | grep -v grep | grep rabbit | awk '{ print $2 }'`
# RabbitMQ - resetting queues
# stop application
rabbitmqctl stop_app
# reset
rabbitmqctl reset
# start
rabbitmqctl start_app
# kill rabbit
kill `ps -eaf | grep -v grep | grep rabbit | awk '{ print $2 }'`
If the installation succeeded, you can run the following command in your terminal
The default RabbitMQ server comes with a web UI, which you can find at http://localhost:15672/#/.
Default credentials are username: guest and password: guest
1. Creating queues, exchanges and routing keys
Create a queue named test-queue1, with default parameters only.
Create a queue named test-queue2, like above.
Go to http://localhost:15672/#/exchanges and add an exchange with name test-exchange and type direct.
On the exchanges page, you can see the newly created test-exchange in the list of exchanges; click it.
Go to the section "Add binding from this exchange" and add the following:
To queue test-queue1, with routing key redpill
To queue test-queue2, with routing key bluepill
Let's learn to send a request to a queue.
The following shell session shows the script:
[code lang=bash]
SDS-bash3.2$ cat create_request.py
from tasks import wrapped_main

if __name__ == "__main__":
    print(wrapped_main.delay(15))
[/code]
Executing the above code:
[code lang=bash]
SDS-bash3.2$ python create_request.py
94c77870-2262-4ccd-af22-b45326c3c920
[/code]
On the Queues page of the RabbitMQ UI, we can see that a message arrived in the celery queue.
The issue is that we need this message in our `test-queue1`.
[code lang=bash]
# taking a backup copy
SDS-bash3.2$ cp -p tasks.py tasks_old.py
# update the file (the backup keeps the old version)
SDS-bash3.2$ vi tasks.py
# check the differences
SDS-bash3.2$ diff tasks.py tasks_old.py
8c8
< @app.task(queue='test-queue1')
---
> @app.task
[/code]
On the Queues page of the RabbitMQ UI, we can now see that the message arrived in test-queue1.
Next issue: we need to see the output message.
[code lang=text]
SDS-bash3.2$ cat tasks.py
import pika
from celery import Celery
from kombu import Exchange, Queue
from main import main

pika_cred = pika.PlainCredentials('guest', 'guest')
pika_conn_params = pika.ConnectionParameters('localhost', credentials=pika_cred)
pika_connection = pika.BlockingConnection(pika_conn_params)
channel = pika_connection.channel()

# creating architecture - queues
channel.queue_declare(queue='test-queue1')
channel.queue_declare(queue='test-queue2')

# creating architecture - exchange
channel.exchange_declare(exchange='test-exchange', exchange_type='direct')

# creating architecture - bindings
channel.queue_bind(exchange='test-exchange', queue='test-queue1', routing_key='redpill')
channel.queue_bind(exchange='test-exchange', queue='test-queue2', routing_key='bluepill')

app = Celery("tasks", broker="pyamqp://guest@localhost//")


@app.task(queue='test-queue1')
def wrapped_main(value):
    return main(value)
[/code]
Now we add a backend process that reports the result.
[code lang=text]
import os
import time


def pika_messager(message):
    """A messenger to send a message to a queue."""
    header = "Host: " + str(os.uname()[1]) + "\tDatetime: " + str(time.ctime()) + '\n\n'
    print('start Pika Connection ' + header)
    pika_connection = pika.BlockingConnection(pika_conn_params)
    channel = pika_connection.channel()
    # Publish to the exchange declared above; 'bluepill' routes to test-queue2.
    channel.basic_publish(exchange='test-exchange', routing_key='bluepill', body=header + message)
    channel.close()
    pika_connection.close()
    print('Pika Connection is close now')


@app.task(queue='test-queue1')
def wrapped_main(value):
    result = main(value)
    # result is an int, so convert it before concatenating.
    pika_messager("Job finished! Result is " + str(result))
    return result
[/code]
Complete Code & Summary
We have a main.py, which is our main script (the product) and can do a certain job. We are using RabbitMQ, a message queue manager (broker), to manage requests. We use celery for creating requests, and pika for creating the messaging architecture (queues, exchanges, routing keys) and for reporting completed jobs.
Note: a completed job means either success or failure.
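Since a completed job can be a failure as well as a success, the reporting step could cover both cases. The sketch below is one possible way to do that, not the code used in this post: `run_and_report` is a hypothetical helper, and `send` stands for any callable that takes a string (for example a pika-based messenger).

```python
def run_and_report(job, value, send):
    """Run job(value) and pass a success or failure message to send()."""
    try:
        result = job(value)
    except Exception as exc:
        # Report the failure, then let the error propagate to the caller.
        send("Job failed! Error is " + repr(exc))
        raise
    # Convert the result to text before concatenating.
    send("Job finished! Result is " + str(result))
    return result


# Demo with a plain list standing in for the message queue.
sent = []
run_and_report(lambda v: v * 2, 21, sent.append)
try:
    run_and_report(lambda v: 1 / v, 0, sent.append)
except ZeroDivisionError:
    pass
print(sent)
```

In the Celery task, the same pattern would wrap the call to `main(value)`, so a message is published whether the job succeeds or raises.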
filename: main.py
[code lang=python]
#! main.py
import random
import time


def fibo(t):
    """Return the largest Fibonacci number less than the given value."""
    a = 0
    b = 1
    while b < t:
        a, b = b, a + b
    return a


def main(value):
    # Simulate a slow-running job by sleeping for up to 10 seconds.
    time.sleep(random.random() * 10)
    return fibo(value)
[/code]
filename: tasks.py
[code lang=python]
import os
import time

import pika
from celery import Celery
from kombu import Exchange, Queue
from main import main

pika_cred = pika.PlainCredentials('guest', 'guest')
pika_conn_params = pika.ConnectionParameters('localhost', credentials=pika_cred)
pika_connection = pika.BlockingConnection(pika_conn_params)
channel = pika_connection.channel()

# creating architecture - queues
channel.queue_declare(queue='test-queue1')
channel.queue_declare(queue='test-queue2')

# creating architecture - exchange
channel.exchange_declare(exchange='test-exchange', exchange_type='direct')

# creating architecture - bindings
channel.queue_bind(exchange='test-exchange', queue='test-queue1', routing_key='redpill')
channel.queue_bind(exchange='test-exchange', queue='test-queue2', routing_key='bluepill')

app = Celery("tasks", broker="pyamqp://guest@localhost//")


def pika_messager(message):
    """A messenger to send a message to a queue."""
    header = "Host: " + str(os.uname()[1]) + "\tDatetime: " + str(time.ctime()) + '\n\n'
    print('start Pika Connection ' + header)
    pika_connection = pika.BlockingConnection(pika_conn_params)
    channel = pika_connection.channel()
    # Publish to the exchange declared above; 'bluepill' routes to test-queue2.
    channel.basic_publish(exchange='test-exchange', routing_key='bluepill', body=header + message)
    channel.close()
    pika_connection.close()
    print('Pika Connection is close now')


@app.task(queue='test-queue1')
def wrapped_main(value):
    result = main(value)
    # result is an int, so convert it before concatenating.
    pika_messager("Job finished! Result is " + str(result))
    return result
[/code]
[code lang=python]
import os
import time
import pika

# Pika - result messages
pika_cred = pika.PlainCredentials('guest', 'guest')
pika_conn_params = pika.ConnectionParameters('localhost', credentials=pika_cred)

pika_connection = pika.BlockingConnection(pika_conn_params)
channel = pika_connection.channel()
channel.queue_declare(queue='test-queue1')

# sending messages
message = "test message"  # placeholder body; the original snippet leaves `message` undefined
header = "Host: " + str(os.uname()[1]) + "\tDatetime: " + str(time.ctime()) + '\n\n'
channel.basic_publish(exchange='test-exchange', routing_key='bluepill', body=header + message)
[/code]
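To actually see the result messages outside the web UI, something has to read them back from the queue. Below is a minimal sketch of that step, assuming the messages were published with the bluepill routing key and therefore landed in test-queue2. The helper names `drain_results` and `open_channel` are made up for this example; `basic_get` is pika's polling call on a blocking channel and returns `(None, None, None)` when the queue is empty.

```python
def drain_results(channel, queue="test-queue2", limit=10):
    """Fetch up to `limit` messages from `queue` and return their bodies as text."""
    bodies = []
    for _ in range(limit):
        # auto_ack=True removes the message from the queue as soon as it is fetched.
        method, _properties, body = channel.basic_get(queue=queue, auto_ack=True)
        if method is None:  # the queue is empty
            break
        bodies.append(body.decode("utf-8"))
    return bodies


def open_channel(host="localhost", user="guest", password="guest"):
    """Open a blocking connection and return (connection, channel)."""
    # Imported here so drain_results can be tried out without pika or a broker.
    import pika

    credentials = pika.PlainCredentials(user, password)
    params = pika.ConnectionParameters(host, credentials=credentials)
    connection = pika.BlockingConnection(params)
    return connection, connection.channel()
```

With a broker running, calling `connection, channel = open_channel()`, then printing each item of `drain_results(channel)`, and finally `connection.close()` would show the pending result messages. Polling with `basic_get` keeps the example short; a long-running consumer would more likely use `basic_consume` with a callback.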