Commit 138cb82

Merge pull request #146 from multinet-app/batch-inserts

Make CSV uploader insert in batches

2 parents 9493c35 + abb1069

File tree

1 file changed: +21 additions, -17 deletions

  • multinet/api/tasks/upload/csv.py


multinet/api/tasks/upload/csv.py

Lines changed: 21 additions & 17 deletions
@@ -47,21 +47,6 @@ def process_csv(
 ) -> None:
     upload: Upload = Upload.objects.get(id=task_id)
 
-    # Download data from S3/MinIO
-    with upload.blob as blob_file:
-        blob_file: BinaryIO = blob_file
-        csv_rows = list(
-            csv.DictReader(
-                StringIO(blob_file.read().decode('utf-8')),
-                delimiter=delimiter,
-                quotechar=quotechar,
-            )
-        )
-
-        # Cast entries in each row to appropriate type, if necessary
-        for i, row in enumerate(csv_rows):
-            csv_rows[i] = process_row(row, columns)
-
     # Create new table
     table: Table = Table.objects.create(
         name=table_name,
@@ -77,8 +62,27 @@ def process_csv(
         ]
     )
 
-    # Insert rows
-    table.put_rows(csv_rows)
+    # Download data from S3/MinIO
+    with upload.blob as blob_file:
+        blob_file: BinaryIO = blob_file
+        csv_reader = csv.DictReader(
+            StringIO(blob_file.read().decode('utf-8')),
+            delimiter=delimiter,
+            quotechar=quotechar,
+        )
+
+        # Cast entries in each row to appropriate type, if necessary
+        processed_rows = []
+        for row in csv_reader:
+            processed_rows.append(process_row(row, columns))
+
+            # Insert rows
+            if len(processed_rows) == 100000:
+                table.put_rows(processed_rows)
+                processed_rows = []
+
+        # Put remaining rows
+        table.put_rows(processed_rows)
 
 
 def maybe_insert_join_statement(query: str, bind_vars: Dict, table_dict: Dict) -> Tuple[str, Dict]:
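
The change streams the CSV instead of materializing it: where the old code built the full row list up front with list(csv.DictReader(...)) and pushed it in a single put_rows call, the new code appends rows as the reader yields them and flushes to the table each time the batch reaches 100,000 rows, with a final put_rows for whatever remains. The following is a minimal standalone sketch of that batching pattern under stated assumptions: insert_in_batches and the print stub are hypothetical names for illustration, not part of the Multinet API.

import csv
from io import StringIO
from typing import Callable, Dict, Iterable, List


def insert_in_batches(
    rows: Iterable[Dict[str, str]],
    put_rows: Callable[[List[Dict[str, str]]], None],
    batch_size: int = 100_000,  # same threshold the commit uses
) -> None:
    # Consume the iterator lazily; only one batch is held in memory at a time.
    batch: List[Dict[str, str]] = []
    for row in rows:
        batch.append(row)
        # len(batch) grows by exactly one per iteration, so an equality
        # check is enough to catch the threshold.
        if len(batch) == batch_size:
            put_rows(batch)
            batch = []
    # Flush the remainder (may be empty, matching the commit's behavior)
    put_rows(batch)


# Hypothetical usage with a stub sink; batch_size=2 just to show the flushes.
data = StringIO('a,b\n1,2\n3,4\n5,6\n')
insert_in_batches(csv.DictReader(data), put_rows=print, batch_size=2)

One consequence visible in the diff: when the row count is an exact multiple of the batch size (or zero), the trailing put_rows receives an empty list, so put_rows must tolerate that.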
