Grisha Klots
2016-07-21 15:16:09 UTC
Hi all,
We're trying to implement a *Publish / Subscribe* infrastructure with
RabbitMQ.
When I say Publish / Subscribe, I mean something like the following:
1. a process may "send" a message to a specific topic (topic is a string)
2. another process may "listen" to this topic and call a method
(callback) whenever a message of said topic is "heard".
3. the sender doesn't need to know anything about the receiver process,
not even if such a process exists.
4. Messages that are not "consumed" by anyone are forever lost.
So far, we've had very good success using the Pika library.
I would like to know whether it is possible to implement a publish /
subscribe pattern using Celery only. Any code examples would be great! I
tried to subclass the ConsumerStep class but for some reason it doesn't
work. Here' the full code:
(this is the only example I was able to find and it doesn't seem to work)
Thanks!
*from* time *import* sleep
*from* celery *import* Celery
*from* celery *import* bootsteps
*from* kombu *import* Consumer, Exchange, Queue
my_queue = Queue('custom', Exchange('custom'), 'routing_key')
app = Celery(broker='amqp://localhost')
*class* MyConsumerStep(bootsteps.ConsumerStep):
*def* get_consumers(*self*, channel):
*return* [Consumer(channel,
queues=[my_queue],
callbacks=[self.handle_message],
accept=None)]
*def* handle_message(*self*, body, message):
print('Received message: {0!r}'.format(body))
message.ack()
app.steps['consumer'].add(MyConsumerStep)
*def* send_me_a_message(who='world!', producer=None):
with app.producer_or_acquire(producer) as producer:
producer.publish_task(
{'hello': who},
serializer='json',
exchange=my_queue.exchange,
routing_key='routing_key',
declare=[my_queue],
retry=True,
)
*if* __name__ == '__main__':
send_me_a_message('celery')
sleep(10)
We're trying to implement a *Publish / Subscribe* infrastructure with
RabbitMQ.
When I say Publish / Subscribe, I mean something like the following:
1. a process may "send" a message to a specific topic (topic is a string)
2. another process may "listen" to this topic and call a method
(callback) whenever a message of said topic is "heard".
3. the sender doesn't need to know anything about the receiver process,
not even if such a process exists.
4. Messages that are not "consumed" by anyone are forever lost.
So far, we've had very good success using the Pika library.
I would like to know whether it is possible to implement a publish /
subscribe pattern using Celery only. Any code examples would be great! I
tried to subclass the ConsumerStep class but for some reason it doesn't
work. Here' the full code:
(this is the only example I was able to find and it doesn't seem to work)
Thanks!
*from* time *import* sleep
*from* celery *import* Celery
*from* celery *import* bootsteps
*from* kombu *import* Consumer, Exchange, Queue
my_queue = Queue('custom', Exchange('custom'), 'routing_key')
app = Celery(broker='amqp://localhost')
*class* MyConsumerStep(bootsteps.ConsumerStep):
*def* get_consumers(*self*, channel):
*return* [Consumer(channel,
queues=[my_queue],
callbacks=[self.handle_message],
accept=None)]
*def* handle_message(*self*, body, message):
print('Received message: {0!r}'.format(body))
message.ack()
app.steps['consumer'].add(MyConsumerStep)
*def* send_me_a_message(who='world!', producer=None):
with app.producer_or_acquire(producer) as producer:
producer.publish_task(
{'hello': who},
serializer='json',
exchange=my_queue.exchange,
routing_key='routing_key',
declare=[my_queue],
retry=True,
)
*if* __name__ == '__main__':
send_me_a_message('celery')
sleep(10)
--
You received this message because you are subscribed to the Google Groups "celery-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to celery-users+***@googlegroups.com.
To post to this group, send email to celery-***@googlegroups.com.
Visit this group at https://groups.google.com/group/celery-users.
For more options, visit https://groups.google.com/d/optout.
You received this message because you are subscribed to the Google Groups "celery-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to celery-users+***@googlegroups.com.
To post to this group, send email to celery-***@googlegroups.com.
Visit this group at https://groups.google.com/group/celery-users.
For more options, visit https://groups.google.com/d/optout.