Discussion:
[celery-users] Publish / Subscribe with Celery
Grisha Klots
2016-07-21 15:16:09 UTC
Permalink
Hi all,

We're trying to implement a *Publish / Subscribe* infrastructure with
RabbitMQ.
When I say Publish / Subscribe, I mean something like the following:

1. a process may "send" a message to a specific topic (topic is a string)
2. another process may "listen" to this topic and call a method
(callback) whenever a message of said topic is "heard".
3. the sender doesn't need to know anything about the receiver process,
not even if such a process exists.
4. Messages that are not "consumed" by anyone are forever lost.

So far, we've had very good success using the Pika library.
I would like to know whether it is possible to implement a publish /
subscribe pattern using Celery only. Any code examples would be great! I
tried to subclass the ConsumerStep class but for some reason it doesn't
work. Here' the full code:
(this is the only example I was able to find and it doesn't seem to work)

Thanks!


*from* time *import* sleep
*from* celery *import* Celery
*from* celery *import* bootsteps
*from* kombu *import* Consumer, Exchange, Queue

my_queue = Queue('custom', Exchange('custom'), 'routing_key')

app = Celery(broker='amqp://localhost')


*class* MyConsumerStep(bootsteps.ConsumerStep):

*def* get_consumers(*self*, channel):
*return* [Consumer(channel,
queues=[my_queue],
callbacks=[self.handle_message],
accept=None)]

*def* handle_message(*self*, body, message):
print('Received message: {0!r}'.format(body))
message.ack()

app.steps['consumer'].add(MyConsumerStep)


*def* send_me_a_message(who='world!', producer=None):

with app.producer_or_acquire(producer) as producer:
producer.publish_task(
{'hello': who},
serializer='json',
exchange=my_queue.exchange,
routing_key='routing_key',
declare=[my_queue],
retry=True,
)

*if* __name__ == '__main__':
send_me_a_message('celery')
sleep(10)
--
You received this message because you are subscribed to the Google Groups "celery-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to celery-users+***@googlegroups.com.
To post to this group, send email to celery-***@googlegroups.com.
Visit this group at https://groups.google.com/group/celery-users.
For more options, visit https://groups.google.com/d/optout.
Stephen Fuhry
2016-07-22 02:08:00 UTC
Permalink
If you hook up celery with a result backend, I don't think you need to do
any of this complicated stuff.


Your subscriber would be a worker that runs this task:

@celery.task def do_the_thing(a, b): return a + b
and your publisher can simply publish a task like this:

task = do_the_thing.delay(1, 2) result = task.get()

Or if you want to publish remotely:

task = celery.send_task('do_the_thing', args=(1, 2)) result = task.get()


In celery 4.0, which isn't released yet, redis uses pub/sub, so with a
rabbitmq / redis backend, this should be really fast. However, currently in
3.1, .get() will poll redis at an interval, so it will be slow. There are
workarounds to this though.

http://docs.celeryproject.org/en/master/whatsnew-4.0.html#rpc-is-now-using-pub-sub-for-streaming-task-results

Hope I'm not misunderstanding your question!

-Steve
Post by Grisha Klots
Hi all,
We're trying to implement a *Publish / Subscribe* infrastructure with
RabbitMQ.
1. a process may "send" a message to a specific topic (topic is a string)
2. another process may "listen" to this topic and call a method
(callback) whenever a message of said topic is "heard".
3. the sender doesn't need to know anything about the receiver
process, not even if such a process exists.
4. Messages that are not "consumed" by anyone are forever lost.
So far, we've had very good success using the Pika library.
I would like to know whether it is possible to implement a publish /
subscribe pattern using Celery only. Any code examples would be great! I
tried to subclass the ConsumerStep class but for some reason it doesn't
(this is the only example I was able to find and it doesn't seem to work)
Thanks!
*from* time *import* sleep
*from* celery *import* Celery
*from* celery *import* bootsteps
*from* kombu *import* Consumer, Exchange, Queue
my_queue = Queue('custom', Exchange('custom'), 'routing_key')
app = Celery(broker='amqp://localhost')
*return* [Consumer(channel,
queues=[my_queue],
callbacks=[self.handle_message],
accept=None)]
print('Received message: {0!r}'.format(body))
message.ack()
app.steps['consumer'].add(MyConsumerStep)
producer.publish_task(
{'hello': who},
serializer='json',
exchange=my_queue.exchange,
routing_key='routing_key',
declare=[my_queue],
retry=True,
)
send_me_a_message('celery')
sleep(10)
--
You received this message because you are subscribed to the Google Groups "celery-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to celery-users+***@googlegroups.com.
To post to this group, send email to celery-***@googlegroups.com.
Visit this group at https://groups.google.com/group/celery-users.
For more options, visit https://groups.google.com/d/optout.
Continue reading on narkive:
Loading...