Revision 12 as of 2021-08-01 00:49:27

Celery

Celery is an asynchronous task queue/job queue based on distributed message passing.

http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html
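
The idea of a task queue built on message passing can be sketched with the standard library alone. This is only an illustration, not how Celery is implemented: the in-process `queue.Queue` stands in for the broker, the `results` dict stands in for the result backend, and the names `worker`, `tasks` and `results` are made up for this sketch.

```python
import queue
import threading


def add(x, y):
    return x + y


def worker(tasks, results):
    """Consume (task_id, func, args) messages until a None sentinel arrives."""
    while True:
        message = tasks.get()
        if message is None:
            break
        task_id, func, args = message
        results[task_id] = func(*args)  # store the outcome under the task id


tasks = queue.Queue()   # stand-in for the broker (Redis in the examples here)
results = {}            # stand-in for the result backend

thread = threading.Thread(target=worker, args=(tasks, results))
thread.start()

# The producer only enqueues a message; the worker thread does the work.
tasks.put(("task-1", add, (3, 3)))
tasks.put(None)         # tell the worker to stop
thread.join()

print(results["task-1"])
```

With Celery the producer and the worker are separate processes, possibly on separate machines, and the messages travel through the broker instead of an in-process queue.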

Slackware 14 installation

  • easy_install celery
  • pip install celery

Check installation:

    $ python
    Python 2.7.3 (default, Jul  3 2012, 19:58:39)
    [GCC 4.7.1] on linux2
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import celery
    >>> celery.__version__
    '3.1.7'
    >>>

Test app with Redis

Run redis:

    $ redis-server &
    $ redis-cli
    redis 127.0.0.1:6379> quit

App:

    # celeryTest.py
    from celery import Celery

    # Redis is used both as the message broker and as the result backend.
    app = Celery('tasks', broker='redis://localhost')
    app.conf.update(CELERY_RESULT_BACKEND="redis://")

    @app.task
    def add(x, y):
        return x + y

Start the worker:

    $ celery -A celeryTest worker --loglevel=info

Run task

    $ python
    Python 2.7.3 (default, Jul  3 2012, 19:58:39)
    [GCC 4.7.1] on linux2
    Type "help", "copyright", "credits" or "license" for more information.
    >>> from celeryTest import add
    >>> xx = add.delay(3, 3)
    >>> xx.ready()
    True
    >>> print(xx.result)
    6
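
In the session above `ready()` is already True because `add` finishes almost immediately; a slower task may still be running when it is checked. The usual way to block is `xx.get(timeout=...)`. The polling alternative can be sketched as below, assuming only an object with a `ready()` method. `wait_until_ready` and `FakeResult` are hypothetical names for this sketch, not part of Celery.

```python
import time


def wait_until_ready(result, timeout=5.0, interval=0.1):
    """Poll result.ready() until it is True or the timeout expires.

    Returns True if the result became ready in time, False otherwise.
    """
    deadline = time.monotonic() + timeout
    while not result.ready():
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
    return True


class FakeResult:
    """Hypothetical stand-in for an async result: ready after a few polls."""

    def __init__(self, polls_needed):
        self.polls_left = polls_needed

    def ready(self):
        self.polls_left -= 1
        return self.polls_left < 0


print(wait_until_ready(FakeResult(2), timeout=1.0, interval=0.01))
```

With a real task the call would look like `wait_until_ready(xx)` followed by `xx.result`.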

Docker compose example

  • docker-compose stop
  • docker-compose build
  • docker-compose up

app.py

    import time
    import redis
    from flask import Flask, jsonify
    from celery import Celery

    # Celery client; broker and result backend come from celeryconfig.py
    celery = Celery()
    celery.config_from_object('celeryconfig')

    """
    docker-compose build
    docker-compose run
    docker exec -it celerytest_web_1 sh
    docker-compose build
    docker-compose restart

    docker-compose stop
    docker system prune -a
    docker-compose up
    docker exec -it celerytest_web_1 sh
    pip freeze
    """

    app = Flask(__name__)
    cache = redis.Redis(host='redis', port=6379)

    def get_hit_count():
        # Redis may not be accepting connections yet; retry a few times.
        retries = 5
        while True:
            try:
                return cache.incr('hits')
            except redis.exceptions.ConnectionError as exc:
                if retries == 0:
                    raise exc
                retries -= 1
                time.sleep(0.5)

    @app.route('/')
    def hello():
        count = get_hit_count()
        return 'Hello World!?= I have been seen {} times.\n'.format(count)

    @app.route('/add/<int:op1>/<int:op2>')
    def add(op1=None, op2=None):
        """
        http://localhost:5000/add/2/8
        """
        # Send the task by name and block until the worker returns the result.
        result = celery.send_task('celery_worker.add', (op1, op2))
        return jsonify(add_res=result.get())
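
The retry loop in `get_hit_count()` can be tried without Redis. The sketch below factors the same pattern into a helper. `call_with_retries` and `flaky` are hypothetical names, and the builtin `ConnectionError` is only a stand-in for `redis.exceptions.ConnectionError`, which is a different class.

```python
import time


def call_with_retries(func, retries=5, delay=0.5, exceptions=(ConnectionError,)):
    """Call func(); on a listed exception retry up to `retries` more times."""
    while True:
        try:
            return func()
        except exceptions:
            if retries == 0:
                raise           # out of retries: re-raise the last error
            retries -= 1
            time.sleep(delay)


attempts = []


def flaky():
    """Fail on the first two calls, then succeed."""
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("not up yet")
    return len(attempts)


print(call_with_retries(flaky, delay=0.01))
```

As in `get_hit_count()`, the function is called at most `retries + 1` times before the error is propagated.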

Dockerfile-celery

FROM python:3.7-alpine
WORKDIR /code
RUN apk add --no-cache gcc musl-dev linux-headers
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
EXPOSE 5000
COPY . .
CMD ["celery","-A","celery_worker","worker","--loglevel=info"]

Dockerfile

FROM python:3.7-alpine
WORKDIR /code
ENV FLASK_APP=app.py
ENV FLASK_RUN_HOST=0.0.0.0
RUN apk add --no-cache gcc musl-dev linux-headers
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
EXPOSE 5000
COPY . .
CMD ["flask", "run"]

docker-compose.yml

    version: "3.3"
    services:
      web:
        build: .
        ports:
          - "5000:5000"
        volumes:
          - .:/code
        environment:
          FLASK_ENV: development
      redis:
        image: "redis:alpine"
      celery:
        build:
          context: .
          dockerfile: Dockerfile-celery

celery_client.py

    from celery import Celery

    def callback(async_result):
        # The callback is called with the finished result object.
        print("callback taskid %s" % (async_result.task_id))
        print("callback ready %d task id %s" % (async_result.get(), async_result.task_id))

    celery = Celery()
    celery.config_from_object('celeryconfig')
    # send_task() calls the task by name, so celery_worker is not imported here.
    result = celery.send_task('celery_worker.add', (2, 2))

    result.on_ready.then(callback)
    print(result.get())

celery_worker.py

    from celery import Celery
    from celery import shared_task

    app = Celery('tasks', broker='redis://redis')
    app.conf.update(CELERY_RESULT_BACKEND="redis://redis")

    @app.task
    def add(x, y):
        return x + y

    @shared_task
    def mul(x, y):
        return x * y

requirements.txt

flask
redis
celery

celeryconfig.py

    broker_url = 'redis://redis'
    result_backend = 'redis://redis'

    task_serializer = 'json'
    result_serializer = 'json'
    accept_content = ['json']
    timezone = 'Europe/Lisbon'
    enable_utc = True
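
With `task_serializer = 'json'`, task arguments and results have to survive a JSON round trip. The sketch below uses only the standard `json` module to show what that implies. The `payload` dict is a made-up illustration, not Celery's actual message format.

```python
import json

args = (2, 2)

# Illustrative payload only; the real Celery message layout is different.
payload = json.dumps({"task": "celery_worker.add", "args": args})
decoded = json.loads(payload)

# JSON has no tuple type, so the arguments come back as a list.
print(decoded["args"])

# Values without a JSON representation (here a set) are rejected.
try:
    json.dumps({"args": ({1, 2},)})
except TypeError as exc:
    print("not JSON-serializable:", exc)
```

Plain numbers, strings, lists and dicts, as used by `add` and `mul`, pass through unchanged.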