Discussion:
[celery-users] Concurrency with Consumers running in a Celery Worker
Anthony Lukach
2017-06-16 21:13:44 UTC
Permalink
I have a custom `bootstep.ConsumerStep` class that produces two consumers
to read of two separate queues and handle their messages. One of these
queues includes standard Celery-formatted tasks, however rather than
execute these tasks the Consumer simply records their properties in a
database. The other queue contains Celery-formatted result messages, and
records the state of the tasks stored in our database. The Celery worker
is actually not processing any tasks on its own. The custom ConsumerStep
is configured with my app via `app.steps['consumer'].add(MyConsumerStep`.

I'm using the Celery worker command rather than a custom script written
with Kombu as the Celery worker takes care of some things like setting up
the AsyncHub and comes with tooling for controlling concurrency (or so I
thought). Upon testing the command, it appears that the consumers don't
ever run concurrently regardless of the value of the `-c` argument
provided. Additionally, it's awkward to have the worker running and watch
a queue that will never have tasks.

* Am I correct that the ConsumerSteps don't respect the concurrency of a
worker?
* Is there a way to run Consumers concurrently via Celery?
* Am I better off avoiding Celery and just doing this with Kombu? If so,
are there any good examples of building in concurrency tooling into a Kombu
Consumer? Also, any examples of how to start up the AsyncHub in a Kombu
Consumer?
--
You received this message because you are subscribed to the Google Groups "celery-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to celery-users+***@googlegroups.com.
To post to this group, send email to celery-***@googlegroups.com.
Visit this group at https://groups.google.com/group/celery-users.
For more options, visit https://groups.google.com/d/optout.
n***@gmail.com
2018-08-23 00:36:08 UTC
Permalink
Hi Anthony,

I'm having the same issue and to be frank, I wasn't expecting this
behavior. Currently, I'm using the 'kombu consumer' as the mediator to read
the vanilla message from the custom SQS queue (broker=SQS) and forwarding
it to the default broker queue for concurrent processing by celery. But
obviously, this is not the solution one would expect. Let me know if you
were able to solve the issue.

Thanks!
Post by Anthony Lukach
I have a custom `bootstep.ConsumerStep` class that produces two consumers
to read of two separate queues and handle their messages. One of these
queues includes standard Celery-formatted tasks, however rather than
execute these tasks the Consumer simply records their properties in a
database. The other queue contains Celery-formatted result messages, and
records the state of the tasks stored in our database. The Celery worker
is actually not processing any tasks on its own. The custom ConsumerStep
is configured with my app via `app.steps['consumer'].add(MyConsumerStep`.
I'm using the Celery worker command rather than a custom script written
with Kombu as the Celery worker takes care of some things like setting up
the AsyncHub and comes with tooling for controlling concurrency (or so I
thought). Upon testing the command, it appears that the consumers don't
ever run concurrently regardless of the value of the `-c` argument
provided. Additionally, it's awkward to have the worker running and watch
a queue that will never have tasks.
* Am I correct that the ConsumerSteps don't respect the concurrency of a
worker?
* Is there a way to run Consumers concurrently via Celery?
* Am I better off avoiding Celery and just doing this with Kombu? If so,
are there any good examples of building in concurrency tooling into a Kombu
Consumer? Also, any examples of how to start up the AsyncHub in a Kombu
Consumer?
--
You received this message because you are subscribed to the Google Groups "celery-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to celery-users+***@googlegroups.com.
To post to this group, send email to celery-***@googlegroups.com.
Visit this group at https://groups.google.com/group/celery-users.
For more options, visit https://groups.google.com/d/optout.
Josue B. C.
2018-09-07 17:51:15 UTC
Permalink
I've seen this question a few times and I haven't had time to dive into it.
The main thing is that when one does:
app.steps['consumer'].add(MyConsumer)
This is actually adding that step to the main thread. If you set logging to
debug you can see when you send a task to the queue the custom consumer
will process the Main Process is the one who takes care of it.
I believe in order to take advantage of concurrency one would have to tell
the main worker process to use a specific consumer class. The consumer
class to use is set by a default value in the
configuraitoin: http://docs.celeryproject.org/en/latest/userguide/configuration.html#worker-consumer
Post by n***@gmail.com
Hi Anthony,
I'm having the same issue and to be frank, I wasn't expecting this
behavior. Currently, I'm using the 'kombu consumer' as the mediator to read
the vanilla message from the custom SQS queue (broker=SQS) and forwarding
it to the default broker queue for concurrent processing by celery. But
obviously, this is not the solution one would expect. Let me know if you
were able to solve the issue.
Thanks!
Post by Anthony Lukach
I have a custom `bootstep.ConsumerStep` class that produces two consumers
to read of two separate queues and handle their messages. One of these
queues includes standard Celery-formatted tasks, however rather than
execute these tasks the Consumer simply records their properties in a
database. The other queue contains Celery-formatted result messages, and
records the state of the tasks stored in our database. The Celery worker
is actually not processing any tasks on its own. The custom ConsumerStep
is configured with my app via `app.steps['consumer'].add(MyConsumerStep`.
I'm using the Celery worker command rather than a custom script written
with Kombu as the Celery worker takes care of some things like setting up
the AsyncHub and comes with tooling for controlling concurrency (or so I
thought). Upon testing the command, it appears that the consumers don't
ever run concurrently regardless of the value of the `-c` argument
provided. Additionally, it's awkward to have the worker running and watch
a queue that will never have tasks.
* Am I correct that the ConsumerSteps don't respect the concurrency of a
worker?
* Is there a way to run Consumers concurrently via Celery?
* Am I better off avoiding Celery and just doing this with Kombu? If so,
are there any good examples of building in concurrency tooling into a Kombu
Consumer? Also, any examples of how to start up the AsyncHub in a Kombu
Consumer?
--
You received this message because you are subscribed to the Google Groups "celery-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to celery-users+***@googlegroups.com.
To post to this group, send email to celery-***@googlegroups.com.
Visit this group at https://groups.google.com/group/celery-users.
For more options, visit https://groups.google.com/d/optout.
Loading...