@@ -1427,38 +1427,56 @@ def delete_from_elastic(submission_id):
 def bulk_send_to_elastic(documents):
     """Send a batch of documents to Elasticsearch using the _bulk API."""
     if not documents:
+        logger.info("No documents to send to Elasticsearch (bulk)")
         return True
 
     bulk_lines = []
-    for doc in documents:
-        # Flatten isolate_data if present (same logic as send_to_elastic2)
-        if 'isolate_data' in doc and doc['isolate_data']:
-            isolate_data = doc['isolate_data']
-            if isinstance(isolate_data, str):
-                try:
-                    isolate_data = json.loads(isolate_data)
-                except Exception:
-                    isolate_data = {}
-            if isinstance(isolate_data, dict):
-                for key, value in isolate_data.items():
-                    if key not in doc:
-                        doc[key] = value
-            del doc['isolate_data']
-        doc_id = doc.get("id")
-        action = {"index": {"_id": doc_id}} if doc_id else {"index": {}}
-        bulk_lines.append(json.dumps(action))
-        # Use json_serial for datetime/date serialization
-        bulk_lines.append(json.dumps(doc, default=json_serial))
+    for idx, doc in enumerate(documents):
+        logger.debug(f"Preparing document {idx} for bulk ES: id={doc.get('id')}, keys={list(doc.keys())}")
+        # Flatten isolate_data if present (same logic as send_to_elastic2)
+        if 'isolate_data' in doc and doc['isolate_data']:
+            isolate_data = doc['isolate_data']
+            if isinstance(isolate_data, str):
+                try:
+                    isolate_data = json.loads(isolate_data)
+                except Exception:
+                    logger.warning(f"Could not parse isolate_data for doc id={doc.get('id')}")
+                    isolate_data = {}
+            if isinstance(isolate_data, dict):
+                for key, value in isolate_data.items():
+                    if key not in doc:
+                        doc[key] = value
+            del doc['isolate_data']
+        doc_id = doc.get("id")
+        action = {"index": {"_id": doc_id}} if doc_id else {"index": {}}
+        bulk_lines.append(json.dumps(action))
+        # Use json_serial for datetime/date serialization
+        bulk_lines.append(json.dumps(doc, default=json_serial))
+        logger.debug(f"Bulk action: {action}, doc: {doc}")
     bulk_data = "\n".join(bulk_lines) + "\n"
 
+    logger.info(f"Sending bulk data to Elasticsearch: url={settings.ELASTICSEARCH_URL}/{settings.ELASTICSEARCH_INDEX}/_bulk, num_docs={len(documents)}")
+    # Optionally log the bulk_data (can be large)
+    logger.debug(f"Bulk payload (truncated): {bulk_data[:1000]}...")
+
     url = f"{settings.ELASTICSEARCH_URL}/{settings.ELASTICSEARCH_INDEX}/_bulk"
     headers = {"Content-Type": "application/x-ndjson"}
     try:
         response = requests.post(url, data=bulk_data, headers=headers, timeout=30)
+        logger.info(f"Bulk ES response status: {response.status_code}")
         if response.status_code not in (200, 201):
-            print(f"Bulk indexing failed: {response.text}")
+            logger.error(f"Bulk indexing failed: {response.text}")
             return False
+        # Log errors from ES bulk response if present
+        try:
+            resp_json = response.json()
+            if resp_json.get('errors'):
+                logger.error(f"Bulk ES response contains errors: {resp_json}")
+            else:
+                logger.info("Bulk ES response: all documents indexed successfully")
+        except Exception as e:
+            logger.warning(f"Could not parse bulk ES response as JSON: {e}")
         return True
     except Exception as e:
-        print(f"Exception during bulk ES indexing: {e}")
+        logger.error(f"Exception during bulk ES indexing: {e}")
         return False
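
For reference: the _bulk endpoint consumes NDJSON, one action line followed by one source line per document, and the payload must end with a newline, which is why the loop above pairs json.dumps(action) with json.dumps(doc) and the join appends a final "\n". Note also that a 200 response does not mean every document was indexed; the top-level "errors" flag only says whether at least one item failed, and the per-item outcomes live in the response's "items" array. A minimal, self-contained sketch of both halves, assuming a local Elasticsearch; the URL, index name, and sample documents are made-up placeholders, and json_default stands in for this project's json_serial helper:

import json
from datetime import date, datetime

import requests

ES_URL = "http://localhost:9200"   # placeholder, not this project's settings
ES_INDEX = "submissions-demo"      # placeholder index name

def json_default(obj):
    # Hypothetical stand-in for the project's json_serial helper.
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} is not JSON serializable")

def build_bulk_payload(documents):
    """Pair an action line with a source line per doc, NDJSON-style."""
    lines = []
    for doc in documents:
        doc_id = doc.get("id")
        # Without "_id", Elasticsearch auto-generates one.
        lines.append(json.dumps({"index": {"_id": doc_id}} if doc_id else {"index": {}}))
        lines.append(json.dumps(doc, default=json_default))
    return "\n".join(lines) + "\n"  # trailing newline is required by _bulk

docs = [{"id": "demo-1", "collected": date(2024, 1, 1)}]
resp = requests.post(
    f"{ES_URL}/{ES_INDEX}/_bulk",
    data=build_bulk_payload(docs),
    headers={"Content-Type": "application/x-ndjson"},
    timeout=30,
)
body = resp.json()
if body.get("errors"):
    # Only some items may have failed; inspect each entry individually.
    for item in body.get("items", []):
        result = item.get("index", {})
        if result.get("status", 500) >= 300:
            print(f"doc {result.get('_id')} failed: {result.get('error')}")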