Skip to content

Commit

Permalink
Making it possible to specify connect and header timeouts on registry…
Browse files Browse the repository at this point in the history
… backends

With unit tests.

Signed-off-by: Jean Rouge <rougej+github@gmail.com>
  • Loading branch information
wk8 committed Oct 21, 2020
1 parent e435b83 commit b08df8b
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ bins: $(LINUX_BINS)
# ==== TEST ====
.PHONY: unit-test
unit-test:
-rm coverage.txt
-rm -f coverage.txt
$(GO) test -timeout=30s -race -coverprofile=coverage.txt $(ALL_PKGS) --tags "unit"

.PHONY: docker_stop
Expand Down
2 changes: 1 addition & 1 deletion lib/backend/registrybackend/blobclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type BlobClient struct {
// NewBlobClient creates a new BlobClient.
func NewBlobClient(config Config) (*BlobClient, error) {
config = config.applyDefaults()
authenticator, err := security.NewAuthenticator(config.Address, config.Security)
authenticator, err := config.Authenticator()
if err != nil {
return nil, fmt.Errorf("cannot create tag client authenticator: %s", err)
}
Expand Down
59 changes: 59 additions & 0 deletions lib/backend/registrybackend/blobclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"io"
"net/http"
"testing"
"time"

"github.com/pressly/chi"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/backend/backenderrors"
Expand Down Expand Up @@ -125,3 +127,60 @@ func TestBlobDownloadFileNotFound(t *testing.T) {
var b bytes.Buffer
require.Equal(backenderrors.ErrBlobNotFound, client.Download(namespace, "data", &b))
}

func TestBlobDownloadHeaderTimeout(t *testing.T) {
require := require.New(t)

blob := randutil.Blob(32 * memsize.KB)
namespace := core.NamespaceFixture()

r := chi.NewRouter()
r.Get(fmt.Sprintf("/v2/%s/blobs/{blob}", namespace), func(w http.ResponseWriter, req *http.Request) {
time.Sleep(time.Second)
// ignoring errors here, as this will fail after we timeout below
_, _ = io.Copy(w, bytes.NewReader(blob))
})
r.Head(fmt.Sprintf("/v2/%s/blobs/{blob}", namespace), func(w http.ResponseWriter, req *http.Request) {
time.Sleep(time.Second)
w.Header().Set("Content-Length", fmt.Sprintf("%d", len(blob)))
})
addr, stop := testutil.StartServer(r)
defer stop()

config := newTestConfig(addr)
config.ResponseHeaderTimeout = 100 * time.Millisecond
client, err := NewBlobClient(config)
require.NoError(err)

_, err = client.Stat(namespace, "data")
if assert.NotNil(t, err) {
assert.Contains(t, err.Error(), "timeout awaiting response headers")
}

var b bytes.Buffer
err = client.Download(namespace, "data", &b)
if assert.NotNil(t, err) {
assert.Contains(t, err.Error(), "timeout awaiting response headers")
}
}

func TestBlobDownloadConnectTimeout(t *testing.T) {
require := require.New(t)

// unroutable address, courtesy of https://stackoverflow.com/a/904609/4867444
config := newTestConfig("10.255.255.1")
config.ConnectTimeout = 100 * time.Millisecond
client, err := NewBlobClient(config)
require.NoError(err)

_, err = client.Stat("dummynamespace", "data")
if assert.NotNil(t, err) {
assert.Contains(t, err.Error(), "i/o timeout")
}

var b bytes.Buffer
err = client.Download("dummynamespace", "data", &b)
if assert.NotNil(t, err) {
assert.Contains(t, err.Error(), "i/o timeout")
}
}
30 changes: 27 additions & 3 deletions lib/backend/registrybackend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,22 @@
package registrybackend

import (
"net"
"net/http"
"time"

"github.com/uber/kraken/lib/backend/registrybackend/security"
)

// Config defines the registry address, timeout and security options.
type Config struct {
Address string `yaml:"address"`
Timeout time.Duration `yaml:"timeout"`
Security security.Config `yaml:"security"`
Address string `yaml:"address"`
Timeout time.Duration `yaml:"timeout"`
// ConnectTimeout limits the time spent establishing the TCP connection (if a new one is needed).
ConnectTimeout time.Duration `yaml:"connect_timeout"`
// ResponseHeaderTimeout limits the time spent reading the headers of the response.
ResponseHeaderTimeout time.Duration `yaml:"response_header_timeout"`
Security security.Config `yaml:"security"`
}

// Set default configuration
Expand All @@ -33,3 +39,21 @@ func (c Config) applyDefaults() Config {
}
return c
}

func (c Config) Authenticator() (security.Authenticator, error) {
transport := http.DefaultTransport.(*http.Transport).Clone()

if c.ConnectTimeout != 0 {
dialer := &net.Dialer{
Timeout: c.ConnectTimeout,
KeepAlive: 30 * time.Second,
}
transport.DialContext = dialer.DialContext
}

if c.ResponseHeaderTimeout != 0 {
transport.ResponseHeaderTimeout = c.ResponseHeaderTimeout
}

return security.NewAuthenticator(c.Address, c.Security, transport)
}
7 changes: 3 additions & 4 deletions lib/backend/registrybackend/security/security.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,16 @@ type authenticator struct {
// address, TLS, and credentials configuration. It supports both basic auth and
// token based authentication challenges. If TLS is disabled, no authentication
// is attempted.
func NewAuthenticator(address string, config Config) (Authenticator, error) {
rt := http.DefaultTransport.(*http.Transport).Clone()
func NewAuthenticator(address string, config Config, transport *http.Transport) (Authenticator, error) {
tlsClientConfig, err := config.TLS.BuildClient()
if err != nil {
return nil, fmt.Errorf("build tls config for %q: %s", address, err)
}
rt.TLSClientConfig = tlsClientConfig
transport.TLSClientConfig = tlsClientConfig
return &authenticator{
address: address,
config: config,
roundTripper: rt,
roundTripper: transport,
credentialStore: newCredentialStore(address, config),
challengeManager: challenge.NewSimpleManager(),
}, nil
Expand Down
2 changes: 1 addition & 1 deletion lib/backend/registrybackend/tagclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type TagClient struct {
// NewTagClient creates a new TagClient.
func NewTagClient(config Config) (*TagClient, error) {
config = config.applyDefaults()
authenticator, err := security.NewAuthenticator(config.Address, config.Security)
authenticator, err := config.Authenticator()
if err != nil {
return nil, fmt.Errorf("cannot create tag client authenticator: %s", err)
}
Expand Down
68 changes: 68 additions & 0 deletions lib/backend/registrybackend/tagclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"net/http"
"strings"
"testing"
"time"

"github.com/pressly/chi"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/backend/backenderrors"
Expand Down Expand Up @@ -97,3 +99,69 @@ func TestTagDownloadFileNotFound(t *testing.T) {
var b bytes.Buffer
require.Equal(backenderrors.ErrBlobNotFound, client.Download(tag, tag, &b))
}

func TestTagDownloadHeaderTimeout(t *testing.T) {
require := require.New(t)

imageConfig := core.NewBlobFixture()
layer1 := core.NewBlobFixture()
layer2 := core.NewBlobFixture()
digest, manifest := dockerutil.ManifestFixture(
imageConfig.Digest, layer1.Digest, layer2.Digest)

tag := core.TagFixture()
namespace := strings.Split(tag, ":")[0]

r := chi.NewRouter()
r.Get(fmt.Sprintf("/v2/%s/manifests/{tag}", namespace), func(w http.ResponseWriter, req *http.Request) {
time.Sleep(time.Second)
w.Header().Set("Content-Length", fmt.Sprintf("%d", len(manifest)))
w.Header().Set("Docker-Content-Digest", digest.String())
_, _ = io.Copy(w, bytes.NewReader(manifest))
})
r.Head(fmt.Sprintf("/v2/%s/manifests/{tag}", namespace), func(w http.ResponseWriter, req *http.Request) {
time.Sleep(time.Second)
w.Header().Set("Content-Length", fmt.Sprintf("%d", len(manifest)))
w.Header().Set("Docker-Content-Digest", digest.String())
_, _ = io.Copy(w, bytes.NewReader(manifest))
})
addr, stop := testutil.StartServer(r)
defer stop()

config := newTestConfig(addr)
config.ResponseHeaderTimeout = 100 * time.Millisecond
client, err := NewTagClient(config)
require.NoError(err)

_, err = client.Stat(tag, tag)
if assert.NotNil(t, err) {
assert.Contains(t, err.Error(), "timeout awaiting response headers")
}

var b bytes.Buffer
err = client.Download(tag, tag, &b)
if assert.NotNil(t, err) {
assert.Contains(t, err.Error(), "timeout awaiting response headers")
}
}

func TestTagDownloadConnectTimeout(t *testing.T) {
require := require.New(t)

// unroutable address, courtesy of https://stackoverflow.com/a/904609/4867444
config := newTestConfig("10.255.255.1")
config.ConnectTimeout = 100 * time.Millisecond
client, err := NewTagClient(config)
require.NoError(err)

_, err = client.Stat("dummynamespace", "image:tag")
if assert.NotNil(t, err) {
assert.Contains(t, err.Error(), "i/o timeout")
}

var b bytes.Buffer
err = client.Download("dummynamespace", "image:tag", &b)
if assert.NotNil(t, err) {
assert.Contains(t, err.Error(), "i/o timeout")
}
}

0 comments on commit b08df8b

Please sign in to comment.