Stephan Doliov
2018-02-12 17:16:53 UTC
Hi,
I just started to play with both RabbitMQ and Celery, and I ran into a
problem that seemed strange to me.
My setup is
Ubuntu 16.04
Python 3.6.4
Celery 4.1
RabbitMQ 3.7.3
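I created a queue on the default (direct) exchange using pika (following
the pika docs):

import sys
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# durable is not passed here, so pika's default (durable=False) applies
channel.queue_declare(queue='getting_started')
# The default exchange ('') routes on the queue name, so the routing key
# has to match the queue declared above.
channel.basic_publish(exchange='',
                      routing_key='getting_started',
                      body=" ".join(sys.argv[1:]))
print(" [x] Sent ", " ".join(sys.argv[1:]))
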
I published messages to this queue (and a simple receiver program echoed
them). The RabbitMQ admin shows me that this queue is not durable (pika
uses durable=False as the default).
Now, when I tried to define my Celery app, I received an error (code
first, then the error I received):
celery.py
from __future__ import absolute_import, unicode_literals
import os
import time
from kombu import Queue
from celery import Celery
from celery import shared_task
from celery.utils.log import get_task_logger
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'pg3.settings')
app = Celery('pg3')
app.conf.broker_url = 'amqp://localhost:5672'
app.conf.task_default_queue = 'getting_started'
app.conf.task_default_routing_key = 'getting_started'
app.conf.task_always_eager = False
app.conf.task_default_exchange_type = 'direct'
app.conf.accept_content = ['json', 'application/json']
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
logger = get_task_logger(__name__)
@app.task(bind=False)
def do_something_dumb():
    time.sleep(5)
    logger.info("Something dumb being done")
    return 1
Launching celery with the above code with
celery -E -A pg3 worker --loglevel=info
yields the following stack trace (last few lines only shown):
  File "/home/steve/.virtualenvs/pg3/lib/python3.6/site-packages/amqp/connection.py", line 481, in on_inbound_method
    method_sig, payload, content,
  File "/home/steve/.virtualenvs/pg3/lib/python3.6/site-packages/amqp/abstract_channel.py", line 128, in dispatch_method
    listener(*args)
  File "/home/steve/.virtualenvs/pg3/lib/python3.6/site-packages/amqp/channel.py", line 279, in _on_close
    reply_code, reply_text, (class_id, method_id), ChannelError,
amqp.exceptions.PreconditionFailed: Queue.declare: (406) PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'getting_started' in vhost '/': received 'true' but current is 'false'
So, after much gnashing of teeth, I figured out that pika creates the queue
as non-durable by default, yet Celery wants to do the opposite.
It *seems* to me that Celery isn't honoring what (little) I currently
understand about AMQP, Rabbit, pika and Celery. The docs I had read said
that if a queue is inadvertently redeclared, that should be a no-op. But it
also appears that Celery is trying to declare the queue, in this case
getting_started, with durable=True, and then RabbitMQ rightfully complains
that a queue cannot be both durable and non-durable.
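
To make the failure mode concrete, here is a toy, in-memory model of the behavior the traceback suggests: redeclaring a queue with identical arguments is a no-op, while redeclaring it with a different durable flag is rejected. ToyBroker and its method are hypothetical names for illustration only; this is not RabbitMQ's or py-amqp's code, and it only models the durable flag.

```python
class PreconditionFailed(Exception):
    """Stand-in for the broker's 406 PRECONDITION_FAILED channel error."""


class ToyBroker:
    """Hypothetical in-memory model of queue declaration; not RabbitMQ code."""

    def __init__(self):
        self.queues = {}  # queue name -> durable flag it was created with

    def queue_declare(self, name, durable=False):
        if name not in self.queues:
            self.queues[name] = durable  # first declaration creates the queue
            return "created"
        current = self.queues[name]
        if current != durable:
            # Wording modeled on the error message in the traceback above.
            raise PreconditionFailed(
                "inequivalent arg 'durable' for queue %r: "
                "received %r but current is %r"
                % (name, str(durable).lower(), str(current).lower())
            )
        return "no-op"  # same arguments: redeclaring is harmless


broker = ToyBroker()
first = broker.queue_declare("getting_started")   # pika-style default
again = broker.queue_declare("getting_started")   # identical redeclare
try:
    broker.queue_declare("getting_started", durable=True)  # Celery-style
    outcome = "accepted"
except PreconditionFailed as exc:
    outcome = str(exc)
print(first, again, outcome)
```

Under this model the "no-op on redeclare" expectation holds only when the second declaration repeats the arguments of the first, which is consistent with the error reported above.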
So, my questions are:
1) Should Celery honor pre-existing queues that are not default, but
direct? (In other words, should it test for a queue's existence before
trying to create it with its own set of default parameters?)
2) Is it shown somewhere in the Celery docs how I could let Celery know
that this already existing queue is not durable; and if so, how would I do
that?
3) Are the AMQP docs
<http://amqp.readthedocs.io/en/latest/reference/amqp.channel.html#amqp.channel.Channel.queue_declare>
in error to show that durable gets initialized to false?
4) The source code of queue_declare, linked from the AMQP docs
<http://amqp.readthedocs.io/en/latest/_modules/amqp/channel.html#Channel.queue_declare>, states
quite clearly:
[Image not preserved: screenshot from the linked queue_declare source]
Does this mean that Celery has a bug, since it is breaking the rule of
"server Must ignore the durable field if the queue already exists", or does
this mean there is a bug in RabbitMQ (as it is the server that perhaps is
not following the rule)?
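
For question 2, one possible approach is to declare the queue explicitly through Celery's task_queues setting using kombu's Queue, so that the arguments Celery sends match the queue that already exists. This is a configuration sketch, not a confirmed fix for this setup: it assumes the existing queue was created non-durable with otherwise default arguments, and the exchange name used here is an assumption chosen to mirror the queue name.

```python
from celery import Celery
from kombu import Exchange, Queue

app = Celery('pg3')
app.conf.broker_url = 'amqp://localhost:5672'
app.conf.task_default_queue = 'getting_started'
app.conf.task_default_routing_key = 'getting_started'

# Declare the queue explicitly so its arguments match the existing one.
app.conf.task_queues = [
    Queue(
        'getting_started',
        Exchange('getting_started', type='direct'),  # assumed exchange name
        routing_key='getting_started',
        durable=False,  # assumption: matches the queue pika created
    ),
]
```

Other declaration arguments (for example exclusive and auto_delete) would presumably need to match the existing queue as well, so it is worth checking them in the RabbitMQ admin before relying on this.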
Thanks for taking the time. I have worked around the problem by having
Celery create the queues I want first, and then I allow pika to connect to
them with its defaults without breaking the behavior. The fact that it
fails only in one direction (pika first, then Celery), while creating a
durable queue with Celery first and then connecting to it with pika's
defaults succeeds, leads me to believe it's a bug on Celery's end, but I am
not sure if I am right in this.
Steve
--
You received this message because you are subscribed to the Google Groups "celery-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to celery-users+***@googlegroups.com.
To post to this group, send email to celery-***@googlegroups.com.
Visit this group at https://groups.google.com/group/celery-users.
For more options, visit https://groups.google.com/d/optout.