Skip to content
This repository was archived by the owner on Jul 16, 2019. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions ci/gpu/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ export CUDA_REL=${CUDA_VERSION%.*}
# Set home to the job's workspace
export HOME=$WORKSPACE

# Parse git describe
export GIT_DESCRIBE_TAG=`git describe --tags`
export MINOR_VERSION=`echo $GIT_DESCRIBE_TAG | grep -o -E '([0-9]\.[0-9])'`

################################################################################
# SETUP - Check environment
################################################################################
Expand All @@ -37,10 +41,7 @@ $CC --version
$CXX --version

logger "Setup new environment..."
conda install -c rapidsai/label/cuda$CUDA_REL -c rapidsai-nightly/label/cuda$CUDA_REL -c nvidia/label/cuda$CUDA_REL -c conda-forge \
'cudf=0.7*' \
'pyarrow=0.12.1' \
'dask>=1.1.5'
conda install "cudf=$MINOR_VERSION.*" "dask>=1.1.5"
pip install git+https://github.com/dask/dask.git --upgrade --no-deps

conda list
Expand Down
79 changes: 0 additions & 79 deletions dask_cudf/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,85 +169,6 @@ def merge(
rsuffix=suffixes[1],
)

def join(self, other, how="left", lsuffix="", rsuffix=""):
"""Join two datatframes

*on* is not supported.
"""
if how == "right":
return other.join(other=self, how="left", lsuffix=rsuffix, rsuffix=lsuffix)

same_names = set(self.columns) & set(other.columns)
if same_names and not (lsuffix or rsuffix):
raise ValueError(
"there are overlapping columns but "
"lsuffix and rsuffix are not defined"
)

left, leftuniques = self._align_divisions()
right, rightuniques = other._align_to_indices(leftuniques)

leftparts = left.to_delayed()
rightparts = right.to_delayed()

@delayed
def part_join(left, right, how):
return left.join(
right, how=how, sort=True, lsuffix=lsuffix, rsuffix=rsuffix
)

def inner_selector():
pivot = 0
for i in range(len(leftparts)):
for j in range(pivot, len(rightparts)):
if leftuniques[i] & rightuniques[j]:
yield leftparts[i], rightparts[j]
pivot = j + 1
break

def left_selector():
pivot = 0
for i in range(len(leftparts)):
for j in range(pivot, len(rightparts)):
if leftuniques[i] & rightuniques[j]:
yield leftparts[i], rightparts[j]
pivot = j + 1
break
else:
yield leftparts[i], None

selector = {"left": left_selector, "inner": inner_selector}[how]

rhs_dtypes = [(k, other._meta.dtypes[k]) for k in other._meta.columns]

@delayed
def fix_column(lhs):
df = cudf.DataFrame()
for k in lhs.columns:
df[k + lsuffix] = lhs[k]

for k, dtype in rhs_dtypes:
data = np.zeros(len(lhs), dtype=dtype)
mask_size = cudf.utils.utils.calc_chunk_size(
data.size, cudf.utils.utils.mask_bitsize
)
mask = np.zeros(mask_size, dtype=cudf.utils.utils.mask_dtype)
sr = cudf.Series.from_masked_array(
data=data, mask=mask, null_count=data.size
)

df[k + rsuffix] = sr.set_index(df.index)

return df

joinedparts = [
(part_join(lhs, rhs, how=how) if rhs is not None else fix_column(lhs))
for lhs, rhs in selector()
]

meta = self._meta.join(other._meta, how=how, lsuffix=lsuffix, rsuffix=rsuffix)
return from_delayed(joinedparts, meta=meta)

def _align_divisions(self):
"""Align so that the values do not split across partitions
"""
Expand Down
19 changes: 13 additions & 6 deletions dask_cudf/tests/test_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,20 @@ def test_join_inner(left_nrows, right_nrows, left_nkeys, right_nkeys):
expect = expect.to_pandas()

# dask_cudf
left = dgd.from_cudf(left, chunksize=chunksize)
right = dgd.from_cudf(right, chunksize=chunksize)
g_left = dgd.from_cudf(left, chunksize=chunksize)
g_right = dgd.from_cudf(right, chunksize=chunksize)

joined = left.set_index("x").join(
right.set_index("x"), how="inner", lsuffix="l", rsuffix="r"
joined = g_left.set_index("x").join(
g_right.set_index("x"), how="inner", lsuffix="l", rsuffix="r"
)

got = joined.compute().to_pandas()

# Check index
np.testing.assert_array_equal(expect.index.values, got.index.values)
# currently a random number
got.index.name = None
# correct value of 'x'
expect.index.name = None
dd.assert_eq(expect, got)

# Check rows in each groups
expect_rows = {}
Expand All @@ -71,6 +75,9 @@ def gather(df, grows):
@pytest.mark.parametrize("right_nkeys", [4, 5])
@pytest.mark.parametrize("how", ["left", "right"])
def test_join_left(left_nrows, right_nrows, left_nkeys, right_nkeys, how):
if how == "right":
pytest.xfail("Right joins are not yet supported")

chunksize = 50

np.random.seed(0)
Expand Down