-
Notifications
You must be signed in to change notification settings - Fork 985
Add CompletableFuture-based executeAsync overloads to fluent Async for simpler async composition. #782
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Add CompletableFuture-based executeAsync overloads to fluent Async for simpler async composition. #782
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,28 +26,124 @@ | |
| */ | ||
| package org.apache.hc.client5.http.fluent; | ||
|
|
||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.LinkedBlockingQueue; | ||
| import java.util.concurrent.RejectedExecutionException; | ||
| import java.util.concurrent.ThreadPoolExecutor; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
|
|
||
| import org.apache.hc.core5.annotation.Contract; | ||
| import org.apache.hc.core5.annotation.ThreadingBehavior; | ||
| import org.apache.hc.core5.concurrent.BasicFuture; | ||
| import org.apache.hc.core5.concurrent.DefaultThreadFactory; | ||
| import org.apache.hc.core5.concurrent.FutureCallback; | ||
| import org.apache.hc.core5.http.io.HttpClientResponseHandler; | ||
| import org.apache.hc.core5.util.Args; | ||
|
|
||
| /** | ||
| * Asynchronous executor for {@link Request}s. | ||
| * | ||
| * @since 4.3 | ||
| */ | ||
| @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL) | ||
| public class Async { | ||
|
|
||
| private static final int DEFAULT_MAX_THREADS = | ||
| Math.max(2, Math.min(32, Runtime.getRuntime().availableProcessors() * 2)); | ||
|
|
||
| private static final int DEFAULT_QUEUE_CAPACITY = 1000; | ||
|
|
||
| private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger(0); | ||
|
|
||
| private Executor executor; | ||
| private java.util.concurrent.Executor concurrentExec; | ||
| private volatile java.util.concurrent.Executor concurrentExec; | ||
| private volatile ExecutorService ownedConcurrentExec; | ||
|
|
||
| private int maxThreads = DEFAULT_MAX_THREADS; | ||
| private int queueCapacity = DEFAULT_QUEUE_CAPACITY; | ||
|
|
||
| public static Async newInstance() { | ||
| return new Async(); | ||
| } | ||
|
|
||
| Async() { | ||
| super(); | ||
| // Keep legacy behavior by default. | ||
| } | ||
|
|
||
| public Async maxThreads(final int maxThreads) { | ||
| Args.positive(maxThreads, "maxThreads"); | ||
| this.maxThreads = maxThreads; | ||
| rebuildOwnedExecutorIfActive(); | ||
| return this; | ||
| } | ||
|
|
||
| public Async queueCapacity(final int queueCapacity) { | ||
| Args.positive(queueCapacity, "queueCapacity"); | ||
| this.queueCapacity = queueCapacity; | ||
| rebuildOwnedExecutorIfActive(); | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Enables an owned bounded default executor for asynchronous request execution using the | ||
| * current {@code maxThreads} and {@code queueCapacity} settings. | ||
| * | ||
| * @return this instance. | ||
| * @since 5.7 | ||
| */ | ||
| public Async useDefaultExecutor() { | ||
| return useDefaultExecutor(this.maxThreads, this.queueCapacity); | ||
| } | ||
|
|
||
| /** | ||
| * Enables an owned bounded default executor for asynchronous request execution. | ||
| * | ||
| * @param maxThreads maximum number of threads. | ||
| * @param queueCapacity maximum number of queued tasks. | ||
| * @return this instance. | ||
| * @since 5.7 | ||
| */ | ||
| public Async useDefaultExecutor(final int maxThreads, final int queueCapacity) { | ||
| Args.positive(maxThreads, "maxThreads"); | ||
| Args.positive(queueCapacity, "queueCapacity"); | ||
| this.maxThreads = maxThreads; | ||
| this.queueCapacity = queueCapacity; | ||
|
|
||
| shutdown(); | ||
| this.ownedConcurrentExec = createDefaultExecutor(this.maxThreads, this.queueCapacity); | ||
| this.concurrentExec = this.ownedConcurrentExec; | ||
| return this; | ||
| } | ||
|
|
||
| private void rebuildOwnedExecutorIfActive() { | ||
| if (this.ownedConcurrentExec != null) { | ||
| shutdown(); | ||
| this.ownedConcurrentExec = createDefaultExecutor(this.maxThreads, this.queueCapacity); | ||
| this.concurrentExec = this.ownedConcurrentExec; | ||
| } | ||
| } | ||
|
|
||
| private static ExecutorService createDefaultExecutor(final int maxThreads, final int queueCapacity) { | ||
| final int instanceId = INSTANCE_COUNT.incrementAndGet(); | ||
| final DefaultThreadFactory threadFactory = new DefaultThreadFactory( | ||
| "httpclient5-fluent-async-" + instanceId + "-", | ||
| true); | ||
|
|
||
| final ThreadPoolExecutor exec = new ThreadPoolExecutor( | ||
| maxThreads, | ||
| maxThreads, | ||
| 60L, | ||
| TimeUnit.SECONDS, | ||
| new LinkedBlockingQueue<>(queueCapacity), | ||
| threadFactory, | ||
| new ThreadPoolExecutor.CallerRunsPolicy()); | ||
|
|
||
| exec.allowCoreThreadTimeOut(true); | ||
| return exec; | ||
| } | ||
|
|
||
| public Async use(final Executor executor) { | ||
|
|
@@ -57,9 +153,25 @@ public Async use(final Executor executor) { | |
|
|
||
| public Async use(final java.util.concurrent.Executor concurrentExec) { | ||
| this.concurrentExec = concurrentExec; | ||
| shutdown(); | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Shuts down resources owned by this instance, if any. | ||
| * <p> | ||
| * This method never attempts to shut down executors supplied via {@link #use(java.util.concurrent.Executor)}. | ||
| * | ||
| * @since 5.7 | ||
| */ | ||
| public void shutdown() { | ||
| final ExecutorService exec = this.ownedConcurrentExec; | ||
| if (exec != null) { | ||
| this.ownedConcurrentExec = null; | ||
| exec.shutdown(); | ||
| } | ||
| } | ||
|
|
||
| static class ExecRunnable<T> implements Runnable { | ||
|
|
||
| private final BasicFuture<T> future; | ||
|
|
@@ -100,8 +212,14 @@ public <T> Future<T> execute( | |
| request, | ||
| this.executor != null ? this.executor : Executor.newInstance(), | ||
| handler); | ||
| if (this.concurrentExec != null) { | ||
| this.concurrentExec.execute(runnable); | ||
|
|
||
| final java.util.concurrent.Executor exec = this.concurrentExec; | ||
| if (exec != null) { | ||
| try { | ||
| exec.execute(runnable); | ||
| } catch (final RejectedExecutionException ex) { | ||
| future.failed(ex); | ||
| } | ||
| } else { | ||
| final Thread t = new Thread(runnable); | ||
| t.setDaemon(true); | ||
|
|
@@ -122,4 +240,108 @@ public Future<Content> execute(final Request request) { | |
| return execute(request, new ContentResponseHandler(), null); | ||
| } | ||
|
|
||
| /** | ||
| * Executes the given request asynchronously and returns a {@link CompletableFuture} that completes | ||
| * when the response has been fully received and converted by the given response handler. | ||
| * | ||
| * @param request the request to execute. | ||
| * @param handler the response handler. | ||
| * @param <T> the handler result type. | ||
| * @return a {@code CompletableFuture} producing the handler result. | ||
| * @since 5.7 | ||
| */ | ||
| public <T> CompletableFuture<T> executeAsync(final Request request, final HttpClientResponseHandler<T> handler) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason why we can't push this down into
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wait, I'm confused. This class provides an async API for synchronous request execution?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
@rschmitt This is historic and goes back to the early days of HttpClient 4.x
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Fluent is built on the classic client. The new CompletableFuture methods are just adapters over the existing callback-based API.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, I think it's weird to add |
||
| final CompletableFuture<T> cf = new CompletableFuture<>(); | ||
| execute(request, handler, new FutureCallback<T>() { | ||
|
|
||
| @Override | ||
| public void completed(final T result) { | ||
| cf.complete(result); | ||
| } | ||
|
|
||
| @Override | ||
| public void failed(final Exception ex) { | ||
| cf.completeExceptionally(ex); | ||
| } | ||
|
|
||
| @Override | ||
| public void cancelled() { | ||
| cf.cancel(false); | ||
| } | ||
|
|
||
| }); | ||
| return cf; | ||
| } | ||
|
|
||
| /** | ||
| * Executes the given request asynchronously and returns a {@link CompletableFuture} that completes | ||
| * when the response has been fully received and converted by the given response handler. The given | ||
| * callback is invoked on completion, failure, or cancellation. | ||
| * | ||
| * @param request the request to execute. | ||
| * @param handler the response handler. | ||
| * @param callback the callback to invoke on completion, failure, or cancellation; may be {@code null}. | ||
| * @param <T> the handler result type. | ||
| * @return a {@code CompletableFuture} producing the handler result. | ||
| * @since 5.7 | ||
| */ | ||
| public <T> CompletableFuture<T> executeAsync( | ||
| final Request request, final HttpClientResponseHandler<T> handler, final FutureCallback<T> callback) { | ||
| final CompletableFuture<T> cf = new CompletableFuture<>(); | ||
| execute(request, handler, new FutureCallback<T>() { | ||
|
|
||
| @Override | ||
| public void completed(final T result) { | ||
| if (callback != null) { | ||
| callback.completed(result); | ||
| } | ||
| cf.complete(result); | ||
| } | ||
|
|
||
| @Override | ||
| public void failed(final Exception ex) { | ||
| if (callback != null) { | ||
| callback.failed(ex); | ||
| } | ||
| cf.completeExceptionally(ex); | ||
| } | ||
|
|
||
| @Override | ||
| public void cancelled() { | ||
| if (callback != null) { | ||
| callback.cancelled(); | ||
| } | ||
| cf.cancel(false); | ||
| } | ||
|
|
||
| }); | ||
| return cf; | ||
| } | ||
|
|
||
| /** | ||
| * Executes the given request asynchronously and returns a {@link CompletableFuture} that completes | ||
| * when the response has been fully received and converted to {@link Content}. | ||
| * | ||
| * @param request the request to execute. | ||
| * @return a {@code CompletableFuture} producing the response {@code Content}. | ||
| * @since 5.7 | ||
| */ | ||
| public CompletableFuture<Content> executeAsync(final Request request) { | ||
| return executeAsync(request, new ContentResponseHandler()); | ||
| } | ||
|
|
||
| /** | ||
| * Executes the given request asynchronously and returns a {@link CompletableFuture} that completes | ||
| * when the response has been fully received and converted to {@link Content}. The given callback | ||
| * is invoked on completion, failure, or cancellation. | ||
| * | ||
| * @param request the request to execute. | ||
| * @param callback the callback to invoke on completion, failure, or cancellation; may be {@code null}. | ||
| * @return a {@code CompletableFuture} producing the response {@code Content}. | ||
| * @since 5.7 | ||
| */ | ||
| public CompletableFuture<Content> executeAsync(final Request request, final FutureCallback<Content> callback) { | ||
| return executeAsync(request, new ContentResponseHandler(), callback); | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
| /* | ||
| * ==================================================================== | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| * ==================================================================== | ||
| * | ||
| * This software consists of voluntary contributions made by many | ||
| * individuals on behalf of the Apache Software Foundation. For more | ||
| * information on the Apache Software Foundation, please see | ||
| * <http://www.apache.org/>. | ||
| * | ||
| */ | ||
| package org.apache.hc.client5.http.examples.fluent; | ||
|
|
||
| import java.util.Arrays; | ||
| import java.util.List; | ||
| import java.util.concurrent.CompletableFuture; | ||
|
|
||
| import org.apache.hc.client5.http.fluent.Async; | ||
| import org.apache.hc.client5.http.fluent.Request; | ||
|
|
||
| /** | ||
| * This example demonstrates how the HttpClient fluent API can be used to execute multiple | ||
| * requests asynchronously using CompletableFuture. | ||
| */ | ||
| public class FluentAsyncCompletableFuture { | ||
|
|
||
| public static void main(final String... args) throws Exception { | ||
|
|
||
| final List<Request> requests = Arrays.asList( | ||
| Request.get("http://www.google.com/"), | ||
| Request.get("http://www.yahoo.com/"), | ||
| Request.get("http://www.apache.org/"), | ||
| Request.get("http://www.apple.com/") | ||
| ); | ||
|
|
||
| final Async async = Async.newInstance().useDefaultExecutor(8, 500); | ||
| try { | ||
|
|
||
| final CompletableFuture<?>[] futures = requests.stream() | ||
| .map(r -> async.executeAsync(r) | ||
| .thenAccept(content -> System.out.println("Request completed: " + r)) | ||
| .exceptionally(ex -> { | ||
| System.out.println(ex.getMessage() + ": " + r); | ||
| return null; | ||
| })) | ||
| .toArray(CompletableFuture[]::new); | ||
|
|
||
| CompletableFuture.allOf(futures).join(); | ||
| } finally { | ||
| async.shutdown(); | ||
| } | ||
|
|
||
| System.out.println("Done"); | ||
| } | ||
|
|
||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class needs a thread-safety policy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just add Contract