chord callback not working when migrating from celery 4.4.7 to 5.2.7 #2399

@saadizm

Description

Since migrating from Celery 4.4.7 to 5.2.7, the chord callback of one Celery task in my application no longer runs. Here is the relevant code:

def process_pool():

    res = download_preproc(lslist, download=False, skipexist=False, start=False, context=context)
    # res is a celery grouptask and we are making a chord with a job_status done callback
    updater = chord(res, update_pool_job_status.si(pool_d, str(pool_id), WORK_DONE.code))

    applied_updater = updater.apply_async(retry=True, retry_policy={'max_retries': 5})
    # we store the groupresult to track individual progress of documents
    applied_pool = applied_updater.parent
    applied_pool.save()

def download_preproc(lslist, lvl=None, download=True, preproc=True, skipexist=True, filt=None, chunk_size=25,
                     start=True, context:TaskConfig = None):
    part1 = group(celery_nouns.s([str(_id) for _id in ids], lvl_dict, ls.to_dict(add_host=ls.host), lvl.excludelocations, context=context)
                      for ls in lslist for ids in chunks(set([x['_id'] for x in ls.get_coll().find(ls.get_filter(filter_dict=filt), {'_id': 1})]), chunk_size))
    return part1

The callback function that is not executing:

@app.task(base=LsTask, autoretry_for=(Exception,), default_retry_delay=5, max_retries=5)
def update_pool_job_status(pool_d:dict, pool_id:str, job_status:int):
    """
    Update status of pool when processing
    :param pool_d: pools collection (dict or collection)
    :param pool_id: string form of the pool's ObjectId
    :param job_status: status code to set on the pool
    :return:
    """
    from bson import ObjectId
    from datetime import datetime
    from .utilities import mongocollection_from_dict
    print("In update pool job status")

    if isinstance(pool_d, dict):
        pool_d = mongocollection_from_dict(pool_d)

    dt = datetime.utcnow().isoformat()
    pool_d.update_one({"_id": ObjectId(pool_id)}, {"$set": {"lastModified": dt, "job_status": job_status}})

Oddly, another Celery task in the same application also uses a chord, and its callback still runs as expected.
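To help narrow this down, the chunking step in download_preproc can be reproduced without Celery or MongoDB to see how many header tasks the group would contain. This is only a sketch: the definition of `chunks` is not shown above, so the version here is a hypothetical stand-in that assumes it splits an iterable into lists of at most `chunk_size` items, and `count_header_tasks` is an illustrative helper, not part of the application.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunks(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Hypothetical stand-in for the `chunks` helper used in download_preproc.

    Assumption: it yields consecutive lists of at most `size` items.
    """
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            # Nothing left to batch, so stop without yielding an empty list.
            return
        yield batch


def count_header_tasks(ids: Iterable[str], chunk_size: int = 25) -> int:
    """Count the signatures the group would hold for one `ls` entry.

    Mirrors `chunks(set(...), chunk_size)` from download_preproc: duplicate
    ids collapse into one, then each chunk becomes one header task.
    """
    return sum(1 for _ in chunks(set(ids), chunk_size))


if __name__ == "__main__":
    sample_ids = [str(i) for i in range(60)]
    print(count_header_tasks(sample_ids))  # 60 unique ids, 25 per chunk
    print(count_header_tasks([]))          # a query that matches nothing
```

If the count comes out as 0 for every entry in lslist, the chord header would be empty, which seems worth ruling out before looking at the callback itself.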
