diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/collections/CollectionReadOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/collections/CollectionReadOperation.java index e26a177757..d36cd781a9 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/collections/CollectionReadOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/collections/CollectionReadOperation.java @@ -1,5 +1,6 @@ package io.stargate.sgv2.jsonapi.service.operation.collections; +import com.datastax.oss.driver.api.core.AsyncPagingIterable; import com.datastax.oss.driver.api.core.cql.AsyncResultSet; import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.cql.SimpleStatement; @@ -389,21 +390,37 @@ default Uni countDocumentsByKey( QueryExecutor queryExecutor, SimpleStatement simpleStatement) { AtomicLong counter = new AtomicLong(); - final CompletionStage async = - queryExecutor - .executeCount(dataApiRequestInfo, simpleStatement) - .whenComplete( - (rs, error) -> { - getCount(rs, error, counter); - }); - return Uni.createFrom() - .completionStage(async) + return Multi.createBy() + .repeating() + .uni( + () -> new AtomicReference(null), // the state passed to the producer + stateRef -> { + Uni result = + stateRef.get() == null + ? Uni.createFrom() + .completionStage( + queryExecutor.executeCount( + dataApiRequestInfo, + simpleStatement)) // AsyncResultSet is null so first page + : Uni.createFrom() + .completionStage(stateRef.get().fetchNextPage()); // After first page + // returning result for looping, this is remembering the rows as we page through them + return result + .onItem() + .invoke( + rs -> { + counter.addAndGet(rs.remaining()); + stateRef.set(rs); + }); + }) + // Documents read until pageState available, max records read is deleteLimit + 1 TODO + // COMMENTS + .whilst(AsyncPagingIterable::hasMorePages) + .collect() + .asList() .onItem() - .transform( - rs -> { - return new CountResponse(counter.get()); - }); + .transformToUni(resultSets -> Uni.createFrom().item(new CountResponse(counter.get()))); } /**