Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,12 @@
<td>Integer</td>
<td>For streaming write, full compaction will be constantly triggered after delta commits. For batch write, full compaction will be triggered with each commit as long as this value is greater than 0.</td>
</tr>
<tr>
<td><h5>global-index.enabled-in-core </h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>If the compute system does not handle global index, we can enable it in scanner, but this is a single node index scanner with low speed.</td>
</tr>
<tr>
<td><h5>global-index.row-count-per-shard</h5></td>
<td style="word-wrap: break-word;">100000</td>
Expand Down
11 changes: 11 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2091,6 +2091,13 @@ public InlineElement getDescription() {
.defaultValue(100000L)
.withDescription("Row count per shard for global index.");

public static final ConfigOption<Boolean> GLOBAL_INDEX_ENABLED_IN_CORE =
key("global-index.enabled-in-core")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

global-index.enabled

.booleanType()
.defaultValue(true)
.withDescription(
"If the compute system does not handle global index, we can enable it in scanner, but this is a single node index scanner with low speed.");

private final Options options;

public CoreOptions(Map<String, String> options) {
Expand Down Expand Up @@ -3235,6 +3242,10 @@ public long globalIndexRowCountPerShard() {
return options.get(GLOBAL_INDEX_ROW_COUNT_PER_SHARD);
}

public boolean globalIndexEnabledInScan() {
return options.get(GLOBAL_INDEX_ENABLED_IN_CORE);
}

/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
Expand Down
134 changes: 134 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/utils/Range.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;

Expand Down Expand Up @@ -60,6 +62,138 @@ public List<Long> toListLong() {
return longs;
}

/**
* Excludes the given ranges from this range and returns the remaining ranges.
*
* <p>For example, if this range is [0, 10000] and ranges to exclude are [1000, 2000], [3000,
* 4000], [5000, 6000], then the result is [0, 999], [2001, 2999], [4001, 4999], [6001, 10000].
*
* @param ranges the ranges to exclude (can be unsorted and overlapping)
* @return the remaining ranges after exclusion
*/
public List<Range> exclude(List<Range> ranges) {
if (ranges.isEmpty()) {
return Collections.singletonList(this);
}

// Sort ranges by from
List<Range> sorted = new ArrayList<>(ranges);
sorted.sort(Comparator.comparingLong(a -> a.from));

List<Range> result = new ArrayList<>();
long current = this.from;

for (Range exclude : sorted) {
// Compute intersection with the current range
Range intersect = Range.intersection(new Range(current, this.to), exclude);
if (intersect == null) {
continue;
}
// Add the part before the intersection (if any)
if (current < intersect.from) {
result.add(new Range(current, intersect.from - 1));
}
// Move current position past the intersection
current = intersect.to + 1;
if (current > this.to) {
break;
}
}

// Add the remaining part after all exclusions (if any)
if (current <= this.to) {
result.add(new Range(current, this.to));
}

return result;
}

/**
* Sorts and merges overlapping ranges.
*
* <p>For example, Range[0,10] and Range[5,15] will be merged to Range[0,15] because they
* overlap. However, Range[0,10] and Range[11,20] will NOT be merged even though they are
* adjacent, because they don't overlap.
*
* @param ranges the ranges to sort and merge (can be unsorted and overlapping)
* @return the sorted and merged ranges with no overlaps
*/
public static List<Range> sortAndMergeOverlap(List<Range> ranges) {
if (ranges == null || ranges.isEmpty()) {
return Collections.emptyList();
}

if (ranges.size() == 1) {
return new ArrayList<>(ranges);
}

// Sort ranges by from
List<Range> sorted = new ArrayList<>(ranges);
sorted.sort(Comparator.comparingLong(r -> r.from));

List<Range> result = new ArrayList<>();
Range current = sorted.get(0);

for (int i = 1; i < sorted.size(); i++) {
Range next = sorted.get(i);
// Check if current and next overlap (not just adjacent)
if (current.to >= next.from) {
// Merge: extend current range
current = new Range(current.from, Math.max(current.to, next.to));
} else {
// No overlap: add current to result and move to next
result.add(current);
current = next;
}
}
// Add the last range
result.add(current);

return result;
}

/**
* Computes the intersection of two lists of ranges.
*
* <p>Assumes that both left and right are sorted and merged already (no overlaps within each
* list).
*
* <p>For example, left=[0,10],[20,30] and right=[5,15],[25,35] will return [5,10],[25,30].
*
* @param left the first list of ranges (must be sorted and merged)
* @param right the second list of ranges (must be sorted and merged)
* @return the intersection of the two lists
*/
public static List<Range> and(List<Range> left, List<Range> right) {
if (left == null || right == null || left.isEmpty() || right.isEmpty()) {
return Collections.emptyList();
}

List<Range> result = new ArrayList<>();
int i = 0;
int j = 0;

while (i < left.size() && j < right.size()) {
Range l = left.get(i);
Range r = right.get(j);

// Compute intersection of current ranges
Range intersect = Range.intersection(l, r);
if (intersect != null) {
result.add(intersect);
}

// Advance the pointer of the range that ends earlier
if (l.to <= r.to) {
i++;
} else {
j++;
}
}

return result;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public RoaringNavigableMap64() {
this.roaring64NavigableMap = new Roaring64NavigableMap();
}

public RoaringNavigableMap64(Range range) {
this.roaring64NavigableMap = new Roaring64NavigableMap();
this.roaring64NavigableMap.addRange(range.from, range.to);
}

private RoaringNavigableMap64(Roaring64NavigableMap bitmap) {
this.roaring64NavigableMap = bitmap;
}
Expand Down
Loading