Skip to content

Commit 8789de7

Browse files
authored
Merge pull request #150 from multinet-app/upload-downloads
Make the network JSON uploader more flexible and allow JSON table uploads
2 parents b0e9383 + e0aef36 commit 8789de7

File tree

14 files changed

+744
-13
lines changed

14 files changed

+744
-13
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Generated by Django 3.2.18 on 2023-02-28 22:04

from django.db import migrations, models


class Migration(migrations.Migration):
    """Widen Upload.data_type choices to include the plain 'JSON' table type."""

    dependencies = [
        ('api', '0012_aqlquery_bind_vars'),
    ]

    operations = [
        # Adds 'JSON' to the set of accepted data_type values; the other
        # choices and the max_length are unchanged from the previous field.
        migrations.AlterField(
            model_name='upload',
            name='data_type',
            field=models.CharField(choices=[('CSV', 'Csv'), ('JSON', 'Json'), ('D3_JSON', 'D3 Json'), ('NESTED_JSON', 'Nested Json'), ('NEWICK', 'Newick')], max_length=20),
        ),
    ]

multinet/api/models/tasks.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class Upload(Task):
3232

3333
class DataType(models.TextChoices):
    # Upload formats accepted by the uploader; stored in Upload.data_type
    # (see the matching migration that adds 'JSON' to the field's choices).
    CSV = 'CSV'
    JSON = 'JSON'  # flat JSON array of row objects (table upload) — see json_table task
    D3_JSON = 'D3_JSON'
    NESTED_JSON = 'NESTED_JSON'
    NEWICK = 'NEWICK'
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from .common import ProcessUploadTask
22
from .csv import process_csv
33
from .d3_json import process_d3_json
4+
from .json_table import process_json_table
45

5-
__all__ = ['ProcessUploadTask', 'process_csv', 'process_d3_json']
6+
__all__ = ['ProcessUploadTask', 'process_csv', 'process_d3_json', 'process_json_table']

multinet/api/tasks/upload/d3_json.py

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,29 +7,44 @@
77
from multinet.api.models import Network, Table, Upload
88

99
from .common import ProcessUploadTask
10+
from .exceptions import DataFormatError
1011

1112
logger = get_task_logger(__name__)
1213

1314

1415
def d3_node_to_arango_doc(node: Dict) -> Optional[Dict]:
    """Convert a D3-style node dict into an ArangoDB document.

    An existing '_key' field is preferred; otherwise 'id' is promoted to
    '_key' (and removed). Returns None when the node carries neither, or
    when the carried value is None, so callers can filter it out.
    """
    new_node = dict(node)

    # Prefer '_key'; fall back to 'id'. Plain `in` tests replace the
    # original `.keys()` calls, and the value is read directly instead of
    # via a redundant `.get(key, None)` after the membership check.
    if '_key' in new_node:
        node_id = new_node['_key']
    elif 'id' in new_node:
        node_id = new_node.pop('id')
    else:
        node_id = None

    # No usable key: signal the caller to drop this node.
    if node_id is None:
        return None

    # Arango document keys must be strings.
    new_node['_key'] = str(node_id)
    return new_node
2531

2632

2733
def d3_link_to_arango_doc(link: Dict, node_table_name: str) -> Dict:
2834
new_link = dict(link)
2935

30-
# Return None if necessary
31-
source = new_link.pop('source', None)
32-
target = new_link.pop('target', None)
36+
# Check if we have a field we can use for from and to. _from and _to are preferred
37+
# then source and target
38+
if '_to' in new_link.keys() and '_from' in new_link.keys():
39+
source = new_link.get('_to', None).split('/')[-1]
40+
target = new_link.get('_from', None).split('/')[-1]
41+
elif 'source' in new_link.keys() and 'target' in new_link.keys():
42+
source = new_link.pop('source', None)
43+
target = new_link.pop('target', None)
44+
else:
45+
source = None
46+
target = None
47+
3348
if source is None or target is None:
3449
return None
3550

