Sapna D'Souza
2017-05-11 00:07:42 UTC
Hi,

I am trying to develop a system that first broadcasts a task to all Celery
workers; after that, tasks will be posted and picked up from the queue in
the regular, distributed way. However, the broadcast task must have run on
each worker before that worker starts picking up the distributed tasks. I
can get the distributed tasks working, but I cannot get the broadcast to
work so that all workers pick up the same broadcast task.

I am on Celery 3.1.25, as I need a version that works on Windows, and I am
using Redis as both the broker and the result backend, running on a Linux
machine.

Can someone please help me get the broadcast working? Here is my sample
code. Help is greatly appreciated!

from celery import Celery
from kombu.common import Broadcast, Queue, Exchange

app = Celery('tasks', broker='redis://myps:6380', backend='redis://myps:6380')

app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {'tasks.add': {'queue': 'broadcast_tasks'}}
app.conf.broker_transport_options = {'fanout_prefix': True}
app.conf.broker_transport_options = {'fanout_patterns': True}
app.conf.enable_utc = True
app.conf.task_serializer = "json"
app.conf.result_serializer = "json"
app.conf.accept_content = ['pickle', 'json', 'msgpack', 'yaml']
app.conf.worker_disable_rate_limits = True
app.conf.task_always_eager = False

app.conf.update(
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    CELERY_QUEUES=(
        Queue('default', Exchange('default'), routing_key='default'),
        Queue('low_priority', Exchange('low_priority'),
              routing_key='low_priority'),
        Broadcast('broadcast_tasks', routing_key='broadcast_tasks'),
    ),
    CELERY_ROUTES={
        'tasks.add': {'queue': 'broadcast_tasks'},
        'tasks.low_task': {'queue': 'low_priority'},
    },
    CELERY_DEFAULT_QUEUE='default',
    CELERY_DEFAULT_EXCHANGE='default',
    CELERY_DEFAULT_ROUTING_KEY='default',
)


@app.task
def add(x, y):
    return x + y


@app.task
def multiply(x, y):
    return x * y


def low_task(string):
    return string
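
One detail in the code above that may be worth checking: broker_transport_options is assigned twice, so the second dict replaces the first and fanout_prefix is no longer set. Below is a minimal plain-Python sketch of that effect, and of setting both options in a single dict. The `conf` dict is a hypothetical stand-in for app.conf, and whether this is related to the broadcast problem is an assumption, not something confirmed here.

```python
# `conf` is a hypothetical stand-in for app.conf; no Celery import is needed.
conf = {}

# Two separate assignments, as in the posted code: the second replaces the first.
conf['broker_transport_options'] = {'fanout_prefix': True}
conf['broker_transport_options'] = {'fanout_patterns': True}
overwritten = dict(conf['broker_transport_options'])

# A single assignment keeps both options together.
conf['broker_transport_options'] = {
    'fanout_prefix': True,
    'fanout_patterns': True,
}
merged = dict(conf['broker_transport_options'])

print(overwritten)
print(merged)
```

If both options are wanted, the same single-dict form could be used in place of the two separate assignments in the posted configuration.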
--
You received this message because you are subscribed to the Google Groups "celery-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to celery-users+***@googlegroups.com.
To post to this group, send email to celery-***@googlegroups.com.
Visit this group at https://groups.google.com/group/celery-users.
For more options, visit https://groups.google.com/d/optout.