@@ -35,10 +35,11 @@ def insert_or_do_nothing_on_conflict(table, conn, keys, data_iter):
3535 :param data_iter: the data to insert
3636 :type data_iter: iterator over dictionaries
3737 """
38- insert_stmt = insert (table .table ).values (list (data_iter ))
39- on_conflict_stmt = insert_stmt .on_conflict_do_nothing (index_elements = [keys ])
40- conn .execute (on_conflict_stmt )
41-
38+ data = [dict (zip (keys , row )) for row in data_iter ]
39+ stmt = insert (table .table ).values (data ).on_conflict_do_nothing ()
40+ result = conn .execute (stmt )
41+ return result .rowcount
42+
4243
4344# ref: https://stackoverflow.com/questions/30867390/python-pandas-to-sql-how-to-create-a-table-with-a-primary-key
4445def create_table (
@@ -213,15 +214,13 @@ def request_data(url, timeout):
213214
214215 """
215216 logging .debug ("requesting {} ..." .format (url ))
216- try :
217- r = requests .get (url , timeout = timeout )
218- except :
219- return None
217+ r = requests .get (url , timeout = timeout )
218+
220219 if r .status_code == 200 :
221220 return r .text
222221 else :
223- logging .debug ("Request error {}" .format (r .status_code ))
224- return None
222+ logging .error ("Request error {}" .format (r .status_code ))
223+ exit ( 1 )
225224
226225
227226def download_data (conf , station , timeout = 15 ):
@@ -244,7 +243,7 @@ def download_data(conf, station, timeout=15):
244243 logging .error (
245244 f"Error when downloading data for station { station } using url { url } ...\n { request } "
246245 )
247- return None
246+ exit ( 1 )
248247
249248
250249def prepare_dataframe (data : Dict , datetime_column : str ) -> pd .DataFrame :
@@ -316,7 +315,7 @@ def write_to_database(
316315 not "constrained_columns" in contrains
317316 or datetime_column not in contrains ["constrained_columns" ]
318317 ):
319- logging .error (
318+ logging .warning (
320319 f"Table { table_name } has no primary key set on { datetime_column } column, this can result in duplicated data in table ..."
321320 )
322321
@@ -327,18 +326,17 @@ def write_to_database(
327326 "if_exists" : "append" ,
328327 "index" : False ,
329328 "chunksize" : 1024 ,
330- "method" : insert_ignore_conflicts # insert_or_do_nothing_on_conflict,
329+ "method" : insert_or_do_nothing_on_conflict
331330 }
332331
333332 try :
334333 num_rows = len (df )
335334 logging .info (f"Attempting to insert { num_rows } rows into { table_name } ..." )
336- #breakpoint()
337335 df .to_sql (** to_sql_arugments )
338336 logging .info (f"succesfully writing data ..." )
339337 except Exception as E :
340338 logging .error (f"failed writing data: { str (E )[:200 ]} " ) # Print first 200 chars of error
341-
339+ exit ( 1 )
342340
343341def manage_dl_db (
344342 conf ,
0 commit comments