diff --git a/docs/source/user-guide/common-operations/joins.rst b/docs/source/user-guide/common-operations/joins.rst
index 035d7488d..1d9d70385 100644
--- a/docs/source/user-guide/common-operations/joins.rst
+++ b/docs/source/user-guide/common-operations/joins.rst
@@ -107,9 +107,9 @@ Duplicate Keys
 --------------
 
 It is common to join two DataFrames on a common column name. Starting in
-version 51.0.0, ``datafusion-python``` will now drop duplicate column names by
+version 51.0.0, ``datafusion-python`` will now coalesce join keys with identical column names by
 default. This reduces problems with ambiguous column selection after joins.
-You can disable this feature by setting the parameter ``drop_duplicate_keys``
+You can disable this feature by setting the parameter ``coalesce_duplicate_keys``
 to ``False``.
 
 .. ipython:: python
@@ -133,4 +133,4 @@ In contrast to the above example, if we wish to get both columns:
 
 .. ipython:: python
 
-    left.join(right, "id", how="inner", drop_duplicate_keys=False)
+    left.join(right, "id", how="inner", coalesce_duplicate_keys=False)
diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py
index 12f1145a8..d302c12a5 100644
--- a/python/datafusion/dataframe.py
+++ b/python/datafusion/dataframe.py
@@ -778,7 +778,7 @@ def join(
         left_on: None = None,
         right_on: None = None,
         join_keys: None = None,
-        drop_duplicate_keys: bool = True,
+        coalesce_duplicate_keys: bool = True,
     ) -> DataFrame: ...
 
     @overload
@@ -791,7 +791,7 @@ def join(
         left_on: str | Sequence[str],
         right_on: str | Sequence[str],
         join_keys: tuple[list[str], list[str]] | None = None,
-        drop_duplicate_keys: bool = True,
+        coalesce_duplicate_keys: bool = True,
     ) -> DataFrame: ...
 
     @overload
@@ -804,7 +804,7 @@ def join(
         join_keys: tuple[list[str], list[str]],
         left_on: None = None,
         right_on: None = None,
-        drop_duplicate_keys: bool = True,
+        coalesce_duplicate_keys: bool = True,
     ) -> DataFrame: ...
 
     def join(
@@ -816,7 +816,7 @@ def join(
         left_on: str | Sequence[str] | None = None,
         right_on: str | Sequence[str] | None = None,
         join_keys: tuple[list[str], list[str]] | None = None,
-        drop_duplicate_keys: bool = True,
+        coalesce_duplicate_keys: bool = True,
     ) -> DataFrame:
         """Join this :py:class:`DataFrame` with another :py:class:`DataFrame`.
 
@@ -829,9 +829,9 @@ def join(
                 "right", "full", "semi", "anti".
             left_on: Join column of the left dataframe.
             right_on: Join column of the right dataframe.
-            drop_duplicate_keys: When True, the columns from the right DataFrame
-                that have identical names in the ``on`` fields to the left DataFrame
-                will be dropped.
+            coalesce_duplicate_keys: When True, join key columns from the left
+                and right DataFrames that have identical names in the ``on``
+                fields are coalesced into a single output column.
             join_keys: Tuple of two lists of column names to join on. [Deprecated]
 
         Returns:
@@ -879,7 +879,7 @@ def join(
             right_on = [right_on]
 
         return DataFrame(
-            self.df.join(right.df, how, left_on, right_on, drop_duplicate_keys)
+            self.df.join(right.df, how, left_on, right_on, coalesce_duplicate_keys)
         )
 
     def join_on(
diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py
index 486159165..254a1e0b6 100644
--- a/python/tests/test_dataframe.py
+++ b/python/tests/test_dataframe.py
@@ -663,7 +663,7 @@ def test_join():
     df1 = ctx.create_dataframe([[batch]], "r")
 
     df2 = df.join(df1, on="a", how="inner")
-    df2 = df2.sort(column("l.a"))
+    df2 = df2.sort(column("a"))
     table = pa.Table.from_batches(df2.collect())
 
     expected = {"a": [1, 2], "c": [8, 10], "b": [4, 5]}
@@ -673,8 +673,10 @@ def test_join():
     # Since we may have a duplicate column name and pa.Table()
     # hides the fact, instead we need to explicitly check the
     # resultant arrays.
-    df2 = df.join(df1, left_on="a", right_on="a", how="inner", drop_duplicate_keys=True)
-    df2 = df2.sort(column("l.a"))
+    df2 = df.join(
+        df1, left_on="a", right_on="a", how="inner", coalesce_duplicate_keys=True
+    )
+    df2 = df2.sort(column("a"))
     result = df2.collect()[0]
     assert result.num_columns == 3
     assert result.column(0) == pa.array([1, 2], pa.int64())
@@ -682,7 +684,7 @@ def test_join():
     assert result.column(2) == pa.array([8, 10], pa.int64())
 
     df2 = df.join(
-        df1, left_on="a", right_on="a", how="inner", drop_duplicate_keys=False
+        df1, left_on="a", right_on="a", how="inner", coalesce_duplicate_keys=False
    )
     df2 = df2.sort(column("l.a"))
     result = df2.collect()[0]
@@ -695,7 +697,7 @@ def test_join():
     # Verify we don't make a breaking change to pre-43.0.0
     # where users would pass join_keys as a positional argument
    df2 = df.join(df1, (["a"], ["a"]), how="inner")
-    df2 = df2.sort(column("l.a"))
+    df2 = df2.sort(column("a"))
     table = pa.Table.from_batches(df2.collect())
 
     expected = {"a": [1, 2], "c": [8, 10], "b": [4, 5]}
@@ -720,7 +722,7 @@ def test_join_invalid_params():
     with pytest.deprecated_call():
         df2 = df.join(df1, join_keys=(["a"], ["a"]), how="inner")
         df2.show()
-        df2 = df2.sort(column("l.a"))
+        df2 = df2.sort(column("a"))
         table = pa.Table.from_batches(df2.collect())
 
         expected = {"a": [1, 2], "c": [8, 10], "b": [4, 5]}
@@ -778,6 +780,35 @@ def test_join_on():
     assert table.to_pydict() == expected
 
 
+def test_join_full_with_coalesce_duplicate_keys():
+    ctx = SessionContext()
+
+    batch = pa.RecordBatch.from_arrays(
+        [pa.array([1, 3, 5, 7, 9]), pa.array([True, True, True, True, True])],
+        names=["log_time", "key_frame"],
+    )
+    key_frame = ctx.create_dataframe([[batch]])
+
+    batch = pa.RecordBatch.from_arrays(
+        [pa.array([2, 4, 6, 8, 10])],
+        names=["log_time"],
+    )
+    query_times = ctx.create_dataframe([[batch]])
+
+    merged = query_times.join(
+        key_frame,
+        left_on="log_time",
+        right_on="log_time",
+        how="full",
+        coalesce_duplicate_keys=True,
+    )
+    merged = merged.sort(column("log_time"))
+    result = merged.collect()[0]
+
+    assert result.num_columns == 2
+    assert result.column(0).to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+
+
 def test_join_on_invalid_expr():
     ctx = SessionContext()
 
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 21eb6e0e2..fa0181ecb 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -649,7 +649,7 @@ impl PyDataFrame {
         how: &str,
         left_on: Vec<PyBackedStr>,
         right_on: Vec<PyBackedStr>,
-        drop_duplicate_keys: bool,
+        coalesce_keys: bool,
     ) -> PyDataFusionResult<Self> {
         let join_type = match how {
             "inner" => JoinType::Inner,
@@ -676,7 +676,7 @@ impl PyDataFrame {
             None,
         )?;
 
-        if drop_duplicate_keys {
+        if coalesce_keys {
             let mutual_keys = left_keys
                 .iter()
                 .zip(right_keys.iter())
@@ -684,15 +684,16 @@ impl PyDataFrame {
                 .map(|(key, _)| *key)
                 .collect::<Vec<_>>();
 
-            let fields_to_drop = mutual_keys
+            let fields_to_coalesce = mutual_keys
                 .iter()
                 .map(|name| {
-                    df.logical_plan()
+                    let qualified_fields = df
+                        .logical_plan()
                         .schema()
-                        .qualified_fields_with_unqualified_name(name)
+                        .qualified_fields_with_unqualified_name(name);
+                    (*name, qualified_fields)
                 })
-                .filter(|r| r.len() == 2)
-                .map(|r| r[1])
+                .filter(|(_, fields)| fields.len() == 2)
                 .collect::<Vec<_>>();
 
             let expr: Vec<Expr> = df
@@ -702,8 +703,23 @@ impl PyDataFrame {
                 .into_iter()
                 .enumerate()
                 .map(|(idx, _)| df.logical_plan().schema().qualified_field(idx))
-                .filter(|(qualifier, f)| !fields_to_drop.contains(&(*qualifier, f)))
-                .map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field))))
+                .filter_map(|(qualifier, field)| {
+                    if let Some((key_name, qualified_fields)) = fields_to_coalesce
+                        .iter()
+                        .find(|(_, qf)| qf.contains(&(qualifier, field)))
+                    {
+                        // Only add the coalesce expression once (when we encounter the first field)
+                        // Skip the second field (it's already included in the coalesce)
+                        if (qualifier, field) == qualified_fields[0] {
+                            let left_col = Expr::Column(Column::from(qualified_fields[0]));
+                            let right_col = Expr::Column(Column::from(qualified_fields[1]));
+                            return Some(coalesce(vec![left_col, right_col]).alias(*key_name));
+                        }
+                        None
+                    } else {
+                        Some(Expr::Column(Column::from((qualifier, field))))
+                    }
+                })
                 .collect();
             df = df.select(expr)?;
         }
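Taken together, the docs, Python wrapper, and Rust changes above replace drop-on-join with coalesce-on-join for duplicate key columns. As a usage illustration (not part of the patch), here is a minimal sketch assuming the ``coalesce_duplicate_keys`` parameter from this diff is available; the table names, column names, and data are made up for the example.

```python
# Minimal sketch of the behavior documented above, assuming
# datafusion-python >= 51.0.0 with the coalesce_duplicate_keys parameter.
from datafusion import SessionContext, column

ctx = SessionContext()
left = ctx.from_pydict({"id": [1, 2, 3], "left_val": ["a", "b", "c"]}, name="l")
right = ctx.from_pydict({"id": [2, 3, 4], "right_val": ["x", "y", "z"]}, name="r")

# Default: the two "id" key columns are coalesced into a single, unambiguous
# "id" column, so it can be referenced without a table qualifier.
coalesced = left.join(right, on="id", how="full")
coalesced.sort(column("id")).show()

# Opting out keeps both qualified key columns ("l.id" and "r.id").
both_keys = left.join(right, on="id", how="full", coalesce_duplicate_keys=False)
both_keys.sort(column("l.id")).show()
```

The full outer join makes the difference easy to see: with coalescing, unmatched rows from either side still produce a non-null ``id``, which is exactly what the new ``test_join_full_with_coalesce_duplicate_keys`` test asserts.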