
Conversation

@lingbohome commented Sep 27, 2025

Motivation

When using the HTTP-based APIClient without a custom httpclient configured, a new httpclient instance is created internally for every watch request, so requests cannot share and reuse the connection pool maintained by a single underlying httpclient.
This produces a large number of TCP connections that are never released, ultimately causing a connection leak.
Therefore, when no custom httpclient is configured, this change sets the DisableKeepAlives option so that the connection for each watch request is released promptly.

Modifications

Verification

Documentation

…ove the potential TCP connection leakage caused by event stream reading

Signed-off-by: lingbo <[email protected]>
@MasonM (Member) commented Oct 27, 2025

if the custom httpclient is not configured, a new httpclient instance will be created internally for each watch request to be processed,

Do you have sample code, or some sort of reproduction steps for this?

From looking at the code, I'm guessing what's happening is EventStreamReader() doesn't close the response body properly, since it's wrapped in a bufio.Reader, and per the docs:

If the returned error is nil, the Response will contain a non-nil Body which the user is expected to close. If the Body is not both read to EOF and closed, the Client's underlying RoundTripper (typically Transport) may not be able to re-use a persistent TCP connection to the server for a subsequent "keep-alive" request.

So I can see how DisableKeepAlives: true works around the problem, but we're still not doing what the docs say we should be doing. It should be possible to update the client code to close the body (POC: main...MasonM:argo-workflows:poc-eventstreamreader-close), but that's more complicated, so DisableKeepAlives: true might be better.

@lingbohome (Author)

(POC: main...MasonM:argo-workflows:poc-eventstreamreader-close)
At first, I also thought the cause was that Response.Body was never explicitly Close()d, so I tried an approach similar to yours:

```go
type EventStream struct {
	*bufio.Reader
	closer io.Closer
}

func (es *EventStream) Close() error {
	return es.closer.Close()
}

func (h Facade) EventStreamReader(ctx context.Context, in interface{}, path string) (*EventStream, error) {
	// ......
	return &EventStream{
		Reader: bufio.NewReader(resp.Body),
		closer: resp.Body, // keep the body so callers can close it
	}, nil
}

type serverSentEventsClient struct {
	ctx    context.Context
	reader *EventStream
}

func (f serverSentEventsClient) RecvEvent(v interface{}) error {
	for {
		select {
		case <-f.ctx.Done():
			log.Infoln("Client disconnected from SSE")
			return f.reader.Close()
		default:
		}

		line, err := f.reader.ReadBytes('\n')
		if err != nil {
			f.reader.Close()
			return err
		}
		log.Debugln(string(line))
		// ......
	}
}
```


After testing, the problem persisted with no change; setting DisableKeepAlives: true later resolved it.
To reproduce the issue, configure the client without a custom httpclient, like this:

```go
opt := apiclient.Opts{
	ArgoServerOpts: apiclient.ArgoServerOpts{
		URL:                addr,
		Path:               "/",
		Secure:             true,
		InsecureSkipVerify: true,
		HTTP1:              true,
		HTTP1Client:        nil,
	},
	AuthSupplier: func() string {
		return token
	},
}
```

Then make multiple requests to fetch the logs of a running task, and you should observe that the number of TCP connections keeps growing and is never released until your service process is terminated.
