Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
with:
linux_6_0_enabled: false
linux_6_1_enabled: false
linux_6_2_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable"
linux_6_2_enabled: false
linux_6_3_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable"
linux_nightly_next_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable"
linux_nightly_main_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable"
Expand All @@ -31,3 +31,5 @@ jobs:
with:
linux_6_0_enabled: false
linux_6_1_enabled: false
linux_6_2_enabled: false
linux_6_3_enabled: true
11 changes: 5 additions & 6 deletions .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
name: Soundness
uses: swiftlang/github-workflows/.github/workflows/soundness.yml@0.0.7
with:
api_breakage_check_container_image: "swift:6.2-noble"
api_breakage_check_container_image: "swift:6.3-noble"
format_check_container_image: "swiftlang/swift:nightly-main-noble" # Needed due to https://github.com/swiftlang/swift-format/issues/1081
license_header_check_project_name: "Swift HTTP Server"

Expand All @@ -23,9 +23,7 @@ jobs:
linux_5_10_enabled: false
linux_6_0_enabled: false
linux_6_1_enabled: false
# linux_6_1_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error"
linux_6_2_enabled: true
linux_6_2_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error"
linux_6_2_enabled: false
linux_6_3_enabled: true
linux_6_3_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error"
linux_nightly_next_arguments_override: "--explicit-target-dependency-import-check error"
Expand All @@ -38,7 +36,7 @@ jobs:
linux_5_10_enabled: false
linux_6_0_enabled: false
linux_6_1_enabled: false
linux_6_2_enabled: true
linux_6_2_enabled: false
linux_6_3_enabled: true

static-sdk:
Expand All @@ -52,4 +50,5 @@ jobs:
linux_5_10_enabled: false
linux_6_0_enabled: false
linux_6_1_enabled: false
linux_6_2_enabled: true
linux_6_2_enabled: false
linux_6_3_enabled: true
191 changes: 191 additions & 0 deletions Sources/NIOHTTPServer/HTTPKeepAliveHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift HTTP Server open source project
//
// Copyright (c) 2026 Apple Inc. and the Swift HTTP Server project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of Swift HTTP Server project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import HTTPTypes
import NIOCore
import NIOHTTPTypes

