Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"io"
"net/http"
"net/url"
"time"
)

// Version is the semantic version of the connect module.
Expand Down Expand Up @@ -319,6 +320,8 @@ type Spec struct {
Procedure string // for example, "/acme.foo.v1.FooService/Bar"
IsClient bool // otherwise we're in a handler
IdempotencyLevel IdempotencyLevel
ReadTimeout time.Duration
WriteTimeout time.Duration
}

// Peer describes the other party to an RPC.
Expand Down
11 changes: 11 additions & 0 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package connect
import (
"context"
"net/http"
"time"
)

// A Handler is the server-side implementation of a single RPC defined by a
Expand Down Expand Up @@ -255,6 +256,12 @@ func NewBidiStreamHandler[Req, Res any](

// ServeHTTP implements [http.Handler].
func (h *Handler) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) {
if h.spec.ReadTimeout != 0 {
rc := http.NewResponseController(responseWriter)
rc.SetReadDeadline(time.Now().Add(h.spec.ReadTimeout))
rc.SetWriteDeadline(time.Now().Add(h.spec.ReadTimeout))
}

// We don't need to defer functions to close the request body or read to
// EOF: the stream we construct later on already does that, and we only
// return early when dealing with misbehaving clients. In those cases, it's
Expand Down Expand Up @@ -350,6 +357,8 @@ type handlerConfig struct {
ReadMaxBytes int
SendMaxBytes int
StreamType StreamType
ReadTimeout time.Duration
WriteTimeout time.Duration
}

func newHandlerConfig(procedure string, streamType StreamType, options []HandlerOption) *handlerConfig {
Expand All @@ -376,6 +385,8 @@ func (c *handlerConfig) newSpec() Spec {
Schema: c.Schema,
StreamType: c.StreamType,
IdempotencyLevel: c.IdempotencyLevel,
ReadTimeout: c.ReadTimeout,
WriteTimeout: c.WriteTimeout,
}
}

Expand Down
21 changes: 21 additions & 0 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"io"
"net/http"
"time"
)

// A ClientOption configures a [Client].
Expand Down Expand Up @@ -351,6 +352,14 @@ func WithInterceptors(interceptors ...Interceptor) Option {
return &interceptorsOption{interceptors}
}

func WithReadTimeout(value time.Duration) HandlerOption {
return &readTimeoutOption{value: value}
}

func WithWriteTimeout(value time.Duration) HandlerOption {
return &writeTimeoutOption{value: value}
}

// WithOptions composes multiple Options into one.
func WithOptions(options ...Option) Option {
return &optionsOption{options}
Expand Down Expand Up @@ -645,3 +654,15 @@ func (o *conditionalHandlerOptions) applyToHandler(config *handlerConfig) {
option.applyToHandler(config)
}
}

type readTimeoutOption struct{ value time.Duration }

func (o *readTimeoutOption) applyToHandler(config *handlerConfig) {
config.ReadTimeout = o.value
}

type writeTimeoutOption struct{ value time.Duration }

func (o *writeTimeoutOption) applyToHandler(config *handlerConfig) {
config.WriteTimeout = o.value
}