[Enh]: Add Expr.map_batches to pyspark #3579
I don't understand this test failure: https://github.com/narwhals-dev/narwhals/actions/runs/25050142610/job/73375481418?pr=3579 The coverage check failure is also strange: https://github.com/narwhals-dev/narwhals/actions/runs/25050142614/job/73375481310?pr=3579
Hey @pedro-villanueva-bcom - thanks for taking the initiative! I am not sure we should support map_batches for lazy backends, but I am open to seeing how this plays out! Regarding your questions:
I am not sure, but it would not be the first time that something passes for pyspark but not for pyspark-connect.
Coverage is calculated with the SQLFrame backend, so you will need to add a …
Any specific reason for this? In my mind (and use case) udfs are just another type of expression to create a column. They have performance implications for sure, but in my case there's no other choice (mostly statistical functions, for example getting a p-value from a column with a z score). My use case is a library of statistical functions for large datasets that works with pyspark, pyspark connect and snowpark. I want to make it work with in-memory backends too, to deal with small data and to make testing faster. I discovered narwhals and I'm quite happy with it. The syntax is nice (nicer than ibis) and migrating is not super hard.
thanks for the PR! looking at the docs …
I'm also inclined to decline this feature, I'm afraid, as it looks like a massive performance footgun. Which statistical summaries are you looking to do? In any case, I'd suggest making a helper function within your own code for this; I'm extremely hesitant to use anything from the pandas API in pyspark.
They are less performant than native expressions, but there's a reason pyspark (and snowpark, and every other backend I know of) allows udfs: sometimes that's the only way to get something done. I think that the end user has to make the choices and tradeoffs, not the library. We can emit a warning, like the one emitted when the pandas backend has to call apply for a complex aggregation.
Example udf:
    @udf(packages=["scipy"])
    def norm_p_value(value):  # type: ignore[no-untyped-def]
        """Normal distribution survival function (vectorized)."""
        # Imports live inside the function so they resolve where the udf runs.
        import numpy as np
        import scipy.stats as stats

        if value is None:
            return None
        result = stats.norm.sf(value)
        # Unwrap 0-d results to a plain Python float; pass arrays through.
        return result.item() if np.ndim(result) == 0 else result
Then I have to use it like:
    tmp_input_col = f"__{z_col}_abs"
    df = df.with_columns(nw.col(z_col).abs().alias(tmp_input_col))
    df = norm_p_value(df, tmp_input_col, output_col)
    df = df.drop(tmp_input_col)
Instead of simply … which then I can keep chaining. Not having …
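For anyone following along without scipy installed, the computation in the udf above can be approximated with the standard library alone. This is only a rough sketch under the assumption that the standard normal survival function is all that is needed; `norm_sf` and `norm_p_value_batch` are hypothetical names for illustration and are not part of the PR or of narwhals.

```python
import math
from typing import List, Optional, Sequence


def norm_sf(z: Optional[float]) -> Optional[float]:
    """Standard normal survival function, 1 - CDF(z); None passes through."""
    if z is None:
        return None
    # Identity: sf(z) = 0.5 * erfc(z / sqrt(2)) for the standard normal.
    return 0.5 * math.erfc(z / math.sqrt(2.0))


def norm_p_value_batch(values: Sequence[Optional[float]]) -> List[Optional[float]]:
    """Apply norm_sf to a whole batch, the way a vectorized udf would."""
    return [norm_sf(v) for v in values]


if __name__ == "__main__":
    z_scores = [0.0, 1.96, None, -1.0]
    # Take absolute values first, as the with_columns(...abs()) step above does.
    abs_scores = [abs(z) if z is not None else None for z in z_scores]
    print(norm_p_value_batch(abs_scores))
```

A function with this shape (a batch in, a batch of the same length out) is the kind of callable that map_batches is meant to accept, with the backend supplying a Series instead of a list.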
thanks - would it have to be pandas udf, or would arrow udf be fine?
I think that it can be a "normal" udf. I used a pandas one because the map_batches documentation says: …
pandas udfs are also more performant than normal ones, but I'm ok with changing the PR if you think that's better (we could even let the user select the type with an optional param).
Description
Expr.map_batches can be used when native expressions aren't enough, for example for statistical functions. Pyspark has several types of UDFs, including pandas UDFs, which match map_batches very well. This PR implements map_batches using pandas UDFs. The optional param returns_scalar is not supported, as pyspark doesn't allow this. The UDF must return either a pandas Series, something that can be transformed into one, or a scalar that will be broadcast to one.
The only change external to the spark backend is in the kind of the map_batches node, which has been changed from ordered to unordered.
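To make the return-value rule above concrete, here is a minimal sketch in plain Python, with lists standing in for pandas Series. It is not the PR's implementation: `normalize_udf_result` and `map_batch` are hypothetical helpers, and raising on a length mismatch is an assumption of this sketch rather than documented behaviour.

```python
from collections.abc import Iterable
from typing import Any, Callable, List, Sequence


def normalize_udf_result(result: Any, batch_len: int) -> List[Any]:
    """Coerce a user function's result to a batch of length batch_len."""
    # Strings and bytes are iterable but are treated as scalars here.
    if isinstance(result, Iterable) and not isinstance(result, (str, bytes)):
        out = list(result)
        # Assumption of this sketch: a batch-shaped result must match the input length.
        if len(out) != batch_len:
            raise ValueError(f"expected {batch_len} values, got {len(out)}")
        return out
    # Scalar result: broadcast it to every row of the batch.
    return [result] * batch_len


def map_batch(func: Callable[[Sequence[Any]], Any], batch: Sequence[Any]) -> List[Any]:
    """Run func on one batch and normalize whatever it returns."""
    return normalize_udf_result(func(batch), len(batch))


if __name__ == "__main__":
    print(map_batch(lambda b: [x * 2 for x in b], [1, 2, 3]))  # batch-shaped result
    print(map_batch(lambda b: sum(b), [1, 2, 3]))  # scalar result, broadcast
```

The design choice illustrated is that the user function never needs to know whether it returned a batch or a scalar; the wrapper decides and broadcasts when needed.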
Additionally, the testing fixture that creates the spark session now sets the PYSPARK_PYTHON env var so that UDFs run with that Python interpreter (including whatever packages are installed).
What type of PR is this? (check all applicable)
Related issues
Checklist