From 61c99de7e3d05e312e6f6beac5d693d8c9e78b3e Mon Sep 17 00:00:00 2001 From: Andrew Stucki Date: Thu, 19 Dec 2024 12:48:29 -0500 Subject: [PATCH 1/3] Add cleanup routines for underlying http transport --- rpadmin/admin.go | 29 ++++++++++++++++-- rpadmin/admin_test.go | 70 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+), 2 deletions(-) diff --git a/rpadmin/admin.go b/rpadmin/admin.go index 603f5ab..cd9b107 100644 --- a/rpadmin/admin.go +++ b/rpadmin/admin.go @@ -96,6 +96,7 @@ type AdminAPI struct { urls []string brokerIDToUrlsMutex sync.Mutex brokerIDToUrls map[int]string + transport *http.Transport retryClient *pester.Client oneshotClient *http.Client auth Auth @@ -180,18 +181,19 @@ func newAdminAPI(urls []string, auth Auth, tlsConfig *tls.Config, dialer DialCon for _, opt := range opts { opt.apply(client) } + transport := defaultTransport() + a := &AdminAPI{ urls: make([]string, len(urls)), retryClient: client, oneshotClient: &http.Client{Timeout: 10 * time.Second}, auth: auth, + transport: transport, tlsConfig: tlsConfig, brokerIDToUrls: make(map[int]string), forCloud: forCloud, } - transport := &http.Transport{} - if tlsConfig != nil { transport.TLSClientConfig = tlsConfig } @@ -781,3 +783,26 @@ func (a *AdminAPI) UpdateAPIUrlsFromKubernetesDNS() error { return a.initURLs(urls, a.tlsConfig, a.forCloud) } + +// Close closes all idle connections of the underlying transport +// this should be called when an admin client is no longer in-use +// in order to not leak connections from the underlying transport +// pool. +func (a *AdminAPI) Close() { + a.transport.CloseIdleConnections() +} + +func defaultTransport() *http.Transport { + return &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + ForceAttemptHTTP2: true, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + } +} diff --git a/rpadmin/admin_test.go b/rpadmin/admin_test.go index df78e5f..67f40f3 100644 --- a/rpadmin/admin_test.go +++ b/rpadmin/admin_test.go @@ -379,3 +379,73 @@ func TestUpdateAPIUrlsFromKubernetesDNS(t *testing.T) { }) } } + +func TestIdleConnectionClosure(t *testing.T) { + clients := 1000 + numRequests := 10 + + urls := []string{} + for id := 0; id < 3; id++ { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case strings.HasPrefix(r.URL.Path, "/v1/node_config"): + w.Write([]byte(fmt.Sprintf(`{"node_id": %d}`, id))) //nolint:gocritic // original rpk code + case strings.HasPrefix(r.URL.Path, "/v1/partitions/redpanda/controller/0"): + w.Write([]byte(`{"leader_id": 0}`)) //nolint:gocritic // original rpk code + } + })) + + t.Cleanup(server.Close) + + urls = append(urls, server.URL) + } + + // tracker to make sure we close all of our connections + var mutex sync.RWMutex + conns := []*wrappedConnection{} + + for i := 0; i < clients; i++ { + // initialize a new client and do some requests + adminClient, err := NewAdminAPIWithDialer(urls, new(NopAuth), nil, func(ctx context.Context, network, addr string) (net.Conn, error) { + conn, err := (&net.Dialer{}).DialContext(ctx, network, addr) + if err != nil { + return nil, err + } + mutex.Lock() + defer mutex.Unlock() + + wrapped := &wrappedConnection{Conn: conn} + conns = append(conns, wrapped) + return wrapped, nil + }) + require.NoError(t, err) + + for i := 0; i < numRequests; i++ { + _, err = adminClient.GetLeaderID(context.Background()) + require.NoError(t, err) + } + + adminClient.Close() + } + + mutex.RLock() + defer mutex.RUnlock() + + closed := 0 + for _, conn := range conns { + if conn.closed { + closed++ + } + } + require.Equal(t, closed, len(conns), "Not all connections were closed") +} + +type wrappedConnection struct { + net.Conn + closed bool +} + +func (w *wrappedConnection) Close() error { + w.closed = true + return w.Conn.Close() +} From 8a67afeadb95a63bb94fa52e02a87666e3684c20 Mon Sep 17 00:00:00 2001 From: Andrew Stucki Date: Thu, 19 Dec 2024 13:02:06 -0500 Subject: [PATCH 2/3] make linter happy --- rpadmin/admin_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rpadmin/admin_test.go b/rpadmin/admin_test.go index 67f40f3..2e8dc52 100644 --- a/rpadmin/admin_test.go +++ b/rpadmin/admin_test.go @@ -391,7 +391,7 @@ func TestIdleConnectionClosure(t *testing.T) { case strings.HasPrefix(r.URL.Path, "/v1/node_config"): w.Write([]byte(fmt.Sprintf(`{"node_id": %d}`, id))) //nolint:gocritic // original rpk code case strings.HasPrefix(r.URL.Path, "/v1/partitions/redpanda/controller/0"): - w.Write([]byte(`{"leader_id": 0}`)) //nolint:gocritic // original rpk code + w.Write([]byte(`{"leader_id": 0}`)) } })) @@ -425,7 +425,7 @@ func TestIdleConnectionClosure(t *testing.T) { require.NoError(t, err) } - adminClient.Close() + // adminClient.Close() } mutex.RLock() From 9d81554c753d668c802f7394cb195c392101fcd7 Mon Sep 17 00:00:00 2001 From: Andrew Stucki Date: Thu, 19 Dec 2024 13:06:38 -0500 Subject: [PATCH 3/3] uncomment accidentally commented line in test --- rpadmin/admin_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpadmin/admin_test.go b/rpadmin/admin_test.go index 2e8dc52..659a841 100644 --- a/rpadmin/admin_test.go +++ b/rpadmin/admin_test.go @@ -425,7 +425,7 @@ func TestIdleConnectionClosure(t *testing.T) { require.NoError(t, err) } - // adminClient.Close() + adminClient.Close() } mutex.RLock()