Open
Description
Ever since I migrated from Celery 4.4.7 to 5.2.7, one of the chord callbacks in my application has stopped executing. Here is the code that builds the chord:
def process_pool():
    res = download_preproc(lslist, download=False, skipexist=False, start=False, context=context)
    # res is a celery grouptask and we are making a chord with a job_status done callback
    updater = chord(res, update_pool_job_status.si(pool_d, str(pool_id), WORK_DONE.code))
    applied_updater = updater.apply_async(retry=True, retry_policy={'max_retries': 5})
    # we store the groupresult to track individual progress of documents
    applied_pool = applied_updater.parent
    applied_pool.save()
def download_preproc(lslist, lvl=None, download=True, preproc=True, skipexist=True, filt=None, chunk_size=25,
                     start=True, context: TaskConfig = None):
    part1 = group(celery_nouns.s([str(_id) for _id in ids], lvl_dict, ls.to_dict(add_host=ls.host), lvl.excludelocations, context=context)
                  for ls in lslist for ids in chunks(set([x['_id'] for x in ls.get_coll().find(ls.get_filter(filter_dict=filt), {'_id': 1})]), chunk_size))
    return part1
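To make the shape of the chord header easier to picture, here is a minimal, standard-library-only sketch of how a set of document IDs splits into batches of `chunk_size`, with each batch becoming one task in the group. `chunk_ids` and the sample IDs are hypothetical names used only for illustration; this is not the `chunks` helper called in `download_preproc`, whose implementation is not shown here.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunk_ids(items: Iterable[T], chunk_size: int = 25) -> Iterator[List[T]]:
    """Yield consecutive batches of at most chunk_size items.

    Hypothetical helper for illustration; not the application's `chunks`.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    # Use one shared iterator so every item lands in exactly one batch,
    # even when `items` is a set or list rather than an iterator.
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, chunk_size))
        if not batch:
            return
        yield batch


# Sample data (assumed): 60 fake document IDs held in a set, as in the query above.
ids = {f"id-{n}" for n in range(60)}
batches = list(chunk_ids(ids, chunk_size=25))
print([len(b) for b in batches])  # → [25, 25, 10]
```

With 60 IDs and a chunk size of 25, the header would contain three tasks, and the chord callback would be expected to fire once after all three have finished.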
This is the callback function that is not executing:
@app.task(base=LsTask, autoretry_for=(Exception,), default_retry_delay=5, max_retries=5)
def update_pool_job_status(pool_d: dict, pool_id: str, job_status: int):
    """
    Update status of pool when processing
    :param pool_d: pools collection (dict or collection)
    :param pool_id: id of the pool object, as a string
    :param job_status: status to change
    :return:
    """
    from bson import ObjectId
    from datetime import datetime
    from .utilities import mongocollection_from_dict
    print("In update pool job status")
    # a dict here is a serialized collection description; rebuild the collection from it
    if isinstance(pool_d, dict):
        pool_d = mongocollection_from_dict(pool_d)
    dt = datetime.utcnow().isoformat()
    pool_d.update_one({"_id": ObjectId(pool_id)}, {"$set": {"lastModified": dt, "job_status": job_status}})
The odd part is that another Celery task in the same application also uses a chord, and its callback runs as expected.