MoinMoin Logo
  • Comments
  • Immutable Page
  • Menu
    • Navigation
    • RecentChanges
    • FindPage
    • Local Site Map
    • Help
    • HelpContents
    • HelpOnMoinWikiSyntax
    • Display
    • Attachments
    • Info
    • Raw Text
    • Print View
    • Edit
    • Load
    • Save
  • Login

Navigation

  • Start
  • Sitemap

Upload page content

You can upload content for the page named below. If you change the page name, you can also upload content for another page. If the page name is empty, we derive the page name from the file name.

File to load page content from
Page name
Comment

  • Python
  • Celery

Celery

Celery is an asynchronous task queue/job queue based on distributed message passing.

http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html

Slackware 14 installation

  • easy_install celery
  • pip install celery

Check installation:

   1 python
   2 Python 2.7.3 (default, Jul  3 2012, 19:58:39) 
   3 [GCC 4.7.1] on linux2
   4 Type "help", "copyright", "credits" or "license" for more information.
   5 >>> import celery
   6 >>> celery.__version__
   7 '3.1.7'
   8 >>>

Test app with Redis

Run redis:

   1 $redis-server & 
   2 $redis-cli     
   3 redis 127.0.0.1:6379> quit

App:

   1 # celeryTest.py
   2 from celery import Celery
   3 
   4 app = Celery('tasks', broker='redis://localhost')
   5 app.conf.update(CELERY_RESULT_BACKEND="redis://")
   6 
   7 @app.task
   8 def add(x, y):
   9     return x + y

   1 $celery -A celeryTest worker --loglevel=info

Run task

   1 $ python
   2 Python 2.7.3 (default, Jul  3 2012, 19:58:39) 
   3 [GCC 4.7.1] on linux2
   4 Type "help", "copyright", "credits" or "license" for more information.
   5 >>> from celeryTest import add
   6 >>> xx=add.delay(3,3)
   7 >>> xx.ready()
   8 True
   9 >>> print(xx.result)
  10 6

Docker compose example

  • docker-compose stop
  • docker-compose build
  • docker-compose up

app.py

   1 import time
   2 import redis
   3 from flask import Flask,jsonify
   4 from celery import Celery
   5 
   6 celery = Celery()
   7 celery.config_from_object('celeryconfig')
   8 
   9 """
  10 docker-compose build
  11 docker-compose run
  12 docker exec -it celerytest_web_1 sh
  13 docker-compose buid
  14 docker-compose restart
  15 
  16 docker-compose stop
  17 docker system prune -a
  18 docker-compose up
  19 docker exec -it celerytest_web_1 sh
  20 pip freeze
  21 
  22 """
  23 
  24 app = Flask(__name__)
  25 cache = redis.Redis(host='redis', port=6379)
  26 
  27 def get_hit_count():
  28     retries = 5
  29     while True:
  30         try:
  31             return cache.incr('hits')
  32         except redis.exceptions.ConnectionError as exc:
  33             if retries == 0:
  34                 raise exc
  35             retries -= 1
  36             time.sleep(0.5)
  37 
  38 @app.route('/')
  39 def hello():
  40     count = get_hit_count()
  41     return 'Hello World!?= I have been seen {} times.\n'.format(count)
  42 
  43 @app.route('/add/<int:op1>/<int:op2>')
  44 def add(op1=None,op2=None):
  45     """
  46     http://localhost:5000/add/2/8
  47     """
  48     result=(celery.send_task('celery_worker.add', (op1,op2)))
  49     return jsonify( add_res=result.get())
  50 
  51 @app.route('/mul/<int:op1>/<int:op2>')
  52 def mul(op1=None,op2=None):
  53     """
  54     http://localhost:5000/mul/2/8
  55     """
  56     result=(celery.send_task('celery_worker.mul', (op1,op2)))
  57     return jsonify( add_res=result.get())

Dockerfile-celery

FROM python:3.7-alpine
WORKDIR /code
RUN apk add --no-cache gcc musl-dev linux-headers
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
EXPOSE 5000
COPY . .
CMD ["celery","-A","celery_worker","worker","--loglevel=info"]

Dockerfile

FROM python:3.7-alpine
WORKDIR /code
ENV FLASK_APP=app.py
ENV FLASK_RUN_HOST=0.0.0.0
RUN apk add --no-cache gcc musl-dev linux-headers
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
EXPOSE 5000
COPY . .
CMD ["flask", "run"]

docker-compose.yml

   1 version: "3.3"
   2 services:
   3   web:
   4     build: .
   5     ports:
   6       - "5000:5000"
   7     volumes:
   8       - .:/code
   9     environment:
  10       FLASK_ENV: development
  11   redis:
  12     image: "redis:alpine"
  13   celery:
  14     build:
  15       context: .
  16       dockerfile: Dockerfile-celery

celery_client.py

   1 from celery import Celery
   2 
   3 def callback(taskid):
   4     print("callback taskid %s"%(taskid))
   5     print("callback ready %d task id %s"%(result.get() , result.task_id  ))
   6 
   7 celery = Celery()
   8 celery.config_from_object('celeryconfig')
   9 result=(celery.send_task('celery_worker.add', (2,2)))
  10 
  11 result.on_ready.then(callback) 
  12 print(result.get())

celery_worker.py

   1 from celery import Celery
   2 from celery import shared_task
   3 
   4 app = Celery('tasks', broker='redis://redis')
   5 app.conf.update(CELERY_RESULT_BACKEND="redis://redis")
   6 
   7 @app.task
   8 def add(x, y):
   9     return x + y
  10 
  11 @shared_task
  12 def mul(x,y):
  13     return x*y

requirements.txt

flask
redis
celery

celeryconfig.py

   1 broker_url = 'redis://redis'
   2 result_backend = 'redis://redis'
   3 
   4 task_serializer = 'json'
   5 result_serializer = 'json'
   6 accept_content = ['json']
   7 timezone = 'Europe/Lisbon'
   8 enable_utc = True
  • MoinMoin Powered
  • Python Powered
  • GPL licensed
  • Valid HTML 4.01