Commit c5b31e7

Dataframe* - move to library, split loading & data transform logic
update `build_dataframe` to not only return the dataframe but also save it to `self.rollup`
split off loading of batches into `iter_batches`
Dataframe*.merge: work on `self.rollup`, move all merges to a `.merge` method
collection status & db host metric use a simpler merge implementation (`pd.concat`); the rest uses base.merge, which does more but needs `unique_index_columns`, `data_columns`, `operations` & `cast_types` implemented
and move dataframes to library
1 parent a1b2e88 commit c5b31e7
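
As a rough illustration of the richer base.merge contract mentioned above, a dataframe class now supplies four hooks. A minimal sketch follows — the JobsDataframe name and its columns are hypothetical, but the hook names and the min/max/default-sum operations come from the implementation moved by this commit (shown in the second diff below):

from metrics_utility.library.dataframes import BaseDataframe

class JobsDataframe(BaseDataframe):  # hypothetical example, not part of the commit
    @staticmethod
    def unique_index_columns():
        # key columns the outer merge joins on
        return ['org_id', 'job_template_id']

    @staticmethod
    def data_columns():
        # value columns reconciled after the merge
        return ['runs', 'first_run', 'last_run']

    @staticmethod
    def operations():
        # per-column merge operation; unlisted columns default to summing
        return {'first_run': 'min', 'last_run': 'max'}

    @staticmethod
    def cast_types():
        # dtypes applied to the merged result
        return {'runs': 'int64'}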

File tree

12 files changed: +104, -680 lines


metrics_utility/anonymized_rollups/base_anonymized_rollup.py

Lines changed: 4 additions & 4 deletions

@@ -8,16 +8,16 @@
 import pandas as pd
 
 from metrics_utility.anonymized_rollups.helpers import sanitize_json
+from metrics_utility.library.dataframes import BaseDataframe
 
 
-class BaseAnonymizedRollup:
+class BaseAnonymizedRollup(BaseDataframe):
     def __init__(self, rollup_name: str):
+        super().__init__()
+
         self.rollup_name = rollup_name
         self.collector_names = []
 
-    def merge(self, dataframe_all, dataframe_new):
-        return pd.concat([dataframe_all, dataframe_new], ignore_index=True)
-
     def rollup(self, dataframe_all, dataframe_new):
         # not implemented in base class, return empty dataframe
        return pd.DataFrame()
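
With merge removed here, BaseAnonymizedRollup inherits it from BaseDataframe. Assuming the library method keeps the pd.concat behavior deleted above while accumulating on self.rollup (as the commit message describes), the inherited merge plausibly looks like this sketch — not the library's actual code:

import pandas as pd

class BaseDataframe:  # sketch under the assumptions above
    def __init__(self):
        self.rollup = None

    def merge(self, dataframe_new):
        # accumulate each new batch on self.rollup instead of threading
        # (dataframe_all, dataframe_new) through every caller
        if self.rollup is None:
            self.rollup = dataframe_new
        else:
            self.rollup = pd.concat([self.rollup, dataframe_new], ignore_index=True)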
Lines changed: 42 additions & 139 deletions

@@ -1,9 +1,5 @@
 import datetime
 
-from functools import reduce
-
-import pandas as pd
-
 from dateutil.relativedelta import relativedelta
 
 
@@ -39,70 +35,59 @@ def list_dates(start_date, end_date, granularity):
     return dates_arr
 
 
-# For JSON/dict columns: update one dict with the other (later values overwrite earlier ones)
-def combine_json(json1, json2):
-    merged = {}
-    if isinstance(json1, dict):
-        merged.update(json1)
-    if isinstance(json2, dict):
-        merged.update(json2)
-    return merged
-
-
-# For set columns: take the union of the two sets
-def combine_set(set1, set2):
-    """
-    Combine two collections (set or list) into a single set of unique items.
-    If an input is a list, it is first converted to a set.
-    If an input is not a list or a set, it is treated as empty.
-    """
-    # Convert to set if input is a list; otherwise, if not a set, default to an empty set.
-    if isinstance(set1, list):
-        set1 = set(set1)
-    elif not isinstance(set1, set):
-        set1 = set()
-
-    if isinstance(set2, list):
-        set2 = set(set2)
-    elif not isinstance(set2, set):
-        set2 = set()
+class Base:
+    def __init__(self, extractor, month, extra_params, klass):
+        self.extractor = extractor
+        self.month = month
+        self.extra_params = extra_params
+        self.klass = klass
 
-    # Return the union of both sets.
-    return set1.union(set2)
+    def build_dataframe(self):
+        o = self.klass()
+        o.from_tarballs(self.iter_batches(o.TARBALL_NAMES))
+        if o.rollup is not None:
+            return o.rollup
+        return o.empty()
 
+    def dedup(self, dataframe, hostname_mapping=None, scope_dataframe=None):
+        return self.klass().dedup(dataframe, hostname_mapping=hostname_mapping, scope_dataframe=scope_dataframe)
 
-def merge_sets(x):
-    return set().union(*x)
+    def iter_batches(self, names):
+        collections = []
+        optional = []
+        datas = map(lambda x: x.replace('.csv', '').replace('.json', ''), names)
+        names = [*names]
 
+        if 'config.json' in names:
+            optional.append('config')
+            names.remove('config.json')
+        if 'data_collection_status.csv' in names:
+            optional.append('data_collection_status')
+            names.remove('data_collection_status.csv')
 
-def merge_setdicts(x):
-    return reduce(combine_json_values, x, {})
+        collections = list(map(lambda x: x.replace('.csv', ''), names))
+        if len(collections) == 0:
+            collections = None
 
+        for date in self.dates():
+            for data in self.extractor.iter_batches(date=date, collections=collections, optional=optional):
+                tup = tuple()
+                nonempty = 0
 
-# Helper function to combine two JSON values.
-# For each key, it builds a set of non-null, non-empty values from both inputs.
-def combine_json_values(val1, val2):
-    merged = {}
-    for d in [val1, val2]:
-        if isinstance(d, dict):
-            for key, value in d.items():
-                if value is not None and value != '':
-                    if isinstance(value, set):
-                        merged.setdefault(key, set()).update(value)
-                    else:
-                        merged.setdefault(key, set()).add(value)
+                for name in datas:
+                    batch = data[name]
+                    tup = (*tup, batch)
 
-    return merged
+                    if name != 'config' and not batch.empty:
+                        nonempty += 1
 
+                if nonempty < 1:
+                    continue
 
-class Base:
-    def __init__(self, extractor, month, extra_params):
-        self.extractor = extractor
-        self.month = month
-        self.extra_params = extra_params
+                if len(tup) == 1:
+                    tup = tup[0]
 
-    def build_dataframe(self):
-        pass
+                yield tup
 
     def dates(self):
         if self.extra_params.get('since_date') is not None:
@@ -114,85 +99,3 @@ def dates(self):
 
         dates_list = list_dates(start_date=beginning_of_the_month, end_date=end_of_the_month, granularity='daily')
         return dates_list
-
-    def cast_dataframe(self, df, types):
-        levels = []
-        if len(self.unique_index_columns()) == 1:
-            # Special behavior if the index is not composite, but only 1 column
-            # Casting index field to object
-            df.index = df.index.astype(object)
-        else:
-            # Composite index branch
-            # Casting index field to object
-            for index, _level in enumerate(df.index.levels):
-                casted_level = df.index.levels[index].astype(object)
-                levels.append(casted_level)
-
-            df.index = df.index.set_levels(levels)
-
-        return df.astype(types)
-
-    def summarize_merged_dataframes(self, df, columns, operations={}):
-        for col in columns:
-            if operations.get(col) == 'min':
-                df[col] = df[[f'{col}_x', f'{col}_y']].min(axis=1)
-            elif operations.get(col) == 'max':
-                df[col] = df[[f'{col}_x', f'{col}_y']].max(axis=1)
-            elif operations.get(col) == 'combine_set':
-                df[col] = df.apply(lambda row: combine_set(row.get(f'{col}_x'), row.get(f'{col}_y')), axis=1)
-            elif operations.get(col) == 'combine_json':
-                df[col] = df.apply(lambda row: combine_json(row.get(f'{col}_x'), row.get(f'{col}_y')), axis=1)
-            elif operations.get(col) == 'combine_json_values':
-                df[col] = df.apply(lambda row: combine_json_values(row.get(f'{col}_x'), row.get(f'{col}_y')), axis=1)
-            else:
-                df[col] = df[[f'{col}_x', f'{col}_y']].sum(axis=1)
-            del df[f'{col}_x']
-            del df[f'{col}_y']
-        return df
-
-    def empty(self):
-        return pd.DataFrame(columns=self.unique_index_columns() + self.data_columns())
-
-    # Multipart collection, merge the dataframes and sum counts
-    def merge(self, rollup, new_group):
-        if rollup is None:
-            return new_group
-
-        rollup = pd.merge(rollup.loc[:,], new_group.loc[:,], on=self.unique_index_columns(), how='outer')
-        rollup = self.summarize_merged_dataframes(rollup, self.data_columns(), operations=self.operations())
-        return self.cast_dataframe(rollup, self.cast_types())
-
-    def dedup(self, dataframe, hostname_mapping=None):
-        if dataframe is None or dataframe.empty:
-            return self.empty()
-
-        if not hostname_mapping:
-            return dataframe
-
-        # map hostnames to canonical value
-        df = dataframe.copy()
-
-        df['host_name'] = df['host_name'].map(hostname_mapping).fillna(df['host_name'])
-
-        # multiple rows can now have the same hostname, regroup
-        df_grouped = self.regroup(df)
-
-        # cast types to match the table
-        df_grouped = self.cast_dataframe(df_grouped, self.cast_types())
-        return df_grouped.reset_index()
-
-    @staticmethod
-    def unique_index_columns():
-        pass
-
-    @staticmethod
-    def data_columns():
-        pass
-
-    @staticmethod
-    def cast_types():
-        pass
-
-    @staticmethod
-    def operations():
-        pass
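
Putting the pieces together, here is a hypothetical end-to-end flow through the new Base class. HostMetricDataframe, its file names, and the from_tarballs body are illustrative assumptions; only the TARBALL_NAMES / from_tarballs / rollup contract is taken from the diff above:

import pandas as pd

class HostMetricDataframe:  # hypothetical consumer of Base, not library code
    # iter_batches strips '.csv'/'.json' when indexing each batch dict and
    # routes 'config.json' / 'data_collection_status.csv' through `optional`
    TARBALL_NAMES = ['host_metrics.csv', 'config.json']

    def __init__(self):
        self.rollup = None

    def empty(self):
        return pd.DataFrame()

    def from_tarballs(self, batches):
        # two names requested, so each item is a (host_metrics, config) tuple;
        # with a single name, iter_batches yields the bare dataframe instead
        for host_metrics, config in batches:
            # db host metric uses the simpler pd.concat merge per the commit message
            if self.rollup is None:
                self.rollup = host_metrics
            else:
                self.rollup = pd.concat([self.rollup, host_metrics], ignore_index=True)

# usage sketch, given an extractor, month, and extra_params as in the diff:
# dataframe = Base(extractor, month, extra_params, klass=HostMetricDataframe).build_dataframe()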

metrics_utility/automation_controller_billing/dataframe_engine/dataframe_collection_status.py

Lines changed: 0 additions & 32 deletions
This file was deleted.
