
feat: add parallel_backend parameter to analyze and encode methods#263

Open
MrCosta57 wants to merge 2 commits into mostly-ai:main from MrCosta57:add-joblib-backend-param

Conversation


@MrCosta57 MrCosta57 commented Feb 27, 2026

Summary

This PR introduces a new parallel_backend parameter to the analyze() and encode() methods, allowing users to control the parallelization strategy used by joblib during data processing operations. The default remains "loky" (process-based), but users can now switch to "threading" or other backends depending on their use case.

Motivation

When processing large datasets, the current process-based parallelization (using the "loky" backend) creates independent copies of the data for each worker process. This behavior leads to significant memory overhead, as each process maintains its own copy of df[column], even though it is only accessed in read-only mode during analysis and encoding. This results in inefficient memory usage.

Changes Made

Public API Changes

  1. mostlyai.engine.analysis.analyze()

    • Added parallel_backend: str = "loky" parameter
    • Documented parameter in docstring
  2. mostlyai.engine.encoding.encode()

    • Added parallel_backend: str = "loky" parameter
    • Documented parameter in docstring

Internal Implementation

The parameter is propagated through the encoding and analysis pipeline:

  • mostlyai.engine._tabular.encoding.encode() → _encode_partition() → encode_df()
  • mostlyai.engine._language.encoding.encode() → _encode_partition()
  • mostlyai.engine.analysis._analyze_partition()

All parallel operations now run inside a joblib.parallel_config(parallel_backend, n_jobs=n_jobs) context manager, so the user's backend choice is respected.

Usage Example

from mostlyai import engine

# Default behavior (process-based, higher memory usage)
engine.analyze(workspace_dir="engine-ws")
engine.encode(workspace_dir="engine-ws")

# Memory-efficient threading backend for large datasets
engine.analyze(
    workspace_dir="engine-ws",
    parallel_backend="threading"  # Shared memory, reduced footprint
)
engine.encode(
    workspace_dir="engine-ws",
    parallel_backend="threading"
)

Backwards Compatibility

Fully backward compatible: the default value of "loky" preserves the existing behavior for all users who don't explicitly specify the parameter.


Note

Medium Risk
Touches core analysis/encoding parallelization behavior; incorrect backend selection or backend-specific joblib behavior could impact performance or memory usage, though defaults preserve existing behavior.

Overview
Exposes a new parallel_backend parameter (default "loky") on the public analyze() and encode() entrypoints to let callers choose the joblib backend used for parallel work.

Propagates this option through the tabular analysis/encoding pipeline so parallel_config() uses the selected backend when analyzing columns and encoding dataframes, and updates docstrings accordingly.

Written by Cursor Bugbot for commit 9b5a1e0.


@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.


def encode(
    workspace_dir: str | Path | None = None,
    update_progress: ProgressCallback | None = None,
    parallel_backend: str = "loky",

Language encoding silently ignores parallel_backend parameter

Low Severity

The parallel_backend parameter is added to the language encode() function but is never referenced in the function body. Unlike the tabular path (where it's properly propagated through _encode_partition() → encode_df() → parallel_config()), the language encode() simply accepts the parameter and discards it. A user passing parallel_backend="threading" for a language model workspace would get no effect, with no warning or error indicating the parameter was ignored.



mplatzer commented Mar 2, 2026

can you please provide an ideally reproducible scenario where one would run into memory issues during analyze / encode ?

@MrCosta57
Contributor Author

For a project I'm working on, I'm synthesizing large dataframes that are stored in Parquet format and read using pandas with the PyArrow backend for efficiency reasons. Since the dataframes occupy more than 2GB, it's also necessary to convert string columns to PyArrow large_string, otherwise I get an overflow error from PyArrow.

The result is a 40GB sequential dataframe in RAM (90 million rows and 45 columns) using PyArrow types, while the context data is less than 500MB (3 million rows and 10 columns). On a VM, I'm running code similar to the example below, but with the larger dataset I mentioned, so it might be a bit difficult for you to reproduce.

import pathlib

import pandas as pd
import pyarrow as pa

from mostlyai import engine

# Define configs
WORKSPACE_DIR = pathlib.Path("baseball-ws")
DATA_DIR = WORKSPACE_DIR / "data"
DATA_DIR.mkdir(exist_ok=True, parents=True)


# Util functions
def convert_to_large_string(df: pd.DataFrame) -> pd.DataFrame:
    for col in df.columns:
        dtype = df[col].dtype
        if pd.api.types.is_string_dtype(dtype):
            df[col] = df[col].astype(pd.ArrowDtype(pa.large_string()))
    return df


# Prepare data
url = "https://github.com/mostly-ai/public-demo-data/raw/refs/heads/dev/baseball"
trn_ctx_df = pd.read_csv(f"{url}/players.csv.gz")  # context data
trn_tgt_df = pd.read_csv(f"{url}/batting.csv.gz")  # target data
trn_ctx_df.to_parquet(DATA_DIR / "players.parquet", index=False)
trn_tgt_df.to_parquet(DATA_DIR / "batting.parquet", index=False)

# Load data, split, analyze, encode and train
trn_ctx_df = pd.read_parquet(DATA_DIR / "players.parquet", engine="pyarrow", dtype_backend="pyarrow")
trn_tgt_df = pd.read_parquet(DATA_DIR / "batting.parquet", engine="pyarrow", dtype_backend="pyarrow")

# Convert string columns to large string type
trn_ctx_df = convert_to_large_string(trn_ctx_df)
trn_tgt_df = convert_to_large_string(trn_tgt_df)

engine.split(
    workspace_dir=WORKSPACE_DIR,
    tgt_data=trn_tgt_df,
    ctx_data=trn_ctx_df,
    tgt_context_key="players_id",
    ctx_primary_key="id",
)
engine.analyze(workspace_dir=WORKSPACE_DIR)

While executing the analyze phase, the process is killed by the VM due to an out-of-memory error. I solved the problem for now by setting the n_partitions parameter to >1 and by deleting input dataframes and running the garbage collector before engine.analyze().

I think that in any case, using threads with joblib could be beneficial, because pandas Series appear to be serialized and copied among worker processes, as the joblib documentation states (https://joblib.readthedocs.io/en/stable/parallel.html#working-with-numerical-data-in-shared-memory-memmapping). The special case that allows data sharing applies to large numpy arrays, which are passed as np.memmap and read without duplication; I don't think this is triggered for pandas Series or PyArrow arrays.
