Discussion:
Undocumented parameter in Task.apply_async - task_id
jot-SO3KZ/
2012-12-19 11:54:43 UTC
Hi,
First of all, sorry for my English (if something is unclear, feel free to
correct me or ask me to be more specific). I'm trying to figure out how to
use Celery in the workflow of my application.
It would be useful to be able to change, e.g., the eta of a task that
is waiting for execution, so that it runs later or sooner.
I looked at the available options and noticed that I can pass task_id to
apply_async, but I'm not sure what it does: does it update the "meta" of the
existing task and really change how it is executed, or does it create a new
task based on the previous options?

Thanks in advance
--
You received this message because you are subscribed to the Google Groups "celery-users" group.
To view this discussion on the web visit https://groups.google.com/d/msg/celery-users/-/mwiMWS3PPeUJ.
To post to this group, send email to celery-users-/JYPxA39Uh5TLH3MbocFF+G/***@public.gmane.org
To unsubscribe from this group, send email to celery-users+unsubscribe-/JYPxA39Uh5TLH3MbocFF+G/***@public.gmane.org
For more options, visit this group at http://groups.google.com/group/celery-users?hl=en.
msantos
2013-01-09 04:55:41 UTC
By default, task ids are UUIDs. You can override the id of your task via
the task_id option.

Refer to FAQ:
http://docs.celeryproject.org/en/latest/faq.html#can-i-specify-a-custom-task-id

I find it useful when I submit a group of related tasks: I prefer each task
in the group to share the same base id, with just a postfix to make it
unique, rather than having random ids.
Additionally, if you are storing task ids for tracking in a DB, you can make
the task_id "DB-friendly" or correlated to an actual DB key.
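
As a rough illustration of the shared-base-id idea, here is a minimal sketch in plain Python. The helper name make_group_task_ids and the "report-42" base id are made up for this example and are not part of Celery; each generated id would then be handed to apply_async through its task_id option.

```python
import uuid


def make_group_task_ids(count, base_id=None):
    """Return `count` task ids that share one base id plus a numeric postfix.

    Hypothetical helper, not a Celery API. If no base id is given,
    a random UUID is used as the shared part.
    """
    if base_id is None:
        base_id = str(uuid.uuid4())
    return ['{0}-{1}'.format(base_id, index) for index in range(count)]


# Each id would be passed along when the task is sent, for example:
#     some_task.apply_async(args, task_id=task_id)
# where some_task stands for any task of your own.
ids = make_group_task_ids(3, base_id='report-42')
```

Keeping the base id identical to a database key is one way to get the "DB-friendly" correlation mentioned above.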

The eta option is also a supported option for apply_async.
Ask Solem
2013-01-09 13:34:32 UTC
You cannot modify a message that has already been sent, so you cannot change the eta/countdown either.
This is typical for message-passing systems; you can think of it as a stream of messages.
Messages aren't stored in, and accessible from, a single database;
they can originate from anywhere and be in flight on the network, in another broker waiting to be federated, on the queue, or already delivered to a worker.

Read http://docs.celeryproject.org/en/latest/userguide/tasks.html#state

So, back to the problem you want to solve: it is actually possible, but you need to store the new eta value in a database somewhere (memcached works too). The task itself then checks whether there is a new value
and, if so, reschedules itself:

from datetime import timedelta

from celery import Task, current_app, task
from celery.exceptions import Ignore
# Module path as of Celery 3.0; later releases moved it to celery.utils.time.
from celery.utils.timeutils import maybe_iso8601

RESCHEDULE_KEY = 'task-reschedule-{0}'

# CACHE is a placeholder for your own cache or database client
# (anything that provides get/set).


class Reschedules(Task):

    def __call__(self, *args, **kwargs):
        new_eta = CACHE.get(RESCHEDULE_KEY.format(self.request.id))
        if new_eta:
            # new eta stored as string in ISO 8601 date format.
            self.subtask_from_request(self.request, eta=maybe_iso8601(new_eta)).delay()

            # raising Ignore here means that it will not update the state of the task (i.e. SUCCESS).
            raise Ignore()
        return super(Reschedules, self).__call__(*args, **kwargs)


def reschedule_task(id, new_eta):
    if isinstance(new_eta, (int, float)):  # support int (countdown-style)
        # current_app.now() gives the current datetime, in UTC or local
        # time depending on the CELERY_ENABLE_UTC setting.
        new_eta = current_app.now() + timedelta(seconds=new_eta)
    CACHE.set(RESCHEDULE_KEY.format(id), new_eta.isoformat())


@task(base=Reschedules)
def some_task():
    from time import sleep
    sleep(2)

Here you would have to implement the cache (or database) operations yourself.
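
To make the "implement it yourself" part a little more concrete, here is a minimal sketch of such a store in plain Python. DictCache, store_new_eta and pop_new_eta are hypothetical names for this example only. The dict-backed class is just an in-process stand-in: a real setup needs something shared between the caller and the workers, such as memcached or a database table. Clearing the entry once it has been read is an assumption on my part, meant to keep a task that has just been rescheduled from finding the same value and rescheduling itself again.

```python
from datetime import datetime, timedelta, timezone

RESCHEDULE_KEY = 'task-reschedule-{0}'


class DictCache(object):
    """In-process stand-in (hypothetical) for memcached or a database table."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        self._data[key] = value

    def delete(self, key):
        self._data.pop(key, None)


def store_new_eta(cache, task_id, new_eta, now=None):
    """Record a new eta for task_id as an ISO 8601 string."""
    if isinstance(new_eta, (int, float)):  # countdown-style: seconds from now
        if now is None:
            now = datetime.now(timezone.utc)
        new_eta = now + timedelta(seconds=new_eta)
    cache.set(RESCHEDULE_KEY.format(task_id), new_eta.isoformat())


def pop_new_eta(cache, task_id):
    """Return the pending eta string (or None) and clear it from the store."""
    key = RESCHEDULE_KEY.format(task_id)
    value = cache.get(key)
    if value is not None:
        cache.delete(key)
    return value
```

With a store like this, the base task would call something like pop_new_eta where the code above calls CACHE.get, and the caller would use store_new_eta where it calls CACHE.set.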

The task_id argument only lets you specify a custom id when you don't want one to be generated automatically.
This is especially useful when you need to prepare result instances in advance; the celery.canvas module uses this a lot.

UUIDs are generated by the 'celery.uuid' function:

from celery import uuid
from proj.tasks import add
task_id = uuid()
premade_result = add.AsyncResult(task_id)

add.apply_async(…, task_id=task_id)
--
Ask Solem
twitter.com/asksol