Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion modnet/models/vanilla.py
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ class OR only return the most probable class.

# post-process based on training data
if remap_out_of_bounds:
if max(self.num_classes.values()) <= 2: # regression
if max(self.num_classes.values()) < 2: # regression
for i, vals in enumerate(p):
yrange = self.max_y[i] - self.min_y[i]
upper_bound = self.max_y[i] + 0.25 * yrange
Expand Down
180 changes: 99 additions & 81 deletions modnet/preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from __future__ import annotations

from functools import partial
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from typing import Callable, Dict, Hashable, Iterable, List, Optional, Tuple, Union

Expand Down Expand Up @@ -57,8 +57,17 @@ def compute_mi(
return mi, x_name, y_name


def map_mi(kwargs):
return compute_mi(**kwargs)
def map_mi(task):
    """Unpack one task dict into a ``compute_mi`` call (worker entry point).

    Args:
        task: Mapping of keyword arguments forwarded to ``compute_mi``. Its
            ``"x_name"`` and ``"y_name"`` entries, when present, are used to
            label the fallback result.

    Returns:
        Whatever ``compute_mi`` returns on success. If ``compute_mi`` raises,
        the tuple ``(0.0, x_name, y_name)`` is returned instead, with
        ``"unknown_x"`` / ``"unknown_y"`` substituted for missing names.
    """
    try:
        return compute_mi(**task)
    except Exception as exc:
        # Deliberate best-effort: one failing pair must not abort the whole
        # batch, so the error is converted into a zero MI value. It is
        # logged here because the exception never reaches the submitting
        # side, which would otherwise see an ordinary 0.0 result.
        x_name = task.get("x_name", "unknown_x")
        y_name = task.get("y_name", "unknown_y")
        LOG.warning(
            "compute_mi failed for %s x %s, using MI = 0.0: %s",
            x_name,
            y_name,
            exc,
        )
        return 0.0, x_name, y_name


def is_valid_feature(entropy, drop_thr, values):
    """Tell whether a feature passes both the entropy and the spread check.

    Args:
        entropy: Information entropy computed for the feature.
        drop_thr: Minimum entropy a feature needs in order to be kept.
        values: The feature's values; must provide ``max()`` and ``min()``.

    Returns:
        A truthy value when ``entropy`` is at least ``drop_thr`` and the gap
        between the largest and smallest value is at least ``EPS``; a falsy
        value otherwise.
    """
    entropy_ok = entropy >= drop_thr
    if not entropy_ok:
        # Same short-circuit as ``and``: ``values`` is never inspected here.
        return entropy_ok
    spread = abs(values.max() - values.min())
    return spread >= EPS


def nmi_target(
Expand Down Expand Up @@ -146,7 +155,8 @@ def _mapArrayToInt(a):

mutual_info.loc[:, target_name] = _mifun(df_feat, df_target[target_name], **kwargs)

# Compute the "self" mutual information (i.e. information entropy) of the target variable and of the input features
# Compute the "self" mutual information (i.e. information entropy)
# of the target variable and of the input features
target_mi = _self_mifun(
df_target[target_name].values.reshape(-1, 1), df_target[target_name], **kwargs
)[0]
Expand All @@ -172,120 +182,128 @@ def get_cross_nmi(
df_feat: pd.DataFrame,
drop_thr: float = 0.2,
return_entropy=False,
timeout: int = 60,
n_jobs: int = None,
**kwargs,
) -> pd.DataFrame:
) -> pd.DataFrame | tuple[pd.DataFrame, dict]:
"""
Computes the Normalized Mutual Information (NMI) between input features.

Args:
df_feat (pandas.DataFrame): Dataframe containing the input features for
which the NMI is to be computed.
drop_thr: Features having an information entropy (or self mutual information) threshold below this value will be dropped.
return_entropy: If set to True, the information entropy of each feature is also returned
drop_thr: Features having an information entropy (or self mutual information)
threshold below this value will be dropped.
return_entropy: If set to True, the information entropy of each feature
is also returned
**kwargs: Keyword arguments to be passed down to the
:py:func:`mutual_info_regression` function from scikit-learn. This
can be useful e.g. for testing purposes.

Returns:
mutual_info: pandas.DataFrame containing the Normalized Mutual Information between features.
if return_entropy=True : (mutual_info, diag): With diag a dictionary with all features as keys and information entropy as values.
mutual_info: pandas.DataFrame containing the Normalized Mutual Information
between features.
if return_entropy=True: (mutual_info, diag): With diag a dictionary with
all features as keys and information entropy as values.
"""

if kwargs.get("random_state"):
seed = kwargs.pop("random_state")
else:
seed = np.random.RandomState()
seed = kwargs.pop("random_state", np.random.randint(0, 1000000))

if kwargs.get("n_neighbors"):
n_neighbors = kwargs.pop("n_neighbors")
else:
n_neighbors = 3
n_neighbors = kwargs.pop("n_neighbors", 3)

seen_errors = set()

# preprocess the input matrix
if (
df_feat.isna().any().any()
): # only preprocess if nans are present to preserve past behaviour
if df_feat.isna().any().any():
scaler = MinMaxScaler(feature_range=(-0.5, 0.5))
x = df_feat.values
x = scaler.fit_transform(x)
x = np.nan_to_num(x, nan=-1)
df_feat = pd.DataFrame(x, index=df_feat.index, columns=df_feat.columns)

# Prepare the output DataFrame and compute the mutual information
mutual_info = pd.DataFrame([], columns=df_feat.columns, index=df_feat.columns)

# create pool of workers
if n_jobs is None:
n_jobs = 1
pool = Pool(processes=n_jobs)
mutual_info = pd.DataFrame(0.0, columns=df_feat.columns, index=df_feat.columns)

LOG.info(f"Multiprocessing on {n_jobs} workers.")
LOG.info("Computing 'self' MI (i.e. information entropy) of features")

# Compute the "self" mutual information (i.e. information entropy) of the features
LOG.info('Computing "self" MI (i.e. information entropy) of features')
diag = {}
tasks = []
for x_feat in df_feat.columns:
tasks += [
{
"x": df_feat[x_feat].values,
"y": df_feat[x_feat].values,
"x_name": x_feat,
"y_name": x_feat,
"random_state": seed,
"n_neighbors": n_neighbors,
}
]

to_drop = []
for res in tqdm.tqdm(
pool.imap_unordered(map_mi, tasks, chunksize=100), total=len(tasks)
):
feat_name = res[1]
diag[feat_name] = res[0]
if (
diag[feat_name] < drop_thr
or abs(df_feat[feat_name].max() - df_feat[feat_name].min()) < EPS
):
to_drop.append(feat_name)
else:
mutual_info.loc[feat_name, feat_name] = 1.0
tasks = [
{
"x": df_feat[x_feat].values,
"y": df_feat[x_feat].values,
"x_name": x_feat,
"y_name": x_feat,
"random_state": seed,
"n_neighbors": n_neighbors,
}
for x_feat in df_feat.columns
]

with ProcessPoolExecutor(max_workers=n_jobs) as executor:
future_to_task = {executor.submit(map_mi, t): t for t in tasks}
for fut in tqdm.tqdm(as_completed(future_to_task), total=len(future_to_task)):
task = future_to_task[fut]

try:
res = fut.result(timeout=timeout)
feat_name = res[1]
diag[feat_name] = res[0]
if is_valid_feature(diag[feat_name], drop_thr, df_feat[feat_name]):
mutual_info.loc[feat_name, feat_name] = 1.0
except Exception as e:
key = (task["x_name"], task["y_name"])
if key not in seen_errors:
seen_errors.add(key)
LOG.warning(f"[Entropy task error] Failed for {key[0]}: {e}")
continue

to_drop = [
f
for f in mutual_info.columns
if f not in diag or not is_valid_feature(diag[f], drop_thr, df_feat[f])
]
mutual_info.drop(to_drop, axis=0, inplace=True)
mutual_info.drop(to_drop, axis=1, inplace=True)

tasks = []
LOG.info("Computing cross NMI between all features...")
for idx, x_feat in enumerate(mutual_info.columns):
for y_feat in mutual_info.columns[idx + 1 :]:
tasks += [
{
"x": df_feat[x_feat].values,
"y": df_feat[y_feat].values,
"x_name": x_feat,
"y_name": y_feat,
"random_state": seed,
"n_neighbors": n_neighbors,
}
]

for res in tqdm.tqdm(
pool.imap_unordered(map_mi, tasks, chunksize=100), total=len(tasks)
):
mutual_info.loc[res[1], res[2]] = mutual_info.loc[res[2], res[1]] = res[0] / (
0.5 * (diag[res[1]] + diag[res[2]])
)
pool.close()
pool.join()

mutual_info.fillna(0, inplace=True) # if na => no relation => set to zero
tasks = [
{
"x": df_feat[x_feat].values,
"y": df_feat[y_feat].values,
"x_name": x_feat,
"y_name": y_feat,
"random_state": seed,
"n_neighbors": n_neighbors,
}
for idx, x_feat in enumerate(mutual_info.columns)
for y_feat in mutual_info.columns[idx + 1 :]
]

with ProcessPoolExecutor(max_workers=n_jobs) as executor:
future_to_task = {executor.submit(map_mi, t): t for t in tasks}
for fut in tqdm.tqdm(as_completed(future_to_task), total=len(future_to_task)):
task = future_to_task[fut]
try:
res = fut.result(timeout=timeout)
mi, x, y = res[0], res[1], res[2]
if x in diag and y in diag and diag[x] > 0 and diag[y] > 0:
denom = 0.5 * (diag[x] + diag[y])
mutual_info.loc[x, y] = mutual_info.loc[y, x] = (
mi / denom if denom > 0 else 0.0
)
except Exception as e:
key = (task["x_name"], task["y_name"])
if key not in seen_errors:
seen_errors.add(key)
LOG.warning(f"[Cross NMI error] Skipping {key[0]} x {key[1]}: {e}")
continue

if return_entropy:
return (
mutual_info,
diag,
) # diag can be useful for future elimination based on entropy without the need of recomputing the cross NMI
# diag can be useful for future elimination based on
# entropy without the need of recomputing the cross NMI
return mutual_info, diag
else:
return mutual_info

Expand Down