Size: 4767
Comment:
|
← Revision 25 as of 2025-01-24 18:19:59 ⇥
Size: 7362
Comment:
|
Deletions are marked like this. | Additions are marked like this. |
Line 2: | Line 2: |
https://www.rabbitmq.com/ The most widely deployed open source message broker. https://www.upsolver.com/blog/kafka-versus-rabbitmq-architecture-performance-use-case RabbitMQ uses a '''push model''' where messages are immediately pushed to any subscribed consumer. RabbitMQ is a queue, so messages are removed once consumed and acknowledged. |
|
Line 5: | Line 16: |
* su * cd /tmp * wget http://slackbuilds.org/slackbuilds/13.0/development/erlang-otp.tar.gz * tar xvzf erlang-otp.tar.gz * cd erlang-otp * wget http://www.erlang.org/download/otp_src_R13B03.tar.gz * wget http://www.erlang.org/download/otp_doc_man_R13B03.tar.gz * ./erlang-otp.SlackBuild * installpkg /tmp/erlang-otp-13B03-i486-1_SBo.tgz |
{{{#!highlight sh su cd /tmp wget http://slackbuilds.org/slackbuilds/13.0/development/erlang-otp.tar.gz tar xvzf erlang-otp.tar.gz cd erlang-otp wget http://www.erlang.org/download/otp_src_R13B03.tar.gz wget http://www.erlang.org/download/otp_doc_man_R13B03.tar.gz ./erlang-otp.SlackBuild installpkg /tmp/erlang-otp-13B03-i486-1_SBo.tgz }}} |
Line 16: | Line 29: |
* wget http://slackbuilds.org/slackbuilds/14.1/development/erlang-otp.tar.gz * tar xvzf erlang-otp.tar.gz * cd erlang-otp * wget http://www.erlang.org/download/otp_src_R16B02.tar.gz * wget http://www.erlang.org/download/otp_doc_man_R16B02.tar.gz * ./erlang-otp.SlackBuild * installpkg /tmp/erlang-otp-16B02-x86_64-1_SBo.tgz |
{{{#!highlight sh wget http://slackbuilds.org/slackbuilds/14.1/development/erlang-otp.tar.gz tar xvzf erlang-otp.tar.gz cd erlang-otp wget http://www.erlang.org/download/otp_src_R16B02.tar.gz wget http://www.erlang.org/download/otp_doc_man_R16B02.tar.gz ./erlang-otp.SlackBuild installpkg /tmp/erlang-otp-16B02-x86_64-1_SBo.tgz }}} |
Line 25: | Line 40: |
* cd /opt * wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.3.1/rabbitmq-server-generic-unix-3.3.1.tar.gz * tar xvzf rabbitmq-server-generic-unix-3.3.1.tar.gz * ln -s rabbitmq_server-3.3.1 rabbitmq * /opt/rabbitmq/sbin/rabbitmq-server |
{{{#!highlight sh cd /opt wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.3.1/rabbitmq-server-generic-unix-3.3.1.tar.gz tar xvzf rabbitmq-server-generic-unix-3.3.1.tar.gz ln -s rabbitmq_server-3.3.1 rabbitmq /opt/rabbitmq/sbin/rabbitmq-server }}} |
Line 32: | Line 49: |
* pip install pika # python client | {{{#!highlight sh pip install pika # python client }}} |
Line 37: | Line 56: |
'''Producer''' | === Producer === |
Line 49: | Line 68: |
'''Consumer''' | === Consumer === |
Line 66: | Line 85: |
Publish/Subscribe. produceTopic.py |
Publish/Subscribe, sent message goes to all topic subscribers. === produceTopic.py === |
Line 77: | Line 96: |
repetition= int( sys.argv[2] ) | |
Line 81: | Line 101: |
channel.basic_publish(exchange=exchangeName, routing_key='', body=message) print(" [x] Producer sent message %s to topic %s" % (message , exchangeName) ) |
for count in range(1,repetition+1): msg='%s %d'%(message,count) channel.basic_publish(exchange=exchangeName, routing_key='', body=msg) print(" [x] Producer sent message %s to topic %s" % (msg , exchangeName) ) |
Line 87: | Line 109: |
consumeTopic.py | === consumeTopic.py === |
Line 111: | Line 133: |
Load balancer. Workers. produceQueue.py |
Several subscribers are bound to a queue. Each message is only consumed/processed by one of the subscribers. === produceQueue.py === |
Line 130: | Line 153: |
print " [x] Sent message '%s' to queue %s"%(msg , queueName) | print(" [x] Sent message '%s' to queue %s"%(msg , queueName) ) |
Line 134: | Line 157: |
consumeQueue.py | === consumeQueue.py === |
Line 141: | Line 164: |
print " [x] %s received message %s" % (sys.argv[1], body) | print(" [x] %s received message %s" % (sys.argv[1], body) ) |
Line 149: | Line 173: |
print ' [*] %s Waiting for messages. To exit press CTRL+C'%( consumerName ) | print(' [*] %s Waiting for messages. To exit press CTRL+C'%( consumerName ) ) |
Line 151: | Line 175: |
channel.start_consuming() }}} |
try: channel.start_consuming() except KeyboardInterrupt as ex: pass }}} == Enable STOMP in RabbitMQ == See [[STOMP]] and https://stomp.github.io/ . {{{#!highlight sh rabbitmq-plugins enable rabbitmq_stomp pip install stomp.py #pip2 pip3 }}} === stompProduce.py === {{{#!highlight python import time import sys import stomp if __name__=='__main__': if len(sys.argv)==3: conn = stomp.Connection([('127.0.0.1',61613)]) conn.start() conn.connect('rmq', 'rmq', wait=True) print('Sending %s to %s'%( sys.argv[2] , sys.argv[1] )) conn.send(body=sys.argv[2], destination=sys.argv[1]) conn.disconnect() else: print('Usage: stompProduce.py <destination /topic/topicx /queue/queuex> <message>') }}} === stompConsume.py === {{{#!highlight python import time import sys import stomp class MessageListener(stomp.ConnectionListener): def __init__(self,consumerName): self.consumerName = consumerName def on_error(self, headers, message): print('received an error "%s" %s ' %( message , headers )) def on_message(self, headers, message): print('%s received a message "%s"' % (self.consumerName, message )) if __name__=='__main__': if len(sys.argv)==3: conn = stomp.Connection([('127.0.0.1',61613)]) conn.set_listener('', MessageListener(sys.argv[1])) conn.start() conn.connect('rmq', 'rmq', wait=True) conn.subscribe(destination=sys.argv[2], id=1, ack='auto') print('Waiting for messages for %s'%(sys.argv[2] ) ) try: while(True): time.sleep(1) except KeyboardInterrupt as ex: pass conn.disconnect() print('Disconnected') else: print('Usage: stompComsume.pt <Consumer name> </topic/topicx or /queue/queuex to consume messages from>') }}} |
RabbitMQ
The most widely deployed open source message broker.
https://www.upsolver.com/blog/kafka-versus-rabbitmq-architecture-performance-use-case
RabbitMQ uses a push model where messages are immediately pushed to any subscribed consumer.
RabbitMQ is a queue, so messages are removed once consumed and acknowledged.
Erlang installation on Slackware
Requires Erlang:
1 su
2 cd /tmp
3 wget http://slackbuilds.org/slackbuilds/13.0/development/erlang-otp.tar.gz
4 tar xvzf erlang-otp.tar.gz
5 cd erlang-otp
6 wget http://www.erlang.org/download/otp_src_R13B03.tar.gz
7 wget http://www.erlang.org/download/otp_doc_man_R13B03.tar.gz
8 ./erlang-otp.SlackBuild
9 installpkg /tmp/erlang-otp-13B03-i486-1_SBo.tgz
Slackbuild
1 wget http://slackbuilds.org/slackbuilds/14.1/development/erlang-otp.tar.gz
2 tar xvzf erlang-otp.tar.gz
3 cd erlang-otp
4 wget http://www.erlang.org/download/otp_src_R16B02.tar.gz
5 wget http://www.erlang.org/download/otp_doc_man_R16B02.tar.gz
6 ./erlang-otp.SlackBuild
7 installpkg /tmp/erlang-otp-16B02-x86_64-1_SBo.tgz
UNIX server installation
Python client
Examples
http://www.rabbitmq.com/tutorials/tutorial-one-python.html
Producer
#!/usr/bin/env python
"""Minimal RabbitMQ producer: publish one 'Hello World!' message to the 'hello' queue."""
import pika

# NOTE(review): the calls below follow the RabbitMQ Python "tutorial one";
# confirm them against the pika version that is actually installed.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
try:
    channel = connection.channel()
    # Declare the queue before publishing to it.
    channel.queue_declare(queue='hello')
    # exchange='' with routing_key='hello' addresses the 'hello' queue by name.
    channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
    # print() call form runs on both Python 2 and Python 3.
    print(" [x] Sent 'Hello World!'")
finally:
    # Close the connection even when declaring or publishing fails.
    connection.close()
Consumer
#!/usr/bin/env python
"""Minimal RabbitMQ consumer: print every message delivered on the 'hello' queue."""
import pika


def callback(ch, method, properties, body):
    """Print the body of one delivered message; the other arguments are unused."""
    print(" [x] Received %r" % (body,))


connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Declare the queue here too, so the consumer can be started before the producer.
channel.queue_declare(queue='hello')
print(' [*] Waiting for messages. To exit press CTRL+C')

# NOTE(review): the positional callback and no_ack= look like the pre-1.0 pika
# signature; newer pika releases may expect on_message_callback= / auto_ack=.
# Confirm against the installed version.
channel.basic_consume(callback, queue='hello', no_ack=True)
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # CTRL+C is the announced way to stop: end the consume loop without a traceback.
    channel.stop_consuming()
connection.close()
Topic
Publish/Subscribe: each message sent goes to all topic subscribers.
produceTopic.py
#!/usr/bin/env python
"""Publish numbered copies of a message to the 'broadcast' fanout exchange.

Usage: produceTopic.py <message> <repetition>
"""
import pika
import sys

if __name__ == '__main__':
    # Both arguments are required; fail with a usage line instead of an IndexError.
    if len(sys.argv) < 3:
        print('Usage: produceTopic.py <message> <repetition>')
        sys.exit(1)

    exchangeName = 'broadcast'
    message = sys.argv[1]
    repetition = int(sys.argv[2])

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    try:
        channel = connection.channel()
        # NOTE(review): the type= keyword looks like the pre-1.0 pika spelling;
        # newer releases may expect exchange_type= -- confirm the installed version.
        channel.exchange_declare(exchange=exchangeName, type='fanout')
        # Send "<message> 1" .. "<message> <repetition>".
        for count in range(1, repetition + 1):
            msg = '%s %d' % (message, count)
            channel.basic_publish(exchange=exchangeName, routing_key='', body=msg)
            print(" [x] Producer sent message %s to topic %s" % (msg, exchangeName))
    finally:
        # Close the connection even if declaring or publishing raised.
        connection.close()
consumeTopic.py
#!/usr/bin/env python
"""Subscribe to the 'broadcast' fanout exchange and print every message received.

Usage: consumeTopic.py <consumer name>
"""
import pika
import sys


def consumeTopic(ch, method, properties, body):
    """Print one delivered message, prefixed with the consumer name from argv[1]."""
    print(" [x] %s received message %s" % (sys.argv[1], body))


if __name__ == '__main__':
    # The consumer name is required; fail with a usage line instead of an IndexError.
    if len(sys.argv) < 2:
        print('Usage: consumeTopic.py <consumer name>')
        sys.exit(1)

    topicName = 'broadcast'
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    # NOTE(review): type=, queue_declare() without a queue name, and the positional
    # callback / no_ack= below look like the pre-1.0 pika API -- confirm against
    # the installed pika version.
    channel.exchange_declare(exchange=topicName, type='fanout')
    # Declare an exclusive queue for this consumer and read its name back.
    result = channel.queue_declare(exclusive=True)
    privateQueue = result.method.queue
    channel.queue_bind(exchange=topicName, queue=privateQueue)

    print(' [*] %s waiting for broadcasts. To exit press CTRL+C' % (sys.argv[1],))
    channel.basic_consume(consumeTopic, queue=privateQueue, no_ack=True)
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        # CTRL+C is the announced way to stop: end the consume loop without a traceback.
        channel.stop_consuming()
    connection.close()
Queue
Several subscribers are bound to a queue. Each message is consumed/processed by only one of the subscribers.
produceQueue.py
#!/usr/bin/env python
"""Send numbered copies of a payload to the 'hello' queue.

Invoked as: produceQueue.py <payload> <count>
"""
import pika
import sys


def _send_numbered(channel, queue, text, last):
    """Publish '<text> <n>!' for n = 1..last to *queue*, printing a line per send."""
    number = 1
    while number <= last:
        body = '%s %d!' % (text, number)
        channel.basic_publish(exchange='', routing_key=queue, body=body)
        print(" [x] Sent message '%s' to queue %s" % (body, queue))
        number += 1


if __name__ == '__main__':
    target = 'hello'
    broker = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    mq = broker.channel()
    mq.queue_declare(queue=target)
    # The command line is read only after the connection and queue are set up.
    text, last = sys.argv[1], int(sys.argv[2])

    _send_numbered(mq, target, text, last)
    broker.close()
consumeQueue.py
#!/usr/bin/env python
"""Consume messages from the 'hello' queue and print them with a consumer name.

Invoked as: consumeQueue.py <consumer name>
"""
import pika
import sys


def messageHandler(ch, method, properties, body):
    """Delivery callback: print the message body, prefixed with argv[1]."""
    who = sys.argv[1]
    print(" [x] %s received message %s" % (who, body))


def _serve(name, queue):
    """Connect to localhost, declare *queue* and consume from it until CTRL+C."""
    link = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    chan = link.channel()
    chan.queue_declare(queue=queue)
    print(' [*] %s Waiting for messages. To exit press CTRL+C' % (name,))
    # NOTE(review): positional callback / no_ack= look like the pre-1.0 pika
    # signature -- confirm against the installed pika version.
    chan.basic_consume(messageHandler, queue=queue, no_ack=True)
    try:
        chan.start_consuming()
    except KeyboardInterrupt:
        # CTRL+C is the announced way out; leave without a traceback.
        return


if __name__ == '__main__':
    _serve(sys.argv[1], 'hello')
Enable STOMP in RabbitMQ
See STOMP and https://stomp.github.io/ .
stompProduce.py
"""Send one message to a STOMP destination on the local broker.

Invoked as: stompProduce.py <destination> <message>
"""
import time
import sys
import stomp


def _send(destination, text):
    """Connect to 127.0.0.1:61613 as 'rmq', send *text* to *destination*, disconnect."""
    link = stomp.Connection([('127.0.0.1', 61613)])
    # NOTE(review): start() before connect() is kept from the original;
    # verify it against the installed stomp.py release.
    link.start()
    link.connect('rmq', 'rmq', wait=True)
    print('Sending %s to %s' % (text, destination))
    link.send(body=text, destination=destination)
    link.disconnect()


if __name__ == '__main__':
    # Exactly two arguments are expected: destination, then message.
    if len(sys.argv) != 3:
        print('Usage: stompProduce.py <destination /topic/topicx /queue/queuex> <message>')
    else:
        _send(sys.argv[1], sys.argv[2])
stompConsume.py
"""Subscribe to a STOMP destination on the local broker and print received messages.

Usage: stompConsume.py <consumer name> <destination>
"""
import time
import sys
import stomp


class MessageListener(stomp.ConnectionListener):
    """Listener that prints received errors and messages, tagged with a consumer name."""

    def __init__(self, consumerName):
        # Name printed in front of every received message.
        self.consumerName = consumerName

    # NOTE(review): the (headers, message) callback signature is kept from the
    # original; verify it against the installed stomp.py release.
    def on_error(self, headers, message):
        """Print an error's message text followed by its headers."""
        print('received an error "%s" %s ' % (message, headers))

    def on_message(self, headers, message):
        """Print a received message, prefixed with this listener's consumer name."""
        print('%s received a message "%s"' % (self.consumerName, message))


if __name__ == '__main__':
    # Exactly two arguments are expected: consumer name, then destination.
    if len(sys.argv) == 3:
        conn = stomp.Connection([('127.0.0.1', 61613)])
        conn.set_listener('', MessageListener(sys.argv[1]))
        # NOTE(review): start() before connect() is kept from the original;
        # verify it against the installed stomp.py release.
        conn.start()
        conn.connect('rmq', 'rmq', wait=True)
        conn.subscribe(destination=sys.argv[2], id=1, ack='auto')
        print('Waiting for messages for %s' % (sys.argv[2],))
        try:
            # The main flow only idles; output comes from MessageListener.
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            # CTRL+C ends the wait; fall through to a clean disconnect.
            pass
        conn.disconnect()
        print('Disconnected')
    else:
        # Usage line now names the script correctly (was 'stompComsume.pt').
        print('Usage: stompConsume.py <Consumer name> </topic/topicx or /queue/queuex to consume messages from>')