Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,20 @@ public static RowType rowTypeWithRowTracking(RowType rowType, boolean sequenceNu
: SpecialFields.SEQUENCE_NUMBER);
return new RowType(fieldsWithRowTracking);
}

public static RowType rowTypeWithRowId(RowType rowType) {
List<DataField> fieldsWithRowTracking = new ArrayList<>(rowType.getFields());

fieldsWithRowTracking.forEach(
f -> {
if (ROW_ID.name().equals(f.name())) {
throw new IllegalArgumentException(
"Row tracking field name '"
+ f.name()
+ "' conflicts with existing field names.");
}
});
fieldsWithRowTracking.add(SpecialFields.ROW_ID);
return new RowType(fieldsWithRowTracking);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,22 @@

import java.io.Closeable;
import java.util.List;
import java.util.Optional;

/** Index reader for global index, return {@link GlobalIndexResult}. */
public interface GlobalIndexReader extends FunctionVisitor<Optional<GlobalIndexResult>>, Closeable {
public interface GlobalIndexReader extends FunctionVisitor<GlobalIndexResult>, Closeable {

@Override
default Optional<GlobalIndexResult> visitAnd(List<Optional<GlobalIndexResult>> children) {
default GlobalIndexResult visitAnd(List<GlobalIndexResult> children) {
throw new UnsupportedOperationException("Should not invoke this");
}

@Override
default Optional<GlobalIndexResult> visitOr(List<Optional<GlobalIndexResult>> children) {
default GlobalIndexResult visitOr(List<GlobalIndexResult> children) {
throw new UnsupportedOperationException("Should not invoke this");
}

@Override
default Optional<GlobalIndexResult> visit(TransformPredicate predicate) {
default GlobalIndexResult visit(TransformPredicate predicate) {
throw new UnsupportedOperationException("Should not invoke this");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
/** Index writer for global index. */
public interface GlobalIndexWriter {

void write(Object key);
void write(@Nullable Object key);

List<ResultEntry> finish();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/** A {@link GlobalIndexReader} wrapper for {@link FileIndexReader}. */
Expand All @@ -47,68 +46,68 @@ public FileIndexReaderWrapper(
}

@Override
public Optional<GlobalIndexResult> visitIsNotNull(FieldRef fieldRef) {
return Optional.ofNullable(transform.apply(reader.visitIsNotNull(fieldRef)));
public GlobalIndexResult visitIsNotNull(FieldRef fieldRef) {
return transform.apply(reader.visitIsNotNull(fieldRef));
}

@Override
public Optional<GlobalIndexResult> visitIsNull(FieldRef fieldRef) {
return Optional.ofNullable(transform.apply(reader.visitIsNull(fieldRef)));
public GlobalIndexResult visitIsNull(FieldRef fieldRef) {
return transform.apply(reader.visitIsNull(fieldRef));
}

@Override
public Optional<GlobalIndexResult> visitStartsWith(FieldRef fieldRef, Object literal) {
return Optional.ofNullable(transform.apply(reader.visitStartsWith(fieldRef, literal)));
public GlobalIndexResult visitStartsWith(FieldRef fieldRef, Object literal) {
return transform.apply(reader.visitStartsWith(fieldRef, literal));
}

@Override
public Optional<GlobalIndexResult> visitEndsWith(FieldRef fieldRef, Object literal) {
return Optional.ofNullable(transform.apply(reader.visitEndsWith(fieldRef, literal)));
public GlobalIndexResult visitEndsWith(FieldRef fieldRef, Object literal) {
return transform.apply(reader.visitEndsWith(fieldRef, literal));
}

@Override
public Optional<GlobalIndexResult> visitContains(FieldRef fieldRef, Object literal) {
return Optional.ofNullable(transform.apply(reader.visitContains(fieldRef, literal)));
public GlobalIndexResult visitContains(FieldRef fieldRef, Object literal) {
return transform.apply(reader.visitContains(fieldRef, literal));
}

@Override
public Optional<GlobalIndexResult> visitLessThan(FieldRef fieldRef, Object literal) {
return Optional.ofNullable(transform.apply(reader.visitLessThan(fieldRef, literal)));
public GlobalIndexResult visitLessThan(FieldRef fieldRef, Object literal) {
return transform.apply(reader.visitLessThan(fieldRef, literal));
}

@Override
public Optional<GlobalIndexResult> visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
return Optional.ofNullable(transform.apply(reader.visitGreaterOrEqual(fieldRef, literal)));
public GlobalIndexResult visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
return transform.apply(reader.visitGreaterOrEqual(fieldRef, literal));
}

@Override
public Optional<GlobalIndexResult> visitNotEqual(FieldRef fieldRef, Object literal) {
return Optional.ofNullable(transform.apply(reader.visitNotEqual(fieldRef, literal)));
public GlobalIndexResult visitNotEqual(FieldRef fieldRef, Object literal) {
return transform.apply(reader.visitNotEqual(fieldRef, literal));
}

@Override
public Optional<GlobalIndexResult> visitLessOrEqual(FieldRef fieldRef, Object literal) {
return Optional.ofNullable(transform.apply(reader.visitLessOrEqual(fieldRef, literal)));
public GlobalIndexResult visitLessOrEqual(FieldRef fieldRef, Object literal) {
return transform.apply(reader.visitLessOrEqual(fieldRef, literal));
}

@Override
public Optional<GlobalIndexResult> visitEqual(FieldRef fieldRef, Object literal) {
return Optional.ofNullable(transform.apply(reader.visitEqual(fieldRef, literal)));
public GlobalIndexResult visitEqual(FieldRef fieldRef, Object literal) {
return transform.apply(reader.visitEqual(fieldRef, literal));
}

@Override
public Optional<GlobalIndexResult> visitGreaterThan(FieldRef fieldRef, Object literal) {
return Optional.ofNullable(transform.apply(reader.visitGreaterThan(fieldRef, literal)));
public GlobalIndexResult visitGreaterThan(FieldRef fieldRef, Object literal) {
return transform.apply(reader.visitGreaterThan(fieldRef, literal));
}

@Override
public Optional<GlobalIndexResult> visitIn(FieldRef fieldRef, List<Object> literals) {
return Optional.ofNullable(transform.apply(reader.visitIn(fieldRef, literals)));
public GlobalIndexResult visitIn(FieldRef fieldRef, List<Object> literals) {
return transform.apply(reader.visitIn(fieldRef, literals));
}

@Override
public Optional<GlobalIndexResult> visitNotIn(FieldRef fieldRef, List<Object> literals) {
return Optional.ofNullable(transform.apply(reader.visitNotIn(fieldRef, literals)));
public GlobalIndexResult visitNotIn(FieldRef fieldRef, List<Object> literals) {
return transform.apply(reader.visitNotIn(fieldRef, literals));
}

@Override
Expand Down
13 changes: 12 additions & 1 deletion paimon-common/src/main/java/org/apache/paimon/utils/Range.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

package org.apache.paimon.utils;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Range represents from (inclusive) and to (inclusive). */
public class Range {
public class Range implements Serializable {

public final long from;
public final long to;
Expand Down Expand Up @@ -49,6 +52,14 @@ public boolean isAfter(Range other) {
return from > other.to;
}

public List<Long> toListLong() {
List<Long> longs = new ArrayList<>();
for (long i = from; i <= to; i++) {
longs.add(i);
}
return longs;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,26 +93,23 @@ private void testStringType(int version) throws Exception {
writer.write(o);
}
});
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, a).get())
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, a))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(0, 4));
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, b).get())
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, b))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(2));
assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef).get())
assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(1, 3));
assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, Arrays.asList(a, b)).get())
assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, Arrays.asList(a, b)))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(0, 2, 4));
assert !reader.visitEqual(fieldRef, BinaryString.fromString("c"))
.get()
.iterator()
.hasNext();
assert !reader.visitEqual(fieldRef, BinaryString.fromString("c")).iterator().hasNext();
}

private void testIntType(int version) throws Exception {
Expand All @@ -128,24 +125,24 @@ private void testIntType(int version) throws Exception {
writer.write(o);
}
});
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 0).get())
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 0))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(0));
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 1).get())
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 1))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(1));
assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef).get())
assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(2));
assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, Arrays.asList(0, 1, 2)).get())
assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, Arrays.asList(0, 1, 2)))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(0, 1));

