Celery

Celery is an asynchronous task queue/job queue based on distributed message passing.

http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html

Slackware 14 installation

Check installation:

   1 python
   2 Python 2.7.3 (default, Jul  3 2012, 19:58:39) 
   3 [GCC 4.7.1] on linux2
   4 Type "help", "copyright", "credits" or "license" for more information.
   5 >>> import celery
   6 >>> celery.__version__
   7 '3.1.7'
   8 >>>

Test app with Redis

Run redis:

   1 $redis-server & 
   2 $redis-cli     
   3 redis 127.0.0.1:6379> quit

App:

   1 # celeryTest.py
   2 from celery import Celery
   3 
   4 app = Celery('tasks', broker='redis://localhost')
   5 app.conf.update(CELERY_RESULT_BACKEND="redis://")
   6 
   7 @app.task
   8 def add(x, y):
   9     return x + y

   1 $celery -A celeryTest worker --loglevel=info

Run task

   1 $ python
   2 Python 2.7.3 (default, Jul  3 2012, 19:58:39) 
   3 [GCC 4.7.1] on linux2
   4 Type "help", "copyright", "credits" or "license" for more information.
   5 >>> from celeryTest import add
   6 >>> xx=add.delay(3,3)
   7 >>> xx.ready()
   8 True
   9 >>> print(xx.result)
  10 6

Docker compose example

app.py

   1 import time
   2 import redis
   3 from flask import Flask,jsonify
   4 from celery import Celery
   5 
   6 celery = Celery()
   7 celery.config_from_object('celeryconfig')
   8 
   9 """
  10 docker-compose build
  11 docker-compose run
  12 docker exec -it celerytest_web_1 sh
  13 docker-compose buid
  14 docker-compose restart
  15 
  16 docker-compose stop
  17 docker system prune -a
  18 docker-compose up
  19 docker exec -it celerytest_web_1 sh
  20 pip freeze
  21 
  22 """
  23 
  24 app = Flask(__name__)
  25 cache = redis.Redis(host='redis', port=6379)
  26 
  27 def get_hit_count():
  28     retries = 5
  29     while True:
  30         try:
  31             return cache.incr('hits')
  32         except redis.exceptions.ConnectionError as exc:
  33             if retries == 0:
  34                 raise exc
  35             retries -= 1
  36             time.sleep(0.5)
  37 
  38 @app.route('/')
  39 def hello():
  40     count = get_hit_count()
  41     return 'Hello World!?= I have been seen {} times.\n'.format(count)
  42 
  43 @app.route('/add/<int:op1>/<int:op2>')
  44 def add(op1=None,op2=None):
  45     """
  46     http://localhost:5000/add/2/8
  47     """
  48     result=(celery.send_task('celery_worker.add', (op1,op2)))
  49     return jsonify( add_res=result.get())
  50 
  51 @app.route('/mul/<int:op1>/<int:op2>')
  52 def mul(op1=None,op2=None):
  53     """
  54     http://localhost:5000/mul/2/8
  55     """
  56     result=(celery.send_task('celery_worker.mul', (op1,op2)))
  57     return jsonify( add_res=result.get())

Dockerfile-celery

FROM python:3.7-alpine
WORKDIR /code
RUN apk add --no-cache gcc musl-dev linux-headers
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
EXPOSE 5000
COPY . .
CMD ["celery","-A","celery_worker","worker","--loglevel=info"]

Dockerfile

FROM python:3.7-alpine
WORKDIR /code
ENV FLASK_APP=app.py
ENV FLASK_RUN_HOST=0.0.0.0
RUN apk add --no-cache gcc musl-dev linux-headers
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
EXPOSE 5000
COPY . .
CMD ["flask", "run"]

docker-compose.yml

   1 version: "3.3"
   2 services:
   3   web:
   4     build: .
   5     ports:
   6       - "5000:5000"
   7     volumes:
   8       - .:/code
   9     environment:
  10       FLASK_ENV: development
  11   redis:
  12     image: "redis:alpine"
  13   celery:
  14     build:
  15       context: .
  16       dockerfile: Dockerfile-celery

celery_client.py

   1 from celery import Celery
   2 
   3 def callback(taskid):
   4     print("callback taskid %s"%(taskid))
   5     print("callback ready %d task id %s"%(result.get() , result.task_id  ))
   6 
   7 celery = Celery()
   8 celery.config_from_object('celeryconfig')
   9 result=(celery.send_task('celery_worker.add', (2,2)))
  10 
  11 result.on_ready.then(callback) 
  12 print(result.get())

celery_worker.py

   1 from celery import Celery
   2 from celery import shared_task
   3 
   4 app = Celery('tasks', broker='redis://redis')
   5 app.conf.update(CELERY_RESULT_BACKEND="redis://redis")
   6 
   7 @app.task
   8 def add(x, y):
   9     return x + y
  10 
  11 @shared_task
  12 def mul(x,y):
  13     return x*y

requirements.txt

flask
redis
celery

celeryconfig.py

   1 broker_url = 'redis://redis'
   2 result_backend = 'redis://redis'
   3 
   4 task_serializer = 'json'
   5 result_serializer = 'json'
   6 accept_content = ['json']
   7 timezone = 'Europe/Lisbon'
   8 enable_utc = True

Python/Celery (last edited 2021-08-01 00:50:21 by localhost)