Test if a celery task is still being processed

How can I test whether a task (task_id) is still being processed in Celery? I have the following scenario:

  1. Start a task in a Django view
  2. Store the BaseAsyncResult in the session
  3. Shut down the celery daemon (hard) so the task is not processed anymore
  4. Check if the task is 'dead'

Any ideas? Can I look up all tasks being processed by celery and check if mine is still there?


Define a field (PickledObjectField) in your model to store the result object returned when the celery task is submitted:

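from django.db import models
from picklefield.fields import PickledObjectField  # from the django-picklefield package

class YourModel(models.Model):
    # Holds the pickled result object returned by apply_async()
    celery_task = PickledObjectField()

    def task(self):
        # SubmitTask is your own task class, defined elsewhere
        self.celery_task = SubmitTask.apply_async(args=self.task_detail())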

If your task is not tied to any particular model, create a model specifically for storing celery tasks.

Alternatively, I suggest using django-celery. It has a nice monitoring feature (http://ask.github.com/celery/userguide/monitoring.html#django-admin-monitor) that saves task details in a Django model and displays them graphically in the admin.
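
As for looking up everything currently being processed: recent Celery versions expose worker inspection through app.control.inspect(), whose active() call returns a mapping of worker name to the list of tasks it is executing, or None when no worker replies. A minimal sketch of checking for your task_id in that mapping follows. The helper name is_task_active and the sample data are made up for illustration, and the exact dictionary layout is an assumption that may differ between Celery versions.

```python
from typing import Mapping, Optional, Sequence


def is_task_active(task_id: str,
                   active: Optional[Mapping[str, Sequence[dict]]]) -> bool:
    """Return True if task_id appears in a worker -> active-tasks mapping.

    `active` is assumed to look like the result of
    app.control.inspect().active(): each worker name maps to a list of
    task dicts carrying an "id" key. None or {} means no worker answered.
    """
    if not active:
        # No worker replied (e.g. the daemon was killed), so nothing is running.
        return False
    return any(
        task.get("id") == task_id
        for tasks in active.values()
        for task in (tasks or [])
    )


# Hypothetical sample data in the assumed shape
sample = {"celery@host1": [{"id": "abc-123", "name": "myapp.tasks.submit"}]}
print(is_task_active("abc-123", sample))  # task is listed by a worker
print(is_task_active("abc-123", None))    # no worker replied
```

With a running app you would pass the live data instead, e.g. is_task_active(task_id, app.control.inspect().active()). Note that this only covers tasks a worker is executing right now; a task still waiting in the queue will not show up, so a False result alone does not prove the task is dead.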
