Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mostlyai/engine/_language/encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ def _encode_partition(
def encode(
workspace_dir: str | Path | None = None,
update_progress: ProgressCallback | None = None,
parallel_backend: str = "loky",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Language encoding silently ignores parallel_backend parameter

Low Severity

The parallel_backend parameter is added to the language encode() function but is never referenced in the function body. Unlike the tabular path (where it's properly propagated through _encode_partition() → encode_df() → parallel_config()), the language encode() simply accepts the parameter and discards it. A user passing parallel_backend="threading" for a language model workspace would get no effect, with no warning or error indicating the parameter was ignored.

Fix in Cursor Fix in Web

) -> None:
_LOG.info("ENCODE_LANGUAGE started")
t0 = time.time()
Expand Down
9 changes: 8 additions & 1 deletion mostlyai/engine/_tabular/encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
def encode(
workspace_dir: str | Path | None = None,
update_progress: ProgressCallback | None = None,
parallel_backend: str = "loky",
) -> None:
_LOG.info("ENCODE_TABULAR started")
t0 = time.time()
Expand Down Expand Up @@ -84,6 +85,7 @@ def encode(
ctx_partition_file=ctx_pqt_partitions[i] if has_context else None,
ctx_stats=ctx_stats if has_context else None,
n_jobs=min(16, max(1, cpu_count() - 1)),
parallel_backend=parallel_backend,
)
progress.update(completed=i, total=len(tgt_pqt_partitions) + 1)
_LOG.info(f"ENCODE_TABULAR finished in {time.time() - t0:.2f}s")
Expand All @@ -97,6 +99,7 @@ def _encode_partition(
ctx_partition_file: Path | None = None,
ctx_stats: dict | None = None,
n_jobs: int = 1,
parallel_backend: str = "loky",
) -> None:
seq_len_stats = get_sequence_length_stats(tgt_stats)
is_sequential = tgt_stats["is_sequential"]
Expand All @@ -112,6 +115,7 @@ def _encode_partition(
ctx_primary_key=None,
tgt_context_key=tgt_context_key,
n_jobs=n_jobs,
parallel_backend=parallel_backend,
)

has_context = ctx_partition_file is not None and tgt_context_key and ctx_primary_key
Expand All @@ -128,6 +132,7 @@ def _encode_partition(
ctx_primary_key=ctx_primary_key,
tgt_context_key=None,
n_jobs=n_jobs,
parallel_backend=parallel_backend,
)
# pad each list with one extra item
df_ctx = pad_ctx_sequences(df_ctx)
Expand Down Expand Up @@ -185,6 +190,7 @@ def encode_df(
ctx_primary_key: str | None = None,
tgt_context_key: str | None = None,
n_jobs: int = 1,
parallel_backend: str = "loky",
) -> tuple[pd.DataFrame, str | None, str | None]:
"""
Encodes a given table represented by a DataFrame object. The result will be delivered
Expand All @@ -194,6 +200,7 @@ def encode_df(
:param stats: stats for each of the columns
:param ctx_primary_key: context primary key
:param tgt_context_key: target context key
:param parallel_backend: joblib parallel backend to use
:return: encoded data and keys following columns' naming conventions
"""

Expand Down Expand Up @@ -238,7 +245,7 @@ def encode_df(
)
)
if delayed_encodes:
with parallel_config("loky", n_jobs=n_jobs):
with parallel_config(parallel_backend, n_jobs=n_jobs):
df_columns.extend(Parallel()(delayed_encodes))

df = pd.concat(df_columns, axis=1) if df_columns else pd.DataFrame()
Expand Down
8 changes: 6 additions & 2 deletions mostlyai/engine/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def analyze(
differential_privacy: DifferentialPrivacyConfig | None = None,
workspace_dir: str | Path = "engine-ws",
update_progress: ProgressCallback | None = None,
parallel_backend: str = "loky",
) -> None:
"""
Generates (privacy-safe) column-level statistics of the original data, that has been `split` into the workspace.
Expand All @@ -122,6 +123,7 @@ def analyze(
value_protection: Whether to enable value protection for rare values.
workspace_dir: Path to workspace directory containing partitioned data.
update_progress: Optional callback to update progress during analysis.
parallel_backend: Joblib parallel backend to use. Options include 'loky', 'threading', 'multiprocessing', etc.
"""

_LOG.info("ANALYZE started")
Expand Down Expand Up @@ -167,6 +169,7 @@ def analyze(
ctx_primary_key=ctx_primary_key if has_context else None,
ctx_root_key=ctx_root_key,
n_jobs=min(16, max(1, cpu_count() - 1)),
parallel_backend=parallel_backend,
)
progress.update(completed=i, total=len(tgt_pqt_partitions) + 1)

Expand Down Expand Up @@ -221,6 +224,7 @@ def _analyze_partition(
ctx_primary_key: str | None = None,
ctx_root_key: str | None = None,
n_jobs: int = 1,
parallel_backend: str = "loky",
) -> None:
"""
Calculates partial statistics about a single partition.
Expand Down Expand Up @@ -252,7 +256,7 @@ def _analyze_partition(
ctx_root_keys = ctx_primary_keys.rename("__rkey")

# analyze all target columns
with parallel_config("loky", n_jobs=n_jobs):
with parallel_config(parallel_backend, n_jobs=n_jobs):
results = Parallel()(
delayed(_analyze_col)(
values=tgt_df[column],
Expand Down Expand Up @@ -293,7 +297,7 @@ def _analyze_partition(

# analyze all context columns
assert isinstance(ctx_encoding_types, dict)
with parallel_config("loky", n_jobs=n_jobs):
with parallel_config(parallel_backend, n_jobs=n_jobs):
results = Parallel()(
delayed(_analyze_col)(
values=ctx_df[column],
Expand Down
10 changes: 8 additions & 2 deletions mostlyai/engine/encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def encode(
*,
workspace_dir: str | Path = "engine-ws",
update_progress: ProgressCallback | None = None,
parallel_backend: str = "loky",
) -> None:
"""
Encodes data in the workspace that has already been split and analyzed.
Expand All @@ -34,13 +35,18 @@ def encode(
Args:
workspace_dir: Directory path for workspace.
update_progress: Callback for progress updates.
parallel_backend: Joblib parallel backend to use. Options include 'loky', 'threading', 'multiprocessing', etc.
"""
model_type = resolve_model_type(workspace_dir)
if model_type == ModelType.tabular:
from mostlyai.engine._tabular.encoding import encode as encode_tabular

return encode_tabular(workspace_dir=workspace_dir, update_progress=update_progress)
return encode_tabular(
workspace_dir=workspace_dir, update_progress=update_progress, parallel_backend=parallel_backend
)
else:
from mostlyai.engine._language.encoding import encode as encode_language

return encode_language(workspace_dir=workspace_dir, update_progress=update_progress)
return encode_language(
workspace_dir=workspace_dir, update_progress=update_progress, parallel_backend=parallel_backend
)