/// A NIO channel handler that ensures HTTP/1.1 keep-alive semantics are honored when
/// the server starts writing a response before the request body has been fully read.
///
/// The handler buffers final response parts (head + any body fragments + end) when
/// they are written before the request `.end` has been received. The buffer is
/// released at the next deadline:
///
/// - **`channelReadComplete`**: the end of an inbound read cycle.
/// - **`flush`**: an upstream writer (e.g. `NIOAsyncChannelOutboundWriter`) forced a
/// flush.
///
/// At each deadline, if request `.end` has arrived, the buffer is flushed as-is and
/// the connection is reusable. If request `.end` has *not* arrived, the head is
/// amended with `Connection: close`, the buffer is flushed, and the connection is
/// closed once response `.end` is written. This protects against clients that keep
/// uploading request body bytes after the response has completed (which would
/// otherwise force the server to drain unbounded data) and gives the client an
/// explicit signal not to pipeline another request on the connection.
///
/// Informational (1xx) responses pass through unchanged and do not affect buffering
/// state.
@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
final class HTTPKeepAliveHandler: ChannelDuplexHandler {
Copy link
Copy Markdown
Member

@fabianfett fabianfett May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really think we should unify the number of handlers here significantly. Doing this in a follow up is fine.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, but I don't want to do it at this point - the timeouts are in a separate PR, and having them separate makes reviewing easier IMO. I will consolidate them in a follow up.

typealias InboundIn = HTTPRequestPart
typealias InboundOut = HTTPRequestPart
typealias OutboundIn = HTTPResponsePart
typealias OutboundOut = HTTPResponsePart

private struct BufferedWrite {
var part: HTTPResponsePart
var promise: EventLoopPromise<Void>?
}

private enum FinalResponseState {
/// No final response part has been written yet for the current request.
/// Informational (1xx) responses may have been passed through.
case notStarted
/// The final response head was written before request `.end` arrived. The
/// head — and any additional response parts (body fragments, `.end`) the
/// handler wrote in the same window — are buffered until the next deadline
/// (`channelReadComplete` or `flush`), at which point we decide whether to
/// keep the connection alive or amend the head with `Connection: close`.
case buffering(head: BufferedWrite, additional: [BufferedWrite])
/// The head has been flushed; remaining parts stream directly. If
/// `closeAfterResponseEnd` is true, the head carried `Connection: close`
/// and we close once response `.end` is written.
case streaming
}

/// `true` when the request `.end` has been received on the inbound side, or no
/// request is currently in flight. `false` between receiving a request `.head`
/// and its `.end`.
private var requestEndReceived: Bool = true

/// `true` if we've committed to closing the connection after this response's
/// `.end` is written. Set when the buffer is flushed while request `.end` has
/// not yet arrived (so we add `Connection: close`). Cleared when a new request
/// begins.
private var closeAfterResponseEnd: Bool = false

private var finalResponseState: FinalResponseState = .notStarted

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let part = self.unwrapInboundIn(data)
switch part {
case .head:
// Begin a new request. (Any previous request's response must have
// completed already since HTTPServerPipelineHandler enforces ordering.)
Comment on lines +85 to +86
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this is only true if pipelining is disabled.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, from the HTTPServerPipelineHandler docs:

This handler ensures that HTTP server pipelines only process one request at a time.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That option is enabled so we currently have this handler set up, which means this should be safe. We don't provide a way to change this configuration.

self.requestEndReceived = false
self.closeAfterResponseEnd = false
self.finalResponseState = .notStarted
case .body:
break
case .end:
self.requestEndReceived = true
}
context.fireChannelRead(data)
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this also needs a timeout in the form of channelReadComplete. We should check the state in channelReadComplete and flush the head.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to keep the timeouts in #72 - I haven't updated it since we had our chat though, I'll re-assign to you when I do.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

channelReadComplete isn't a timeout though :) It's another NIO callback.

Copy link
Copy Markdown
Collaborator Author

@gjcairo gjcairo May 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know, but then I don't think I understand your original comment :D

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed this offline: we agreed we would append connection:close to the response head if, by the time a read cycle ends (i.e. channelReadComplete is called) or flush is manually called, we haven't seen the request end.

func channelReadComplete(context: ChannelHandlerContext) {
// End of an inbound read cycle: this is the deadline for deciding whether
// the buffered response can be sent as-is (keep-alive) or must include
// `Connection: close`. If request `.end` arrived during the cycle the head
// is flushed unchanged; otherwise we amend the head and close after
// response `.end`.
if case .buffering = self.finalResponseState {
self.flushBuffer(context: context)
}
context.fireChannelReadComplete()
}

func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
let part = self.unwrapOutboundIn(data)
switch self.finalResponseState {
case .notStarted:
// Informational (1xx) responses pass through without affecting state: they
// don't conclude the response, so we remain in `.notStarted` until the
// final response head is written.
if case .head(let response) = part, response.status.kind == .informational {
context.write(data, promise: promise)
return
}
if self.requestEndReceived {
// Request fully read; stream the response directly.
self.finalResponseState = .streaming
context.write(data, promise: promise)
} else {
// Start buffering with the head. Additional parts (body, end) the
// handler may write before the next deadline are appended below.
self.finalResponseState = .buffering(
head: BufferedWrite(part: part, promise: promise),
additional: []
)
}
case .buffering(let head, var additional):
additional.append(BufferedWrite(part: part, promise: promise))
self.finalResponseState = .buffering(head: head, additional: additional)
case .streaming:
context.write(data, promise: promise)
if case .end = part, self.closeAfterResponseEnd {
// The head we flushed earlier carried `Connection: close`; close
// the connection now that the response is complete.
context.flush()
context.close(mode: .output, promise: nil)
}
}
}

func flush(context: ChannelHandlerContext) {
// An upstream writer forced a flush. Same deadline as `channelReadComplete`:
// release any buffered parts, with `Connection: close` if request `.end`
// hasn't arrived.
if case .buffering = self.finalResponseState {
self.flushBuffer(context: context)
}
context.flush()
}

/// Releases buffered response parts to the pipeline. If request `.end` has not
/// yet arrived, amend the head with `Connection: close` and arrange to close
/// the connection once response `.end` is written.
private func flushBuffer(context: ChannelHandlerContext) {
guard case .buffering(var head, let additional) = self.finalResponseState else { return }

if !self.requestEndReceived {
// Amend the head with `Connection: close` before flushing.
if case .head(var response) = head.part {
response.headerFields[.connection] = "close"
head.part = .head(response)
}
self.closeAfterResponseEnd = true
}

self.finalResponseState = .streaming

context.write(self.wrapOutboundOut(head.part), promise: head.promise)
var sawEnd = false
for write in additional {
context.write(self.wrapOutboundOut(write.part), promise: write.promise)
if case .end = write.part {
sawEnd = true
}
}
context.flush()

if sawEnd && self.closeAfterResponseEnd {
// The response was fully buffered (head + ... + end) and we have to
// close. Close now (the flush above ensured the writes reached the
// wire).
context.close(mode: .output, promise: nil)
}
}
}
70 changes: 40 additions & 30 deletions Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,21 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
fileprivate var state: ReaderState

