@@ -85,45 +85,7 @@ def load_json_schema(filename: str) -> Dict[str, Any]:
 )
 
 
-# Bulk ES helper
-def bulk_send_to_elastic(documents):
-    """Send a batch of documents to Elasticsearch using the _bulk API."""
-    if not documents:
-        return True
 
-    bulk_lines = []
-    for doc in documents:
-        # Flatten isolate_data if present (same logic as send_to_elastic2)
-        if 'isolate_data' in doc and doc['isolate_data']:
-            isolate_data = doc['isolate_data']
-            if isinstance(isolate_data, str):
-                try:
-                    isolate_data = json.loads(isolate_data)
-                except Exception:
-                    isolate_data = {}
-            if isinstance(isolate_data, dict):
-                for key, value in isolate_data.items():
-                    if key not in doc:
-                        doc[key] = value
-            del doc['isolate_data']
-        doc_id = doc.get("id")
-        action = {"index": {"_id": doc_id}} if doc_id else {"index": {}}
-        bulk_lines.append(json.dumps(action))
-        # Use json_serial for datetime/date serialization
-        bulk_lines.append(json.dumps(doc, default=json_serial))
-    bulk_data = "\n".join(bulk_lines) + "\n"
-
-    url = f"{settings.ELASTICSEARCH_URL}/{settings.ELASTICSEARCH_INDEX}/_bulk"
-    headers = {"Content-Type": "application/x-ndjson"}
-    try:
-        response = requests.post(url, data=bulk_data, headers=headers, timeout=30)
-        if response.status_code not in (200, 201):
-            print(f"Bulk indexing failed: {response.text}")
-            return False
-        return True
-    except Exception as e:
-        print(f"Exception during bulk ES indexing: {e}")
-        return False
 
 sg_api_key = SENDGRID_API_KEY
 sg_from_email = SENDGRID_FROM_EMAIL
@@ -1462,4 +1424,44 @@ def delete_from_elastic(submission_id):
         return False
     except Exception as e:
         print(f"Error deleting documents from Elasticsearch: {e}")
+        return False
+
+# Bulk ES helper
+def bulk_send_to_elastic(documents):
+    """Send a batch of documents to Elasticsearch using the _bulk API."""
+    if not documents:
+        return True
+
+    bulk_lines = []
+    for doc in documents:
+        # Flatten isolate_data if present (same logic as send_to_elastic2)
+        if 'isolate_data' in doc and doc['isolate_data']:
+            isolate_data = doc['isolate_data']
+            if isinstance(isolate_data, str):
+                try:
+                    isolate_data = json.loads(isolate_data)
+                except Exception:
+                    isolate_data = {}
+            if isinstance(isolate_data, dict):
+                for key, value in isolate_data.items():
+                    if key not in doc:
+                        doc[key] = value
+            del doc['isolate_data']
+        doc_id = doc.get("id")
+        action = {"index": {"_id": doc_id}} if doc_id else {"index": {}}
+        bulk_lines.append(json.dumps(action))
+        # Use json_serial for datetime/date serialization
+        bulk_lines.append(json.dumps(doc, default=json_serial))
+    bulk_data = "\n".join(bulk_lines) + "\n"
+
+    url = f"{settings.ELASTICSEARCH_URL}/{settings.ELASTICSEARCH_INDEX}/_bulk"
+    headers = {"Content-Type": "application/x-ndjson"}
+    try:
+        response = requests.post(url, data=bulk_data, headers=headers, timeout=30)
+        if response.status_code not in (200, 201):
+            print(f"Bulk indexing failed: {response.text}")
+            return False
+        return True
+    except Exception as e:
+        print(f"Exception during bulk ES indexing: {e}")
         return False