Discussion:
[celery-users] Celery-Redis broadcast not working
Sapna D'Souza
2017-05-11 00:07:42 UTC
Permalink
hi,

I am trying to develop a system that broadcasts a task to all celery
workers after which tasks will be posted and picked up the regular way from
the queue in a distributed manner. however before picking up these tasks in
a distributed manner per worker, the broadcasted task should have run. I am
able to get the distributed tasks working, however i am unable to get the
broadcast to work where all workers can pick up the same broadcasted task.

i am on 3.1.25 as i need a version that works on windows and i am using
redis for backend and as the broker which is running on a linux machine

can someone please help me out with getting the broadcast to work?

here is my sample code

help is greatly appreciated!

from celery import Celery
from kombu.common import Broadcast,Queue, Exchange

app = Celery('tasks', broker='redis://myps:6380', backend='redis://myps:6380')

app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {'tasks.add': {'queue': 'broadcast_tasks'}}
app.conf.broker_transport_options = {'fanout_prefix': True}
app.conf.broker_transport_options = {'fanout_patterns': True}
app.conf.enable_utc = True
app.conf.task_serializer = "json"
app.conf.result_serializer = "json"
app.conf.accept_content = ['pickle', 'json', 'msgpack', 'yaml']
app.conf.worker_disable_rate_limits = True
app.conf.task_always_eager = False

app.conf.update(
CELERY_ACCEPT_CONTENT=['json'],

CELERY_TASK_SERIALIZER='json',

CELERY_RESULT_SERIALIZER='json',

CELERY_QUEUES=(Queue('default',

Exchange('default'),

routing_key='default'),

Queue('low_priority',

Exchange('low_priority'),

routing_key='low_priority'),

Broadcast('broadcast_tasks',

routing_key='broadcast_tasks')
, ),

CELERY_ROUTES={'tasks.add':

{'queue': 'broadcast_tasks'},

'tasks.low_task':

{'queue': 'low_priority'},

},

CELERY_DEFAULT_QUEUE = 'default',

CELERY_DEFAULT_EXCHANGE = 'default',

CELERY_DEFAULT_ROUTING_KEY = 'default'
)

@app.task
def add(x, y):
return x + y

@app.task
def multiply(x, y):
return x * y

def low_task(string):
return string
--
You received this message because you are subscribed to the Google Groups "celery-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to celery-users+***@googlegroups.com.
To post to this group, send email to celery-***@googlegroups.com.
Visit this group at https://groups.google.com/group/celery-users.
For more options, visit https://groups.google.com/d/optout.
m***@eisti.eu
2017-05-11 10:28:59 UTC
Permalink
If someone have the answer, I'm also interested
Post by Sapna D'Souza
hi,
I am trying to develop a system that broadcasts a task to all celery
workers after which tasks will be posted and picked up the regular way from
the queue in a distributed manner. however before picking up these tasks in
a distributed manner per worker, the broadcasted task should have run. I am
able to get the distributed tasks working, however i am unable to get the
broadcast to work where all workers can pick up the same broadcasted task.
i am on 3.1.25 as i need a version that works on windows and i am using
redis for backend and as the broker which is running on a linux machine
can someone please help me out with getting the broadcast to work?
here is my sample code
help is greatly appreciated!
from celery import Celery
from kombu.common import Broadcast,Queue, Exchange
app = Celery('tasks', broker='redis://myps:6380', backend='redis://myps:6380')
app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {'tasks.add': {'queue': 'broadcast_tasks'}}
app.conf.broker_transport_options = {'fanout_prefix': True}
app.conf.broker_transport_options = {'fanout_patterns': True}
app.conf.enable_utc = True
app.conf.task_serializer = "json"
app.conf.result_serializer = "json"
app.conf.accept_content = ['pickle', 'json', 'msgpack', 'yaml']
app.conf.worker_disable_rate_limits = True
app.conf.task_always_eager = False
app.conf.update(
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
CELERY_QUEUES=(Queue('default',
Exchange('default'),
routing_key='default'),
Queue('low_priority',
Exchange('low_priority'),
routing_key='low_priority'),
Broadcast('broadcast_tasks',
routing_key='broadcast_tasks')
, ),
{'queue': 'broadcast_tasks'},
{'queue': 'low_priority'},
},
CELERY_DEFAULT_QUEUE = 'default',
CELERY_DEFAULT_EXCHANGE = 'default',
CELERY_DEFAULT_ROUTING_KEY = 'default'
)
@app.task
return x + y
@app.task
return x * y
return string
--
You received this message because you are subscribed to the Google Groups "celery-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to celery-users+***@googlegroups.com.
To post to this group, send email to celery-***@googlegroups.com.
Visit this group at https://groups.google.com/group/celery-users.
For more options, visit https://groups.google.com/d/optout.
Loading...