/// The iterator that provides HTTP request parts from the underlying channel.
private var iterator: NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator
/// Taken from `state` at construction; returned to `state` when this reader
/// observes request `.end` so the outer request loop can recover it for
/// HTTP/1.1 keep-alive.
private var iterator: NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator?

/// A reusable buffer handed to the body closure on each call to ``read(body:)``.
/// Reusing it across calls preserves the allocation; the buffer is cleared
/// (while keeping its capacity) at the start of every read.
private var buffer: UniqueArray<UInt8>

/// Initializes a new request body reader with the given NIO async channel iterator.
///
/// - Parameter iterator: The NIO async channel inbound stream iterator to use for reading request parts.
fileprivate init(
iterator: consuming sending NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator,
readerState: ReaderState
) {
self.iterator = iterator
/// Initializes a new request body reader, taking the iterator from the
/// shared `ReaderState`.
fileprivate init(readerState: ReaderState) {
self.state = readerState
self.iterator = readerState.takeIterator()
self.buffer = UniqueArray<UInt8>()
}

Expand All @@ -72,7 +71,7 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
) async throws(EitherError<ReadFailure, Failure>) -> Return {
let requestPart: HTTPRequestPart?
do {
requestPart = try await self.iterator.next(isolation: #isolation)
requestPart = try await self.iterator?.next(isolation: #isolation)
} catch {
throw .first(error)
}
Expand All @@ -85,9 +84,14 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
self.buffer.reserveCapacity(element.readableBytes)
self.buffer.append(copying: element.readableBytesUInt8Span)
case .end(let trailers):
// Move the iterator back into ReaderState so the outer request
// loop can recover it for the next request on the same connection
// (HTTP/1.1 keep-alive).
nonisolated(unsafe) let iter = self.iterator.take()
self.state.wrapped.withLock { state in
state.trailers = trailers
state.finishedReading = true
_ = state.iterator.swap(newValue: iter)
}
case .none:
throw .first(RequestBodyReadError.streamEndedBeforeReceivingRequestEnd)
Expand All @@ -102,15 +106,31 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
}

final class ReaderState: Sendable {
struct Wrapped {
struct Wrapped: ~Copyable {
var trailers: HTTPFields? = nil
var finishedReading: Bool = false

/// The iterator. Initially populated from the channel; taken by the
/// body reader at construction time and returned by it once request
/// `.end` has been observed (for HTTP/1.1 keep-alive recovery).
var iterator:
Disconnected<
NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator?
>
}

let wrapped: Mutex<Wrapped>

init() {
self.wrapped = .init(.init())
init(iterator: consuming sending NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator) {
self.wrapped = .init(.init(iterator: Disconnected(value: iterator)))
}

/// Takes the iterator out of the state. Returns the iterator if present,
/// or `nil` if it's already been taken (e.g. by the body reader).
func takeIterator() -> sending NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator? {
self.wrapped.withLock { state in
state.iterator.swap(newValue: nil)
}
}
}

Expand All @@ -123,18 +143,12 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
/// The type of errors that can occur during reading operations.
public typealias Failure = any Error

private var iterator: Disconnected<NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator?>

internal var state: ReaderState

/// Initializes a new HTTP request body and trailers reader with the given NIO async channel iterator.
/// Initializes a new HTTP request body and trailers reader.
///
/// - Parameter iterator: The NIO async channel inbound stream iterator to use for reading request parts.
init(
iterator: consuming sending NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator,
readerState: ReaderState
) {
self.iterator = .init(value: iterator)
/// - Parameter readerState: The shared reader state that holds the iterator and captures trailers.
init(readerState: ReaderState) {
self.state = readerState
}

Expand Down Expand Up @@ -166,14 +180,10 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
public consuming func consumeAndConclude<Return, Failure: Error>(
body: nonisolated(nonsending) (consuming sending RequestBodyAsyncReader) async throws(Failure) -> Return
) async throws(Failure) -> (Return, HTTPFields?) {
if let iterator = self.iterator.take() {
let partsReader = RequestBodyAsyncReader(iterator: iterator, readerState: self.state)
let result = try await body(partsReader)
let trailers = self.state.wrapped.withLock { $0.trailers }
return (result, trailers)
} else {
fatalError("consumeAndConclude called more than once")
}
let partsReader = RequestBodyAsyncReader(readerState: self.state)
let result = try await body(partsReader)
let trailers = self.state.wrapped.withLock { $0.trailers }
return (result, trailers)
}
}

Expand Down
Loading
Loading