@@ -60,9 +75,19 @@ def process_d3_json(
6075
for node in (d3_node_to_arango_doc(node) for node in d3_dict['nodes'])
6176
if node is not None
6277
]
63-
d3_dict['links'] = [
78+
79+
if 'links' in d3_dict.keys():
80+
link_property_name = 'links'
81+
elif 'edges' in d3_dict.keys():
82+
link_property_name = 'edges'
83+
else:
84+
raise DataFormatError("JSON network file missing 'links' or 'edges' property")
85+
86+
d3_dict[link_property_name] = [
6487
link
65-
for link in (d3_link_to_arango_doc(link, node_table_name) for link in d3_dict['links'])
88+
for link in (
89+
d3_link_to_arango_doc(link, node_table_name) for link in d3_dict[link_property_name]
90+
)
6691
if link is not None
6792
]
6893

@@ -80,7 +105,7 @@ def process_d3_json(
80105

81106
# Insert rows
82107
node_table.put_rows(d3_dict['nodes'])
83-
edge_table.put_rows(d3_dict['links'])
108+
edge_table.put_rows(d3_dict[link_property_name])
84109

85110
# Create network
86111
Network.create_with_edge_definition(
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
class DataFormatError(Exception):
    """Raised when an uploaded file does not match the expected data format.

    Used e.g. by the D3 JSON uploader when a network file has neither a
    'links' nor an 'edges' property.
    """
    # Exception.__init__ already stores and renders the message argument,
    # so the boilerplate __init__ that only forwarded to super() is removed.
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import json
2+
from typing import Any, BinaryIO, Dict
3+
4+
from celery import shared_task
5+
6+
from multinet.api.models import Table, TableTypeAnnotation, Upload
7+
8+
from .common import ProcessUploadTask
9+
from .utils import processor_dict
10+
11+
12+
def process_row(row: Dict[str, Any], cols: Dict[str, TableTypeAnnotation.Type]) -> Optional[Dict]:
    """Apply column type processors to a single JSON row.

    Returns a copy of ``row`` with each annotated column converted by its
    processor from ``processor_dict``, or None when the row has neither a
    '_key' nor an 'id' value and therefore cannot become an Arango document.
    """
    new_row = dict(row)

    # A row needs '_key' or 'id' to be storable. Use explicit None checks
    # rather than truthiness so falsy-but-valid keys (e.g. 0) are kept,
    # matching the D3 uploader's behavior.
    if new_row.get('_key') is None and new_row.get('id') is None:
        return None

    for col_key, col_type in cols.items():
        entry = new_row.get(col_key)

        # Absent or null columns are left untouched.
        if entry is None:
            continue

        process_func = processor_dict.get(col_type)
        if process_func is not None:
            try:
                new_row[col_key] = process_func(entry)
            except ValueError:
                # Unconvertible value: keep the original entry as-is.
                pass

    return new_row
35+
36+
37+
@shared_task(base=ProcessUploadTask)
def process_json_table(
    task_id: int,
    table_name: str,
    edge: bool,
    columns: Dict[str, TableTypeAnnotation.Type],
) -> None:
    """Celery task: ingest an uploaded JSON file as a new table.

    Creates the Table and its per-column type annotations, reads the
    uploaded blob, runs each row through process_row, and inserts the
    surviving rows. ``task_id`` is the id of the Upload record owning the
    blob; ``columns`` maps column names to type annotations.

    NOTE(review): assumes the blob decodes to a JSON array of objects —
    a JSON object at the top level would iterate over its keys instead;
    confirm the uploader endpoint validates this.
    """
    upload: Upload = Upload.objects.get(id=task_id)

    # Create new table
    table: Table = Table.objects.create(
        name=table_name,
        edge=edge,
        workspace=upload.workspace,
    )

    # Create type annotations
    TableTypeAnnotation.objects.bulk_create(
        [
            TableTypeAnnotation(table=table, column=col_key, type=col_type)
            for col_key, col_type in columns.items()
        ]
    )

    # Download data from S3/MinIO
    with upload.blob as blob_file:
        blob_file: BinaryIO = blob_file
        imported_json = json.loads(blob_file.read().decode('utf-8'))

    # Drop rows process_row rejected (those returning None: no usable key).
    processed_rows = [
        new_row
        for new_row in [process_row(row, columns) for row in imported_json]
        if new_row is not None
    ]

    # Put rows in the table
    table.put_rows(processed_rows)
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
[
2+
{
3+
"_key": "0",
4+
"name": "Myriel",
5+
"group": "1"
6+
},
7+
{
8+
"_key": "1",
9+
"name": "Napoleon",
10+
"group": "1"
11+
},
12+
{
13+
"_key": "2",
14+
"name": "Mlle.Baptistine",
15+
"group": "1"
16+
},
17+
{
18+
"_key": "3",
19+
"name": "Mme.Magloire",
20+
"group": "1"
21+
},
22+
{
23+
"_key": "4",
24+
"name": "CountessdeLo",
25+
"group": "1"
26+
}
27+
]

0 commit comments

Comments
 (0)