Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.stargate.sgv2.jsonapi.service.operation.collections;

import com.datastax.oss.driver.api.core.AsyncPagingIterable;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
Expand Down Expand Up @@ -389,21 +390,37 @@ default Uni<CountResponse> countDocumentsByKey(
QueryExecutor queryExecutor,
SimpleStatement simpleStatement) {
AtomicLong counter = new AtomicLong();
final CompletionStage<AsyncResultSet> async =
queryExecutor
.executeCount(dataApiRequestInfo, simpleStatement)
.whenComplete(
(rs, error) -> {
getCount(rs, error, counter);
});

return Uni.createFrom()
.completionStage(async)
return Multi.createBy()
.repeating()
.uni(
() -> new AtomicReference<AsyncResultSet>(null), // the state passed to the producer
stateRef -> {
Uni<AsyncResultSet> result =
stateRef.get() == null
? Uni.createFrom()
.completionStage(
queryExecutor.executeCount(
dataApiRequestInfo,
simpleStatement)) // AsyncResultSet is null so first page
: Uni.createFrom()
.completionStage(stateRef.get().fetchNextPage()); // After first page
// returning result for looping, this is remembering the rows as we page through them
return result
.onItem()
.invoke(
rs -> {
counter.addAndGet(rs.remaining());
stateRef.set(rs);
});
})
// Documents read until pageState available, max records read is deleteLimit + 1 TODO
// COMMENTS
.whilst(AsyncPagingIterable::hasMorePages)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick question: where is the "max count" checked? Looks like this would read through all entries and not stop at specific count? (I am probably missing something)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would be set in the CQL limit, the page size we tell the driver is how to chunk LIMIT number of rows.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks like the code we came up with for in memory sorting for tables

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. Yes, since that's within SimpleStatement being passed.

.collect()
.asList()
.onItem()
.transform(
rs -> {
return new CountResponse(counter.get());
});
.transformToUni(resultSets -> Uni.createFrom().item(new CountResponse(counter.get())));
}

/**
Expand Down