assert !reader.visitEqual(fieldRef, 2).get().iterator().hasNext();
assert !reader.visitEqual(fieldRef, 2).iterator().hasNext();
}

private void testBooleanType(int version) throws Exception {
Expand All @@ -161,11 +158,11 @@ private void testBooleanType(int version) throws Exception {
writer.write(o);
}
});
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, Boolean.TRUE).get())
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, Boolean.TRUE))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(0, 2));
assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef).get())
assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(4));
Expand Down Expand Up @@ -202,13 +199,12 @@ private void testHighCardinality(
long time2 = System.currentTimeMillis();
GlobalIndexResult result =
reader.visitEqual(
fieldRef, BinaryString.fromString(prefix + (approxCardinality / 2)))
.get();
fieldRef, BinaryString.fromString(prefix + (approxCardinality / 2)));
RoaringBitmap32 resultBm = ((BitmapIndexResultWrapper) result).getBitmapIndexResult().get();
System.out.println("read time: " + (System.currentTimeMillis() - time2));
assert resultBm.equals(middleBm);
long time3 = System.currentTimeMillis();
GlobalIndexResult resultNull = reader.visitIsNull(fieldRef).get();
GlobalIndexResult resultNull = reader.visitIsNull(fieldRef);
RoaringBitmap32 resultNullBm =
((BitmapIndexResultWrapper) resultNull).getBitmapIndexResult().get();
System.out.println("read null bitmap time: " + (System.currentTimeMillis() - time3));
Expand Down Expand Up @@ -277,26 +273,23 @@ private void testStringTypeWithReusing(int version) throws Exception {
a.pointTo(c.getSegments(), c.getOffset(), c.getSizeInBytes());
writer.write(null);
});
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, a).get())
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, a))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(0));
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, b).get())
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, b))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(3));
assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef).get())
assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(1, 2, 4, 5));
assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, Arrays.asList(a, b)).get())
assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, Arrays.asList(a, b)))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(0, 3));
assert !reader.visitEqual(fieldRef, BinaryString.fromString("c"))
.get()
.iterator()
.hasNext();
assert !reader.visitEqual(fieldRef, BinaryString.fromString("c")).iterator().hasNext();
}

