Skip to content

Commit e38acfc

Browse files
committed
fix: eagerly remove torn down operations from Batcher
1 parent e108c9e commit e38acfc

File tree

2 files changed

+45
-48
lines changed

2 files changed

+45
-48
lines changed

packages/core/src/GqlClient.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ export class GqlClient {
7474
requestPolicy: context.cache ? 'cache-and-network' : 'network-only',
7575
preferGetMethod: false, // since @urql/[email protected]
7676
exchanges: this.exchanges(),
77+
fetchOptions: this.getFetchOptions(),
7778
});
7879
}
7980

@@ -409,16 +410,21 @@ export class GqlClient {
409410
batchFetchExchange({
410411
batchInterval: 1,
411412
maxBatchSize: 10,
412-
fetchOptions: {
413-
credentials: 'omit',
414-
headers: this.context.headers,
415-
},
413+
url: this.context.environment.backend,
414+
fetchOptions: this.getFetchOptions(),
416415
}),
417416
);
418417

419418
return exchanges;
420419
}
421420

421+
private getFetchOptions(): RequestInit {
422+
return {
423+
credentials: 'omit',
424+
headers: this.context.headers,
425+
};
426+
}
427+
422428
private registerQuery(op: Operation): void {
423429
this.activeQueries.set(op.key, op);
424430
this.logger.debug(

packages/core/src/batching.ts

Lines changed: 35 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -30,52 +30,40 @@ import {
3030
} from 'wonka';
3131

3232
class Batcher {
33-
private queuesByUrl = new Map<string, Operation[]>();
33+
private queue = new Map<number, Operation>();
3434
private flushTimer: ReturnType<typeof setTimeout> | null = null;
35-
private onBatchReady: (url: string, operations: Operation[]) => void =
36-
() => {};
35+
private flushing = false;
36+
private onBatchReady: (operations: Operation[]) => void = () => {};
3737

3838
constructor(
3939
private readonly batchInterval: number,
4040
private readonly maxBatchSize: number,
4141
) {}
4242

4343
push(operation: Operation): void {
44-
const url = operation.context.url;
45-
const queue = this.queuesByUrl.get(url);
44+
this.queue.set(operation.key, operation);
4645

47-
if (queue) {
48-
queue.push(operation);
49-
} else {
50-
this.queuesByUrl.set(url, [operation]);
51-
}
52-
53-
if (this.shouldFlush()) {
46+
if (this.shouldFlush() && !this.flushing) {
5447
this.cancelScheduledFlush();
5548
this.flushBatch();
5649
return;
5750
}
5851

59-
if (this.isFirstOperation()) {
52+
if (!this.flushTimer && !this.flushing) {
6053
this.scheduleFlush();
6154
}
6255
}
6356

64-
onBatch(handler: (url: string, batch: Operation[]) => void): void {
65-
this.onBatchReady = handler;
57+
remove(operation: Operation): void {
58+
this.queue.delete(operation.key);
6659
}
6760

68-
private shouldFlush(): boolean {
69-
for (const queue of this.queuesByUrl.values()) {
70-
if (queue.length >= this.maxBatchSize) {
71-
return true;
72-
}
73-
}
74-
return false;
61+
onBatch(handler: (operations: Operation[]) => void): void {
62+
this.onBatchReady = handler;
7563
}
7664

77-
private isFirstOperation(): boolean {
78-
return !this.flushTimer;
65+
private shouldFlush(): boolean {
66+
return this.queue.size >= this.maxBatchSize;
7967
}
8068

8169
private scheduleFlush(): void {
@@ -93,33 +81,27 @@ class Batcher {
9381
}
9482

9583
private flushBatch(): void {
96-
for (const [url, operations] of this.queuesByUrl.entries()) {
97-
if (operations.length === 0) continue;
84+
if (this.flushing) return;
85+
this.flushing = true;
9886

99-
const batch = operations.slice(0, this.maxBatchSize);
100-
const remaining = operations.slice(this.maxBatchSize);
87+
if (this.queue.size === 0) {
88+
this.flushing = false;
89+
return;
90+
}
91+
92+
const batch = Array.from(this.queue.values()).slice(0, this.maxBatchSize);
10193

102-
if (remaining.length > 0) {
103-
this.queuesByUrl.set(url, remaining);
104-
} else {
105-
this.queuesByUrl.delete(url);
106-
}
94+
this.onBatchReady(batch);
10795

108-
this.onBatchReady(url, batch);
96+
for (const op of batch) {
97+
this.queue.delete(op.key);
10998
}
11099

111-
if (this.hasRemainingOperations()) {
100+
if (this.queue.size > 0) {
112101
setTimeout(() => this.flushBatch(), 0);
113102
} else {
114-
this.flushTimer = null;
115-
}
116-
}
117-
118-
private hasRemainingOperations(): boolean {
119-
for (const queue of this.queuesByUrl.values()) {
120-
if (queue.length > 0) return true;
103+
this.flushing = false;
121104
}
122-
return false;
123105
}
124106
}
125107

@@ -145,6 +127,7 @@ function makeSingleRequestSource(
145127
export type BatchFetchExchangeConfig = {
146128
batchInterval: number;
147129
maxBatchSize: number;
130+
url: string;
148131
fetchOptions?: RequestInit | (() => RequestInit);
149132
};
150133

@@ -157,10 +140,12 @@ export type BatchFetchExchangeConfig = {
157140
* - Single-operation batches use standard GraphQL request format (not array)
158141
* - Mutations and subscriptions are never batched
159142
* - Queries can opt-out of batching by setting `context.batch = false`
143+
* - Torn-down operations are automatically removed from pending batches
160144
*/
161145
export function batchFetchExchange({
162146
batchInterval,
163147
maxBatchSize,
148+
url,
164149
fetchOptions,
165150
}: BatchFetchExchangeConfig): Exchange {
166151
return ({ forward }) => {
@@ -203,6 +188,10 @@ export function batchFetchExchange({
203188
(op: Operation) => op.kind === 'query' && op.context.batch !== false,
204189
),
205190
mergeMap((op: Operation) => {
191+
invariant(
192+
op.context.url === url,
193+
`Operation URL mismatch: expected "${url}", got "${op.context.url}"`,
194+
);
206195
batcher.push(op);
207196
return make<OperationResult>(({ next }) => {
208197
const sinks = resultSinks.get(op.key);
@@ -214,6 +203,8 @@ export function batchFetchExchange({
214203
}
215204

216205
return () => {
206+
batcher.remove(op);
207+
217208
const remaining = resultSinks.get(op.key);
218209
if (!remaining) return;
219210

@@ -228,7 +219,7 @@ export function batchFetchExchange({
228219
}),
229220
);
230221

231-
batcher.onBatch((url, operations) => {
222+
batcher.onBatch((operations) => {
232223
invariant(
233224
isNonEmptyArray(operations),
234225
'Expected non-empty array of operations',

0 commit comments

Comments
 (0)