| 
  
   Size: 563 
  
  Comment:  
 | 
  
   Size: 7281 
  
  Comment:  
 | 
| Deletions are marked like this. | Additions are marked like this. | 
| Line 2: | Line 2: | 
https://www.upsolver.com/blog/kafka-versus-rabbitmq-architecture-performance-use-case RabbitMQ uses a '''push model''' where messages are immediately pushed to any subscribed consumer. RabbitMQ is a queue, so messages are removed once consumed and acknowledged.  | 
|
| Line 5: | Line 11: | 
|  * su * cd /tmp * wget http://slackbuilds.org/slackbuilds/13.0/development/erlang-otp.tar.gz * tar xvzf erlang-otp.tar.gz * cd erlang-otp * wget http://www.erlang.org/download/otp_src_R13B03.tar.gz * wget http://www.erlang.org/download/otp_doc_man_R13B03.tar.gz * ./erlang-otp.SlackBuild * installpkg  | 
{{{#!highlight sh su cd /tmp wget http://slackbuilds.org/slackbuilds/13.0/development/erlang-otp.tar.gz tar xvzf erlang-otp.tar.gz cd erlang-otp wget http://www.erlang.org/download/otp_src_R13B03.tar.gz wget http://www.erlang.org/download/otp_doc_man_R13B03.tar.gz ./erlang-otp.SlackBuild installpkg /tmp/erlang-otp-13B03-i486-1_SBo.tgz }}} == Slackbuild == {{{#!highlight sh wget http://slackbuilds.org/slackbuilds/14.1/development/erlang-otp.tar.gz tar xvzf erlang-otp.tar.gz cd erlang-otp wget http://www.erlang.org/download/otp_src_R16B02.tar.gz wget http://www.erlang.org/download/otp_doc_man_R16B02.tar.gz ./erlang-otp.SlackBuild installpkg /tmp/erlang-otp-16B02-x86_64-1_SBo.tgz }}}  | 
| Line 16: | Line 35: | 
| Get http://www.rabbitmq.com/releases/rabbitmq-server/v3.3.1/rabbitmq-server-generic-unix-3.3.1.tar.gz Extract it into /opt/rabbitmq  | 
{{{#!highlight sh cd /opt wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.3.1/rabbitmq-server-generic-unix-3.3.1.tar.gz tar xvzf rabbitmq-server-generic-unix-3.3.1.tar.gz ln -s rabbitmq_server-3.3.1 rabbitmq /opt/rabbitmq/sbin/rabbitmq-server }}} == Python client == {{{#!highlight sh pip install pika # python client }}} == Examples == http://www.rabbitmq.com/tutorials/tutorial-one-python.html === Producer === {{{#!highlight python #!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print " [x] Sent 'Hello World!'" connection.close() }}} === Consumer === {{{#!highlight python #!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) channel.basic_consume(callback, queue='hello', no_ack=True) channel.start_consuming() }}} == Topic == Publish/Subscribe, sent message goes to all topic subscribers. 
=== produceTopic.py === {{{#!highlight python #!/usr/bin/env python import pika import sys if __name__=='__main__': exchangeName='broadcast' message = sys.argv[1] repetition= int( sys.argv[2] ) connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange=exchangeName, type='fanout') for count in range(1,repetition+1): msg='%s %d'%(message,count) channel.basic_publish(exchange=exchangeName, routing_key='', body=msg) print(" [x] Producer sent message %s to topic %s" % (msg , exchangeName) ) connection.close() }}} === consumeTopic.py === {{{#!highlight python #!/usr/bin/env python import pika import sys def consumeTopic(ch, method, properties, body): print(" [x] %s received message %s" % (sys.argv[1] , body ) ) if __name__=='__main__': topicName='broadcast' connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange=topicName, type='fanout') result = channel.queue_declare(exclusive=True) privateQueue = result.method.queue channel.queue_bind(exchange=topicName, queue=privateQueue) print(' [*] %s waiting for broadcasts. To exit press CTRL+C'%( sys.argv[1] ) ) channel.basic_consume(consumeTopic, queue=privateQueue, no_ack=True) channel.start_consuming() }}} == Queue == Several subscribers are bound to a queue. Each message is only consumed/processed by one of the subscribers. 
=== produceQueue.py === {{{#!highlight python #!/usr/bin/env python import pika import sys if __name__=='__main__': queueName='hello' connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue=queueName) payload = sys.argv[1] maxCount = int(sys.argv[2]) + 1 for count in range(1,maxCount): msg = '%s %d!'%(payload , count) channel.basic_publish(exchange='', routing_key=queueName, body=msg ) print(" [x] Sent message '%s' to queue %s"%(msg , queueName) ) connection.close() }}} === consumeQueue.py === {{{#!highlight python #!/usr/bin/env python import pika import sys def messageHandler(ch, method, properties, body): print(" [x] %s received message %s" % (sys.argv[1], body) ) if __name__=='__main__': queueName='hello' consumerName = sys.argv[1] connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue=queueName) print(' [*] %s Waiting for messages. To exit press CTRL+C'%( consumerName ) ) channel.basic_consume(messageHandler, queue=queueName, no_ack=True) try: channel.start_consuming() except KeyboardInterrupt as ex: pass }}} == Enable STOMP in RabbitMQ == See [[STOMP]] and https://stomp.github.io/ . 
{{{#!highlight sh * rabbitmq-plugins enable rabbitmq_stomp * pip install stomp.py #pip2 pip3 }}} === stompProduce.py === {{{#!highlight python import time import sys import stomp if __name__=='__main__': if len(sys.argv)==3: conn = stomp.Connection([('127.0.0.1',61613)]) conn.start() conn.connect('rmq', 'rmq', wait=True) print('Sending %s to %s'%( sys.argv[2] , sys.argv[1] )) conn.send(body=sys.argv[2], destination=sys.argv[1]) conn.disconnect() else: print('Usage: stompProduce.py <destination /topic/topicx /queue/queuex> <message>') }}} === stompConsume.py === {{{#!highlight python import time import sys import stomp class MessageListener(stomp.ConnectionListener): def __init__(self,consumerName): self.consumerName = consumerName def on_error(self, headers, message): print('received an error "%s" %s ' %( message , headers )) def on_message(self, headers, message): print('%s received a message "%s"' % (self.consumerName, message )) if __name__=='__main__': if len(sys.argv)==3: conn = stomp.Connection([('127.0.0.1',61613)]) conn.set_listener('', MessageListener(sys.argv[1])) conn.start() conn.connect('rmq', 'rmq', wait=True) conn.subscribe(destination=sys.argv[2], id=1, ack='auto') print('Waiting for messages for %s'%(sys.argv[2] ) ) try: while(True): time.sleep(1) except KeyboardInterrupt as ex: pass conn.disconnect() print('Disconnected') else: print('Usage: stompComsume.pt <Consumer name> </topic/topicx or /queue/queuex to consume messages from>') }}}  | 
RabbitMQ
https://www.upsolver.com/blog/kafka-versus-rabbitmq-architecture-performance-use-case
RabbitMQ uses a push model where messages are immediately pushed to any subscribed consumer.
RabbitMQ is a queue, so messages are removed once consumed and acknowledged.
Erlang installation on Slackware
Requires Erlang:
   1 su
   2 cd /tmp
   3 wget http://slackbuilds.org/slackbuilds/13.0/development/erlang-otp.tar.gz
   4 tar xvzf erlang-otp.tar.gz
   5 cd erlang-otp
   6 wget http://www.erlang.org/download/otp_src_R13B03.tar.gz
   7 wget http://www.erlang.org/download/otp_doc_man_R13B03.tar.gz
   8 ./erlang-otp.SlackBuild
   9 installpkg /tmp/erlang-otp-13B03-i486-1_SBo.tgz
Slackbuild
   1 wget http://slackbuilds.org/slackbuilds/14.1/development/erlang-otp.tar.gz
   2 tar xvzf erlang-otp.tar.gz
   3 cd erlang-otp
   4 wget http://www.erlang.org/download/otp_src_R16B02.tar.gz
   5 wget http://www.erlang.org/download/otp_doc_man_R16B02.tar.gz
   6 ./erlang-otp.SlackBuild
   7 installpkg  /tmp/erlang-otp-16B02-x86_64-1_SBo.tgz
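UNIX server installation
   1 cd /opt
   2 wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.3.1/rabbitmq-server-generic-unix-3.3.1.tar.gz
   3 tar xvzf rabbitmq-server-generic-unix-3.3.1.tar.gz
   4 ln -s rabbitmq_server-3.3.1 rabbitmq
   5 /opt/rabbitmq/sbin/rabbitmq-server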
Python client
   1 pip install pika # python client
Examples
http://www.rabbitmq.com/tutorials/tutorial-one-python.html
Producer
   1 #!/usr/bin/env python
   2 import pika
   3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
   4 channel = connection.channel()
   5 channel.queue_declare(queue='hello')
   6 channel.basic_publish(exchange='',  routing_key='hello', body='Hello World!')
   7 print " [x] Sent 'Hello World!'"
   8 connection.close()
Consumer
   1 #!/usr/bin/env python
   2 import pika
   3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
   4 channel = connection.channel()
   5 channel.queue_declare(queue='hello')
   6 print ' [*] Waiting for messages. To exit press CTRL+C'
   7 
   8 def callback(ch, method, properties, body):
   9     print " [x] Received %r" % (body,)
  10 
  11 channel.basic_consume(callback, queue='hello', no_ack=True)
  12 channel.start_consuming()
Topic
Publish/subscribe: every message sent to the topic is delivered to all of the topic's subscribers.
produceTopic.py
   1 #!/usr/bin/env python
   2 import pika
   3 import sys
   4 
   5 if __name__=='__main__':
   6     exchangeName='broadcast'
   7     message = sys.argv[1]
   8     repetition= int( sys.argv[2] )
   9 
  10     connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  11     channel = connection.channel()
  12     channel.exchange_declare(exchange=exchangeName, type='fanout')
  13     for count in range(1,repetition+1):
  14         msg='%s %d'%(message,count)
  15         channel.basic_publish(exchange=exchangeName, routing_key='', body=msg)
  16         print(" [x] Producer sent message %s to topic %s" % (msg , exchangeName) )
  17 
  18     connection.close()
consumeTopic.py
   1 #!/usr/bin/env python
   2 import pika
   3 import sys
   4 
   5 def consumeTopic(ch, method, properties, body):
   6     print(" [x] %s received message %s" % (sys.argv[1] ,  body  ) )
   7 
   8 if __name__=='__main__':
   9     topicName='broadcast'
  10     connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
  11     channel = connection.channel()
  12     channel.exchange_declare(exchange=topicName, type='fanout')
  13     result = channel.queue_declare(exclusive=True)
  14     privateQueue = result.method.queue
  15     channel.queue_bind(exchange=topicName, queue=privateQueue)
  16 
  17     print(' [*] %s waiting for broadcasts. To exit press CTRL+C'%( sys.argv[1] ) )
  18     channel.basic_consume(consumeTopic, queue=privateQueue, no_ack=True)
  19     channel.start_consuming()
Queue
Several subscribers are bound to a queue. Each message is consumed/processed by only one of the subscribers.
produceQueue.py
   1 #!/usr/bin/env python
   2 import pika
   3 import sys
   4 
   5 if __name__=='__main__':
   6     queueName='hello'
   7     connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
   8     channel = connection.channel()
   9     channel.queue_declare(queue=queueName)
  10     payload = sys.argv[1]
  11     maxCount = int(sys.argv[2]) + 1
  12 
  13     for count in range(1,maxCount):
  14         msg = '%s %d!'%(payload , count)
  15         channel.basic_publish(exchange='',  routing_key=queueName, body=msg  )
  16         print(" [x] Sent message '%s' to queue %s"%(msg , queueName) )
  17     connection.close()
consumeQueue.py
   1 #!/usr/bin/env python
   2 import pika
   3 import sys
   4 
   5 def messageHandler(ch, method, properties, body):
   6     print(" [x] %s received message %s" % (sys.argv[1], body) )
   7 
   8 
   9 if __name__=='__main__':
  10     queueName='hello'
  11     consumerName =  sys.argv[1]
  12     connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  13     channel = connection.channel()
  14     channel.queue_declare(queue=queueName)
  15     print(' [*] %s  Waiting for messages. To exit press CTRL+C'%( consumerName ) )
  16     channel.basic_consume(messageHandler, queue=queueName, no_ack=True)
  17     try:
  18         channel.start_consuming()
  19     except KeyboardInterrupt as ex:
  20         pass
Enable STOMP in RabbitMQ
See STOMP and https://stomp.github.io/ .
   1 rabbitmq-plugins enable rabbitmq_stomp
   2 pip install stomp.py #pip2 pip3
stompProduce.py
   1 import time
   2 import sys
   3 import stomp
   4 
   5 if __name__=='__main__':
   6     if len(sys.argv)==3:
   7         conn = stomp.Connection([('127.0.0.1',61613)])
   8         conn.start()
   9         conn.connect('rmq', 'rmq', wait=True)
  10         print('Sending %s to %s'%( sys.argv[2] , sys.argv[1] ))
  11         conn.send(body=sys.argv[2], destination=sys.argv[1])
  12         conn.disconnect()
  13     else:
  14         print('Usage: stompProduce.py <destination /topic/topicx /queue/queuex> <message>')
stompConsume.py
   1 import time
   2 import sys
   3 import stomp
   4 
   5 class MessageListener(stomp.ConnectionListener):
   6     def __init__(self,consumerName):
   7         self.consumerName = consumerName
   8     def on_error(self, headers, message):
   9         print('received an error "%s" %s ' %( message , headers ))
  10     def on_message(self, headers, message):
  11         print('%s received a message "%s"' % (self.consumerName, message ))
  12 
  13 if __name__=='__main__':
  14     if len(sys.argv)==3:
  15         conn = stomp.Connection([('127.0.0.1',61613)])
  16         conn.set_listener('', MessageListener(sys.argv[1]))
  17         conn.start()
  18         conn.connect('rmq', 'rmq', wait=True)
  19         conn.subscribe(destination=sys.argv[2], id=1, ack='auto')
  20         print('Waiting for messages for %s'%(sys.argv[2] ) )
  21         try:
  22             while(True):
  23                 time.sleep(1)
  24         except KeyboardInterrupt as ex:
  25             pass
  26         conn.disconnect()
  27         print('Disconnected')
  28     else:
  29         print('Usage: stompConsume.py <Consumer name> </topic/topicx or /queue/queuex to consume messages from>')