private void testAllNull(int version) throws Exception {
Expand All @@ -312,10 +305,10 @@ private void testAllNull(int version) throws Exception {
writer.write(o);
}
});
assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef).get())
assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(0, 1, 2));
assert !reader.visitIsNotNull(fieldRef).get().iterator().hasNext();
assert !reader.visitIsNotNull(fieldRef).iterator().hasNext();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,18 @@ public Optional<GlobalIndexResult> visit(LeafPredicate predicate) {
Collection<GlobalIndexReader> readers =
indexReadersCache.computeIfAbsent(fieldId, readersFunction::apply);
for (GlobalIndexReader fileIndexReader : readers) {
Optional<GlobalIndexResult> childResult =
GlobalIndexResult childResult =
predicate.function().visit(fileIndexReader, fieldRef, predicate.literals());

// AND Operation
if (childResult.isPresent()) {
if (compoundResult.isPresent()) {
GlobalIndexResult r1 = compoundResult.get();
GlobalIndexResult r2 = childResult.get();
compoundResult = Optional.of(r1.and(r2));
} else {
compoundResult = childResult;
}
if (compoundResult.isPresent()) {
GlobalIndexResult r1 = compoundResult.get();
compoundResult = Optional.of(r1.and(childResult));
} else {
compoundResult = Optional.of(childResult);
}

if (compoundResult.isPresent() && !compoundResult.get().iterator().hasNext()) {
if (!compoundResult.get().iterator().hasNext()) {
return compoundResult;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public Path filePath(String fileName) {
return indexPathFactory.toPath(fileName);
}

public long fileSize(String fileName) throws IOException {
return fileIO.getFileSize(filePath(fileName));
}

public OutputStream newOutputStream(String fileName) throws IOException {
return fileIO.newOutputStream(indexPathFactory.toPath(fileName), true);
}
Expand Down
Loading
Loading