Skip to content

Commit

Permalink
fix(elasticsearch): wait for (#2724)
Browse files Browse the repository at this point in the history
Wait for the HTTP port to be available to prevent random failures when
the container isn't fully started and returns 503 errors.
  • Loading branch information
stevenh authored Sep 27, 2024
1 parent 1bf8e2b commit 4dc3662
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 67 deletions.
155 changes: 89 additions & 66 deletions modules/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package elasticsearch

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"os"
Expand All @@ -15,6 +17,7 @@ const (
defaultTCPPort = "9300"
defaultPassword = "changeme"
defaultUsername = "elastic"
defaultCaCertPath = "/usr/share/elasticsearch/config/certs/http_ca.crt"
minimalImageVersion = "7.9.2"
)

Expand All @@ -32,7 +35,7 @@ type ElasticsearchContainer struct {
}

// Deprecated: use Run instead
// RunContainer creates an instance of the Couchbase container type
// RunContainer creates an instance of the Elasticsearch container type
func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*ElasticsearchContainer, error) {
return Run(ctx, "docker.elastic.co/elasticsearch/elasticsearch:7.9.2", opts...)
}
Expand All @@ -50,54 +53,41 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
defaultHTTPPort + "/tcp",
defaultTCPPort + "/tcp",
},
// regex that
// matches 8.3 JSON logging with started message and some follow up content within the message field
// matches 8.0 JSON logging with no whitespace between message field and content
// matches 7.x JSON logging with whitespace between message field and content
// matches 6.x text logging with node name in brackets and just a 'started' message till the end of the line
WaitingFor: wait.ForLog(`.*("message":\s?"started(\s|")?.*|]\sstarted\n)`).AsRegexp(),
LifecycleHooks: []testcontainers.ContainerLifecycleHooks{
{
// the container needs a post create hook to set the default JVM options in a file
PostCreates: []testcontainers.ContainerHook{},
PostReadies: []testcontainers.ContainerHook{},
},
},
},
Started: true,
}

// Gather all config options (defaults and then apply provided options)
settings := defaultOptions()
options := defaultOptions()
for _, opt := range opts {
if apply, ok := opt.(Option); ok {
apply(settings)
apply(options)
}
if err := opt.Customize(&req); err != nil {
return nil, err
}
}

// Transfer the certificate settings to the container request
err := configureCertificate(settings, &req)
if err != nil {
return nil, err
}

// Transfer the password settings to the container request
err = configurePassword(settings, &req)
if err != nil {
if err := configurePassword(options, &req); err != nil {
return nil, err
}

if isAtLeastVersion(req.Image, 7) {
req.LifecycleHooks[0].PostCreates = append(req.LifecycleHooks[0].PostCreates, configureJvmOpts)
req.LifecycleHooks = append(req.LifecycleHooks,
testcontainers.ContainerLifecycleHooks{
PostCreates: []testcontainers.ContainerHook{configureJvmOpts},
},
)
}

// Set the default waiting strategy if not already set.
setWaitFor(options, &req.ContainerRequest)

container, err := testcontainers.GenericContainer(ctx, req)
var esContainer *ElasticsearchContainer
if container != nil {
esContainer = &ElasticsearchContainer{Container: container, Settings: *settings}
esContainer = &ElasticsearchContainer{Container: container, Settings: *options}
}
if err != nil {
return esContainer, fmt.Errorf("generic container: %w", err)
Expand All @@ -110,6 +100,61 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
return esContainer, nil
}

// certWriter is a helper that writes the details of a CA cert to options.
type certWriter struct {
options *Options
certPool *x509.CertPool
}

// Read reads the CA cert from the reader and appends it to the options.
func (w *certWriter) Read(r io.Reader) error {
buf, err := io.ReadAll(r)
if err != nil {
return fmt.Errorf("read CA cert: %w", err)
}

w.options.CACert = buf
w.certPool.AppendCertsFromPEM(w.options.CACert)

return nil
}

// setWaitFor sets the req.WaitingFor strategy based on settings.
func setWaitFor(options *Options, req *testcontainers.ContainerRequest) {
var strategies []wait.Strategy
if req.WaitingFor != nil {
// Custom waiting strategy, ensure we honour it.
strategies = append(strategies, req.WaitingFor)
}

waitHTTP := wait.ForHTTP("/").WithPort(defaultHTTPPort)
if sslRequired(req) {
waitHTTP = waitHTTP.WithTLS(true).WithAllowInsecure(true)
cw := &certWriter{
options: options,
certPool: x509.NewCertPool(),
}

waitHTTP = waitHTTP.
WithTLS(true, &tls.Config{RootCAs: cw.certPool})

strategies = append(strategies, wait.ForFile(defaultCaCertPath).WithMatcher(cw.Read))
}

if options.Password != "" || options.Username != "" {
waitHTTP = waitHTTP.WithBasicAuth(options.Username, options.Password)
}

strategies = append(strategies, waitHTTP)

if len(strategies) > 1 {
req.WaitingFor = wait.ForAll(strategies...)
return
}

req.WaitingFor = strategies[0]
}

// configureAddress sets the address of the Elasticsearch container.
// If the certificate is set, it will use https as protocol, otherwise http.
func (c *ElasticsearchContainer) configureAddress(ctx context.Context) error {
Expand All @@ -133,50 +178,28 @@ func (c *ElasticsearchContainer) configureAddress(ctx context.Context) error {
return nil
}

// configureCertificate transfers the certificate settings to the container request.
// For that, it defines a post start hook that copies the certificate from the container to the host.
// The certificate is only available since version 8, and will be located in a well-known location.
func configureCertificate(settings *Options, req *testcontainers.GenericContainerRequest) error {
if isAtLeastVersion(req.Image, 8) {
// These configuration keys explicitly disable CA generation.
// If any are set we skip the file retrieval.
configKeys := []string{
"xpack.security.enabled",
"xpack.security.http.ssl.enabled",
"xpack.security.transport.ssl.enabled",
}
for _, configKey := range configKeys {
if value, ok := req.Env[configKey]; ok {
if value == "false" {
return nil
}
// sslRequired returns true if the SSL is required, otherwise false.
func sslRequired(req *testcontainers.ContainerRequest) bool {
if !isAtLeastVersion(req.Image, 8) {
return false
}

// These configuration keys explicitly disable CA generation.
// If any are set we skip the file retrieval.
configKeys := []string{
"xpack.security.enabled",
"xpack.security.http.ssl.enabled",
"xpack.security.transport.ssl.enabled",
}
for _, configKey := range configKeys {
if value, ok := req.Env[configKey]; ok {
if value == "false" {
return false
}
}

// The container needs a post ready hook to copy the certificate from the container to the host.
// This certificate is only available since version 8
req.LifecycleHooks[0].PostReadies = append(req.LifecycleHooks[0].PostReadies,
func(ctx context.Context, container testcontainers.Container) error {
const defaultCaCertPath = "/usr/share/elasticsearch/config/certs/http_ca.crt"

readCloser, err := container.CopyFileFromContainer(ctx, defaultCaCertPath)
if err != nil {
return err
}

// receive the bytes from the default location
certBytes, err := io.ReadAll(readCloser)
if err != nil {
return err
}

settings.CACert = certBytes

return nil
})
}

return nil
return true
}

// configurePassword transfers the password settings to the container request.
Expand Down
1 change: 0 additions & 1 deletion modules/elasticsearch/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ type Options struct {

func defaultOptions() *Options {
return &Options{
CACert: nil,
Username: defaultUsername,
}
}
Expand Down

0 comments on commit 4dc3662

Please sign in to comment.