Skip to content

Commit de6f206

Browse files
committed
Cap total number of concurrent requests per HTTP/2 connection
1 parent 64b89f2 commit de6f206

5 files changed

Lines changed: 611 additions & 31 deletions

File tree

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java

Lines changed: 85 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@
3232
import java.nio.ByteBuffer;
3333
import java.util.List;
3434
import java.util.Set;
35+
import java.util.concurrent.ConcurrentHashMap;
36+
import java.util.concurrent.ConcurrentMap;
3537
import java.util.concurrent.Future;
38+
import java.util.concurrent.RejectedExecutionException;
39+
import java.util.concurrent.atomic.AtomicInteger;
3640

3741
import org.apache.hc.core5.annotation.Internal;
3842
import org.apache.hc.core5.concurrent.Cancellable;
@@ -43,7 +47,6 @@
4347
import org.apache.hc.core5.function.Callback;
4448
import org.apache.hc.core5.function.Decorator;
4549
import org.apache.hc.core5.function.Resolver;
46-
import org.apache.hc.core5.http.ConnectionClosedException;
4750
import org.apache.hc.core5.http.EntityDetails;
4851
import org.apache.hc.core5.http.Header;
4952
import org.apache.hc.core5.http.HttpException;
@@ -86,6 +89,8 @@
8689
public class H2MultiplexingRequester extends AsyncRequester {
8790

8891
private final H2ConnPool connPool;
92+
private final ConcurrentMap<IOSession, AtomicInteger> inFlightPerSession;
93+
private final int maxRequestsPerConnection;
8994

9095
/**
9196
* Use {@link H2MultiplexingRequesterBootstrap} to create instances of this class.
@@ -100,11 +105,44 @@ public H2MultiplexingRequester(
100105
final Resolver<HttpHost, InetSocketAddress> addressResolver,
101106
final TlsStrategy tlsStrategy,
102107
final IOReactorMetricsListener threadPoolListener,
103-
final IOWorkerSelector workerSelector) {
108+
final IOWorkerSelector workerSelector,
109+
final int maxRequestsPerConnection) {
104110
super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
105111
ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE,
106112
threadPoolListener, workerSelector);
107113
this.connPool = new H2ConnPool(this, addressResolver, tlsStrategy);
114+
this.inFlightPerSession = new ConcurrentHashMap<>();
115+
this.maxRequestsPerConnection = maxRequestsPerConnection;
116+
}
117+
118+
private boolean tryAcquireSlot(final IOSession ioSession, final int max) {
119+
if (max <= 0) {
120+
return true;
121+
}
122+
final AtomicInteger counter = inFlightPerSession.computeIfAbsent(ioSession, s -> new AtomicInteger(0));
123+
for (;;) {
124+
final int q = counter.get();
125+
if (q >= max) {
126+
return false;
127+
}
128+
if (counter.compareAndSet(q, q + 1)) {
129+
return true;
130+
}
131+
}
132+
}
133+
134+
private void releaseSlot(final IOSession ioSession, final int max) {
135+
if (max <= 0) {
136+
return;
137+
}
138+
final AtomicInteger counter = inFlightPerSession.get(ioSession);
139+
if (counter == null) {
140+
return;
141+
}
142+
final int q = counter.decrementAndGet();
143+
if (q <= 0) {
144+
inFlightPerSession.remove(ioSession, counter);
145+
}
108146
}
109147

110148
public void closeIdle(final TimeValue idleTime) {
@@ -182,15 +220,29 @@ private void execute(
182220
if (request.getAuthority() == null) {
183221
request.setAuthority(new URIAuthority(host));
184222
}
223+
if (request.getScheme() == null) {
224+
request.setScheme(host.getSchemeName());
225+
}
185226
connPool.getSession(host, timeout, new FutureCallback<IOSession>() {
186227

187228
@Override
188229
public void completed(final IOSession ioSession) {
230+
if (!tryAcquireSlot(ioSession, maxRequestsPerConnection)) {
231+
exchangeHandler.failed(new RejectedExecutionException(
232+
"Maximum number of concurrent requests per connection reached (max=" + maxRequestsPerConnection + ")"));
233+
exchangeHandler.releaseResources();
234+
return;
235+
}
236+
237+
final AsyncClientExchangeHandler actual = maxRequestsPerConnection > 0
238+
? new ReleasingAsyncClientExchangeHandler(exchangeHandler, () -> releaseSlot(ioSession, maxRequestsPerConnection))
239+
: exchangeHandler;
240+
189241
final AsyncClientExchangeHandler handlerProxy = new AsyncClientExchangeHandler() {
190242

191243
@Override
192244
public void releaseResources() {
193-
exchangeHandler.releaseResources();
245+
actual.releaseResources();
194246
}
195247

196248
@Override
@@ -199,67 +251,70 @@ public void produceRequest(final RequestChannel channel, final HttpContext httpC
199251
}
200252

201253
@Override
202-
public int available() {
203-
return exchangeHandler.available();
254+
public void consumeResponse(
255+
final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
256+
actual.consumeResponse(response, entityDetails, httpContext);
204257
}
205258

206259
@Override
207-
public void produce(final DataStreamChannel channel) throws IOException {
208-
exchangeHandler.produce(channel);
260+
public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
261+
actual.consumeInformation(response, httpContext);
209262
}
210263

211264
@Override
212-
public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
213-
exchangeHandler.consumeInformation(response, httpContext);
265+
public int available() {
266+
return actual.available();
214267
}
215268

216269
@Override
217-
public void consumeResponse(
218-
final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
219-
exchangeHandler.consumeResponse(response, entityDetails, httpContext);
270+
public void produce(final DataStreamChannel channel) throws IOException {
271+
actual.produce(channel);
220272
}
221273

222274
@Override
223275
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
224-
exchangeHandler.updateCapacity(capacityChannel);
276+
actual.updateCapacity(capacityChannel);
225277
}
226278

227279
@Override
228280
public void consume(final ByteBuffer src) throws IOException {
229-
exchangeHandler.consume(src);
281+
actual.consume(src);
230282
}
231283

232284
@Override
233285
public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
234-
exchangeHandler.streamEnd(trailers);
286+
actual.streamEnd(trailers);
235287
}
236288

237289
@Override
238290
public void cancel() {
239-
exchangeHandler.cancel();
291+
actual.cancel();
240292
}
241293

242294
@Override
243295
public void failed(final Exception cause) {
244-
exchangeHandler.failed(cause);
296+
actual.failed(cause);
245297
}
246298

247299
};
248300
final Timeout socketTimeout = ioSession.getSocketTimeout();
249-
ioSession.enqueue(new RequestExecutionCommand(
250-
handlerProxy,
251-
pushHandlerFactory,
252-
context,
253-
streamControl -> {
254-
cancellableDependency.setDependency(streamControl);
255-
if (socketTimeout != null) {
256-
streamControl.setTimeout(socketTimeout);
257-
}
258-
}),
259-
Command.Priority.NORMAL);
260-
if (!ioSession.isOpen()) {
261-
exchangeHandler.failed(new ConnectionClosedException());
301+
try {
302+
ioSession.enqueue(new RequestExecutionCommand(
303+
handlerProxy,
304+
pushHandlerFactory,
305+
context,
306+
streamControl -> {
307+
cancellableDependency.setDependency(streamControl);
308+
if (socketTimeout != null) {
309+
streamControl.setTimeout(socketTimeout);
310+
}
311+
}),
312+
Command.Priority.NORMAL);
313+
} catch (final RuntimeException ex) {
314+
actual.failed(ex);
315+
actual.releaseResources();
262316
}
317+
263318
}
264319

265320
@Override

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.ArrayList;
3030
import java.util.List;
3131

32+
import org.apache.hc.core5.annotation.Experimental;
3233
import org.apache.hc.core5.function.Callback;
3334
import org.apache.hc.core5.function.Decorator;
3435
import org.apache.hc.core5.function.Supplier;
@@ -76,6 +77,8 @@ public class H2MultiplexingRequesterBootstrap {
7677

7778
private IOReactorMetricsListener threadPoolListener;
7879

80+
private int maxRequestsPerConnection;
81+
7982
private H2MultiplexingRequesterBootstrap() {
8083
this.routeEntries = new ArrayList<>();
8184
}
@@ -180,6 +183,21 @@ public final H2MultiplexingRequesterBootstrap setIOReactorMetricsListener(final
180183
return this;
181184
}
182185

186+
/**
187+
* Sets a hard cap on the number of requests allowed to be queued / in-flight per connection.
188+
* When the limit is reached, new submissions fail fast with {@link java.util.concurrent.RejectedExecutionException}.
189+
* A value {@code <= 0} means unlimited (default).
190+
*
191+
* @param max maximum number of requests per connection; {@code <= 0} to disable the cap
192+
* @return this instance.
193+
* @since 5.5
194+
*/
195+
@Experimental
196+
public final H2MultiplexingRequesterBootstrap setMaxRequestsPerConnection(final int max) {
197+
this.maxRequestsPerConnection = max;
198+
return this;
199+
}
200+
183201
/**
184202
* Sets {@link H2StreamListener} instance.
185203
*
@@ -274,7 +292,8 @@ public H2MultiplexingRequester create() {
274292
DefaultAddressResolver.INSTANCE,
275293
tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy(),
276294
threadPoolListener,
277-
null);
295+
null,
296+
maxRequestsPerConnection);
278297
}
279298

280299
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.hc.core5.http2.impl.nio.bootstrap;
28+
29+
import java.io.IOException;
30+
import java.nio.ByteBuffer;
31+
import java.util.List;
32+
import java.util.concurrent.atomic.AtomicBoolean;
33+
34+
import org.apache.hc.core5.http.EntityDetails;
35+
import org.apache.hc.core5.http.Header;
36+
import org.apache.hc.core5.http.HttpException;
37+
import org.apache.hc.core5.http.HttpResponse;
38+
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
39+
import org.apache.hc.core5.http.nio.CapacityChannel;
40+
import org.apache.hc.core5.http.nio.DataStreamChannel;
41+
import org.apache.hc.core5.http.nio.RequestChannel;
42+
import org.apache.hc.core5.http.protocol.HttpContext;
43+
44+
final class ReleasingAsyncClientExchangeHandler implements AsyncClientExchangeHandler {
45+
46+
private final AsyncClientExchangeHandler exchangeHandler;
47+
private final Runnable onRelease;
48+
private final AtomicBoolean released;
49+
50+
ReleasingAsyncClientExchangeHandler(final AsyncClientExchangeHandler exchangeHandler, final Runnable onRelease) {
51+
this.exchangeHandler = exchangeHandler;
52+
this.onRelease = onRelease;
53+
this.released = new AtomicBoolean(false);
54+
}
55+
56+
@Override
57+
public void produceRequest(final RequestChannel channel, final HttpContext context) throws HttpException, IOException {
58+
exchangeHandler.produceRequest(channel, context);
59+
}
60+
61+
@Override
62+
public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext context)
63+
throws HttpException, IOException {
64+
exchangeHandler.consumeResponse(response, entityDetails, context);
65+
}
66+
67+
@Override
68+
public void consumeInformation(final HttpResponse response, final HttpContext context) throws HttpException, IOException {
69+
exchangeHandler.consumeInformation(response, context);
70+
}
71+
72+
@Override
73+
public int available() {
74+
return exchangeHandler.available();
75+
}
76+
77+
@Override
78+
public void produce(final DataStreamChannel channel) throws IOException {
79+
exchangeHandler.produce(channel);
80+
}
81+
82+
@Override
83+
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
84+
exchangeHandler.updateCapacity(capacityChannel);
85+
}
86+
87+
@Override
88+
public void consume(final ByteBuffer src) throws IOException {
89+
exchangeHandler.consume(src);
90+
}
91+
92+
@Override
93+
public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
94+
exchangeHandler.streamEnd(trailers);
95+
}
96+
97+
@Override
98+
public void failed(final Exception cause) {
99+
exchangeHandler.failed(cause);
100+
}
101+
102+
@Override
103+
public void cancel() {
104+
exchangeHandler.cancel();
105+
}
106+
107+
@Override
108+
public void releaseResources() {
109+
try {
110+
exchangeHandler.releaseResources();
111+
} finally {
112+
if (released.compareAndSet(false, true)) {
113+
onRelease.run();
114+
}
115+
}
116+
}
117+
118+
}

0 commit comments

Comments
 (0)