Size: 2019
Comment:
|
Size: 4767
Comment:
|
Deletions are marked like this. | Additions are marked like this. |
Line 37: | Line 37: |
{{{{{{#!highlight python | '''Producer''' {{{#!highlight python |
Line 48: | Line 49: |
'''Consumer''' | |
Line 62: | Line 64: |
== Topic == Publish/Subscribe. produceTopic.py {{{#!highlight python #!/usr/bin/env python import pika import sys if __name__=='__main__': exchangeName='broadcast' message = sys.argv[1] connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange=exchangeName, type='fanout') channel.basic_publish(exchange=exchangeName, routing_key='', body=message) print(" [x] Producer sent message %s to topic %s" % (message , exchangeName) ) connection.close() }}} consumeTopic.py {{{#!highlight python #!/usr/bin/env python import pika import sys def consumeTopic(ch, method, properties, body): print(" [x] %s received message %s" % (sys.argv[1] , body ) ) if __name__=='__main__': topicName='broadcast' connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange=topicName, type='fanout') result = channel.queue_declare(exclusive=True) privateQueue = result.method.queue channel.queue_bind(exchange=topicName, queue=privateQueue) print(' [*] %s waiting for broadcasts. To exit press CTRL+C'%( sys.argv[1] ) ) channel.basic_consume(consumeTopic, queue=privateQueue, no_ack=True) channel.start_consuming() }}} == Queue == Load balancer. Workers. produceQueue.py {{{#!highlight python #!/usr/bin/env python import pika import sys if __name__=='__main__': queueName='hello' connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue=queueName) payload = sys.argv[1] maxCount = int(sys.argv[2]) + 1 for count in range(1,maxCount): msg = '%s %d!'%(payload , count) channel.basic_publish(exchange='', routing_key=queueName, body=msg ) print " [x] Sent message '%s' to queue %s"%(msg , queueName) connection.close() }}} consumeQueue.py {{{#!highlight python #!/usr/bin/env python import pika import sys def messageHandler(ch, method, properties, body): print " [x] %s received message %s" % (sys.argv[1], body) if __name__=='__main__': queueName='hello' consumerName = sys.argv[1] connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue=queueName) print ' [*] %s Waiting for messages. To exit press CTRL+C'%( consumerName ) channel.basic_consume(messageHandler, queue=queueName, no_ack=True) channel.start_consuming() }}} |
RabbitMQ
Erlang installation on Slackware
Requires Erlang:
- su
- cd /tmp
wget http://slackbuilds.org/slackbuilds/13.0/development/erlang-otp.tar.gz
- tar xvzf erlang-otp.tar.gz
- cd erlang-otp
wget http://www.erlang.org/download/otp_doc_man_R13B03.tar.gz
./erlang-otp.SlackBuild
- installpkg /tmp/erlang-otp-13B03-i486-1_SBo.tgz
Slackbuild
wget http://slackbuilds.org/slackbuilds/14.1/development/erlang-otp.tar.gz
- tar xvzf erlang-otp.tar.gz
- cd erlang-otp
wget http://www.erlang.org/download/otp_doc_man_R16B02.tar.gz
./erlang-otp.SlackBuild
- installpkg /tmp/erlang-otp-16B02-x86_64-1_SBo.tgz
UNIX server installation
- cd /opt
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.3.1/rabbitmq-server-generic-unix-3.3.1.tar.gz
- tar xvzf rabbitmq-server-generic-unix-3.3.1.tar.gz
- ln -s rabbitmq_server-3.3.1 rabbitmq
- /opt/rabbitmq/sbin/rabbitmq-server
Python client
- pip install pika # python client
Examples
http://www.rabbitmq.com/tutorials/tutorial-one-python.html
Producer
1 #!/usr/bin/env python
2 import pika
3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
4 channel = connection.channel()
5 channel.queue_declare(queue='hello')
6 channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
7 print " [x] Sent 'Hello World!'"
8 connection.close()
Consumer
1 #!/usr/bin/env python
2 import pika
3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
4 channel = connection.channel()
5 channel.queue_declare(queue='hello')
6 print ' [*] Waiting for messages. To exit press CTRL+C'
7
8 def callback(ch, method, properties, body):
9 print " [x] Received %r" % (body,)
10
11 channel.basic_consume(callback, queue='hello', no_ack=True)
12 channel.start_consuming()
Topic
Publish/Subscribe.
produceTopic.py
1 #!/usr/bin/env python
2 import pika
3 import sys
4
5 if __name__=='__main__':
6 exchangeName='broadcast'
7 message = sys.argv[1]
8
9 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
10 channel = connection.channel()
11 channel.exchange_declare(exchange=exchangeName, type='fanout')
12
13 channel.basic_publish(exchange=exchangeName, routing_key='', body=message)
14 print(" [x] Producer sent message %s to topic %s" % (message , exchangeName) )
15 connection.close()
consumeTopic.py
1 #!/usr/bin/env python
2 import pika
3 import sys
4
5 def consumeTopic(ch, method, properties, body):
6 print(" [x] %s received message %s" % (sys.argv[1] , body ) )
7
8 if __name__=='__main__':
9 topicName='broadcast'
10 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
11 channel = connection.channel()
12 channel.exchange_declare(exchange=topicName, type='fanout')
13 result = channel.queue_declare(exclusive=True)
14 privateQueue = result.method.queue
15 channel.queue_bind(exchange=topicName, queue=privateQueue)
16
17 print(' [*] %s waiting for broadcasts. To exit press CTRL+C'%( sys.argv[1] ) )
18 channel.basic_consume(consumeTopic, queue=privateQueue, no_ack=True)
19 channel.start_consuming()
Queue
Load balancer. Workers.
produceQueue.py
1 #!/usr/bin/env python
2 import pika
3 import sys
4
5 if __name__=='__main__':
6 queueName='hello'
7 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
8 channel = connection.channel()
9 channel.queue_declare(queue=queueName)
10 payload = sys.argv[1]
11 maxCount = int(sys.argv[2]) + 1
12
13 for count in range(1,maxCount):
14 msg = '%s %d!'%(payload , count)
15 channel.basic_publish(exchange='', routing_key=queueName, body=msg )
16 print " [x] Sent message '%s' to queue %s"%(msg , queueName)
17 connection.close()
consumeQueue.py
1 #!/usr/bin/env python
2 import pika
3 import sys
4
5 def messageHandler(ch, method, properties, body):
6 print " [x] %s received message %s" % (sys.argv[1], body)
7
8 if __name__=='__main__':
9 queueName='hello'
10 consumerName = sys.argv[1]
11 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
12 channel = connection.channel()
13 channel.queue_declare(queue=queueName)
14 print ' [*] %s Waiting for messages. To exit press CTRL+C'%( consumerName )
15 channel.basic_consume(messageHandler, queue=queueName, no_ack=True)
16 channel.start_consuming()