From 1bda407237fc660629f019a6dd3301a21e4147f6 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Thu, 28 Oct 2021 23:18:45 -0700 Subject: [PATCH 1/3] Limit Elasticsearch document _id to 512 bytes (#2112) --- common/log/tag/tags.go | 10 +++++++ .../store/elasticsearch/processor.go | 15 ++++++----- .../store/elasticsearch/visibility_store.go | 19 ++++++++++++-- .../visibility_store_write_test.go | 26 +++++++++++++++++++ host/elasticsearch_test.go | 19 ++++++++++++++ 5 files changed, 81 insertions(+), 8 deletions(-) diff --git a/common/log/tag/tags.go b/common/log/tag/tags.go index e722b884f79..c9477f865a3 100644 --- a/common/log/tag/tags.go +++ b/common/log/tag/tags.go @@ -56,6 +56,11 @@ func Error(err error) ZapTag { return NewErrorTag(err) } +// IsRetryable returns tag for IsRetryable +func IsRetryable(isRetryable bool) ZapTag { + return NewBoolTag("is-retryable", isRetryable) +} + // ClusterName returns tag for ClusterName func ClusterName(clusterName string) ZapTag { return NewStringTag("cluster-name", clusterName) @@ -435,6 +440,11 @@ func Counter(c int) ZapTag { return NewInt("counter", c) } +// RequestCount returns tag for RequestCount +func RequestCount(c int) ZapTag { + return NewInt("request-count", c) +} + // Number returns tag for Number func Number(n int64) ZapTag { return NewInt64("number", n) diff --git a/common/persistence/visibility/store/elasticsearch/processor.go b/common/persistence/visibility/store/elasticsearch/processor.go index 756c77e77d8..de85d999b1b 100644 --- a/common/persistence/visibility/store/elasticsearch/processor.go +++ b/common/persistence/visibility/store/elasticsearch/processor.go @@ -30,6 +30,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "sync/atomic" "time" @@ -216,13 +217,14 @@ func (p *processorImpl) bulkBeforeAction(_ int64, requests []elastic.BulkableReq // bulkAfterAction is triggered after bulk processor commit func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) { if err != nil { - // This happens after configured retry, which means something bad happens on cluster or index - // When cluster back to live, processor will re-commit those failure requests - + const logFirstNRequests = 5 isRetryable := client.IsRetryableError(err) - p.logger.Error("Unable to commit bulk ES request.", tag.Error(err), tag.Bool(isRetryable)) - for _, request := range requests { - p.logger.Error("ES request failed.", tag.ESRequest(request.String())) + var logRequests strings.Builder + for i, request := range requests { + if i < logFirstNRequests { + logRequests.WriteString(request.String()) + logRequests.WriteRune('\n') + } p.metricsClient.IncCounter(metrics.ElasticsearchBulkProcessor, metrics.ElasticsearchBulkProcessorFailures) if !isRetryable { @@ -233,6 +235,7 @@ func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequ p.sendToAckChan(visibilityTaskKey, false) } } + p.logger.Error("Unable to commit bulk ES request.", tag.Error(err), tag.IsRetryable(isRetryable), tag.RequestCount(len(requests)), tag.ESRequest(logRequests.String())) return } diff --git a/common/persistence/visibility/store/elasticsearch/visibility_store.go b/common/persistence/visibility/store/elasticsearch/visibility_store.go index ac7a2f3f08b..4012a7c4074 100644 --- a/common/persistence/visibility/store/elasticsearch/visibility_store.go +++ b/common/persistence/visibility/store/elasticsearch/visibility_store.go @@ -32,6 +32,7 @@ import ( "errors" "fmt" "io" + "strconv" "strings" "time" @@ -173,11 +174,25 @@ func (s *visibilityStore) DeleteWorkflowExecution(request *manager.VisibilityDel } func getDocID(workflowID string, runID string) string { - return fmt.Sprintf("%s%s%s", workflowID, delimiter, runID) + // From Elasticsearch doc: _id is limited to 512 bytes in size and larger values will be rejected. + const maxDocIDLength = 512 + // Generally runID is guid and this should never be the case. + if len(runID)+len(delimiter) >= maxDocIDLength { + if len(runID) >= maxDocIDLength { + return runID[0:maxDocIDLength] + } + return runID[0 : maxDocIDLength-len(delimiter)] + } + + if len(workflowID)+len(runID)+len(delimiter) > maxDocIDLength { + workflowID = workflowID[0 : maxDocIDLength-len(runID)-len(delimiter)] + } + + return workflowID + delimiter + runID } func getVisibilityTaskKey(shardID int32, taskID int64) string { - return fmt.Sprintf("%d%s%d", shardID, delimiter, taskID) + return strconv.FormatInt(int64(shardID), 10) + delimiter + strconv.FormatInt(taskID, 10) } func (s *visibilityStore) addBulkIndexRequestAndWait( diff --git a/common/persistence/visibility/store/elasticsearch/visibility_store_write_test.go b/common/persistence/visibility/store/elasticsearch/visibility_store_write_test.go index 178a3f2a042..30616d6eb50 100644 --- a/common/persistence/visibility/store/elasticsearch/visibility_store_write_test.go +++ b/common/persistence/visibility/store/elasticsearch/visibility_store_write_test.go @@ -26,6 +26,7 @@ package elasticsearch import ( "fmt" + "strings" "time" "github.com/golang/mock/gomock" @@ -269,3 +270,28 @@ func (s *ESVisibilitySuite) TestDeleteExecution_EmptyRequest() { err := s.visibilityStore.DeleteWorkflowExecution(request) s.NoError(err) } + +func (s *ESVisibilitySuite) Test_getDocID() { + s.Equal("wid~rid", getDocID("wid", "rid")) + + s.Equal(strings.Repeat("a", 512), getDocID("wid", strings.Repeat("a", 1000))) + s.Equal(strings.Repeat("a", 512), getDocID("wid", strings.Repeat("a", 513))) + s.Equal(strings.Repeat("a", 512), getDocID("wid", strings.Repeat("a", 512))) + s.Equal(strings.Repeat("a", 511), getDocID("wid", strings.Repeat("a", 511))) + s.Equal("w~"+strings.Repeat("a", 510), getDocID("wid", strings.Repeat("a", 510))) + s.Equal("wi~"+strings.Repeat("a", 509), getDocID("wid", strings.Repeat("a", 509))) + s.Equal("wid~"+strings.Repeat("a", 508), getDocID("wid", strings.Repeat("a", 508))) + s.Equal("wid~"+strings.Repeat("a", 507), getDocID("wid", strings.Repeat("a", 507))) + + s.Equal(strings.Repeat("a", 512-1-36)+"~fd86a520-741e-4fd3-a788-165c445ea6f3", getDocID(strings.Repeat("a", 1000), "fd86a520-741e-4fd3-a788-165c445ea6f3")) + s.Equal(strings.Repeat("a", 512-1-36)+"~fd86a520-741e-4fd3-a788-165c445ea6f3", getDocID(strings.Repeat("a", 477), "fd86a520-741e-4fd3-a788-165c445ea6f3")) + s.Equal(strings.Repeat("a", 512-1-36)+"~fd86a520-741e-4fd3-a788-165c445ea6f3", getDocID(strings.Repeat("a", 476), "fd86a520-741e-4fd3-a788-165c445ea6f3")) + s.Equal(strings.Repeat("a", 475)+"~fd86a520-741e-4fd3-a788-165c445ea6f3", getDocID(strings.Repeat("a", 475), "fd86a520-741e-4fd3-a788-165c445ea6f3")) + s.Equal(strings.Repeat("a", 474)+"~fd86a520-741e-4fd3-a788-165c445ea6f3", getDocID(strings.Repeat("a", 474), "fd86a520-741e-4fd3-a788-165c445ea6f3")) + s.Equal(strings.Repeat("a", 400)+"~fd86a520-741e-4fd3-a788-165c445ea6f3", getDocID(strings.Repeat("a", 400), "fd86a520-741e-4fd3-a788-165c445ea6f3")) +} + +func (s *ESVisibilitySuite) Test_getVisibilityTaskKey() { + s.Equal("22~8", getVisibilityTaskKey(22, 8)) + s.Equal("228~1978", getVisibilityTaskKey(228, 1978)) +} diff --git a/host/elasticsearch_test.go b/host/elasticsearch_test.go index 0333384d20d..bdb06c2c264 100644 --- a/host/elasticsearch_test.go +++ b/host/elasticsearch_test.go @@ -53,6 +53,7 @@ import ( workflowpb "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/server/common/config" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/payload" "go.temporal.io/server/common/payloads" @@ -1378,6 +1379,24 @@ func (s *elasticsearchIntegrationSuite) TestUpsertWorkflowExecution_InvalidKey() s.NotNil(failedEventAttr.GetFailure()) } +func (s *elasticsearchIntegrationSuite) Test_LongWorkflowID() { + if s.testClusterConfig.Persistence.StoreType == config.StoreTypeSQL { + // TODO: remove this when workflow_id field size is increased from varchar(255) in SQL schema. + return + } + + id := strings.Repeat("a", 1000) + wt := "es-integration-long-workflow-id-test-type" + tl := "es-integration-long-workflow-id-test-taskqueue" + request := s.createStartWorkflowExecutionRequest(id, wt, tl) + + we, err := s.engine.StartWorkflowExecution(NewContext(), request) + s.NoError(err) + + query := fmt.Sprintf(`WorkflowId = "%s"`, id) + s.testHelperForReadOnce(we.GetRunId(), query, false) +} + func (s *elasticsearchIntegrationSuite) putIndexSettings(indexName string, maxResultWindowSize int) { acknowledged, err := s.esClient.IndexPutSettings( context.Background(), From 112837dd25ca87e7298908b579725f79b8337a7f Mon Sep 17 00:00:00 2001 From: David Reiss Date: Wed, 3 Nov 2021 18:07:59 -0700 Subject: [PATCH 2/3] Update version to 1.13.1 --- common/headers/versionChecker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/headers/versionChecker.go b/common/headers/versionChecker.go index bacbf871fa1..0d118049854 100644 --- a/common/headers/versionChecker.go +++ b/common/headers/versionChecker.go @@ -41,8 +41,8 @@ const ( ClientNameTypeScriptSDK = "temporal-typescript" ClientNameCLI = "temporal-cli" - ServerVersion = "1.13.0" - CLIVersion = "1.13.0" + ServerVersion = "1.13.1" + CLIVersion = "1.13.1" // SupportedServerVersions is used by CLI and inter role communication. SupportedServerVersions = ">=1.0.0 <2.0.0" From ce5d63590cc8541a80cbe6215b9f0b197e76dccb Mon Sep 17 00:00:00 2001 From: Mike Cutalo Date: Mon, 13 Dec 2021 16:10:33 -0800 Subject: [PATCH 3/3] statsd separator --- common/metrics/tally/statsd/reporter.go | 52 ++++++++++++++++++++----- 1 file changed, 42 insertions(+), 10 deletions(-) diff --git a/common/metrics/tally/statsd/reporter.go b/common/metrics/tally/statsd/reporter.go index 3a5e47e4e0a..cd60e707017 100644 --- a/common/metrics/tally/statsd/reporter.go +++ b/common/metrics/tally/statsd/reporter.go @@ -27,6 +27,7 @@ package statsd import ( "bytes" "sort" + "strings" "time" "github.com/cactus/go-statsd-client/statsd" @@ -37,6 +38,8 @@ import ( type temporalTallyStatsdReporter struct { //Wrapper on top of "github.com/uber-go/tally/statsd" tallystatsd tally.StatsReporter + + separator string } func (r *temporalTallyStatsdReporter) metricNameWithTags(originalName string, tags map[string]string) string { @@ -63,22 +66,23 @@ func (r *temporalTallyStatsdReporter) metricNameWithTags(originalName string, ta func NewReporter(statsd statsd.Statter, opts tallystatsdreporter.Options) tally.StatsReporter { return &temporalTallyStatsdReporter{ tallystatsd: tallystatsdreporter.NewReporter(statsd, opts), + separator: ".__", } } func (r *temporalTallyStatsdReporter) ReportCounter(name string, tags map[string]string, value int64) { - newName := r.metricNameWithTags(name, tags) - r.tallystatsd.ReportCounter(newName, map[string]string{}, value) + // newName := r.metricNameWithTags(name, tags) + r.tallystatsd.ReportCounter(r.taggedName(name, tags), map[string]string{}, value) } func (r *temporalTallyStatsdReporter) ReportGauge(name string, tags map[string]string, value float64) { - newName := r.metricNameWithTags(name, tags) - r.tallystatsd.ReportGauge(newName, map[string]string{}, value) + // newName := r.metricNameWithTags(name, tags) + r.tallystatsd.ReportGauge(r.taggedName(name, tags), map[string]string{}, value) } func (r *temporalTallyStatsdReporter) ReportTimer(name string, tags map[string]string, interval time.Duration) { - newName := r.metricNameWithTags(name, tags) - r.tallystatsd.ReportTimer(newName, map[string]string{}, interval) + // newName := r.metricNameWithTags(name, tags) + r.tallystatsd.ReportTimer(r.taggedName(name, tags), map[string]string{}, interval) } func (r *temporalTallyStatsdReporter) ReportHistogramValueSamples( @@ -89,8 +93,8 @@ func (r *temporalTallyStatsdReporter) ReportHistogramValueSamples( bucketUpperBound float64, samples int64, ) { - newName := r.metricNameWithTags(name, tags) - r.tallystatsd.ReportHistogramValueSamples(newName, map[string]string{}, buckets, bucketLowerBound, bucketUpperBound, samples) + // newName := r.metricNameWithTags(name, tags) + r.tallystatsd.ReportHistogramValueSamples(r.taggedName(name, tags), map[string]string{}, buckets, bucketLowerBound, bucketUpperBound, samples) } func (r *temporalTallyStatsdReporter) ReportHistogramDurationSamples( @@ -101,8 +105,8 @@ func (r *temporalTallyStatsdReporter) ReportHistogramDurationSamples( bucketUpperBound time.Duration, samples int64, ) { - newName := r.metricNameWithTags(name, tags) - r.tallystatsd.ReportHistogramDurationSamples(newName, map[string]string{}, buckets, bucketLowerBound, bucketUpperBound, samples) + // newName := r.metricNameWithTags(name, tags) + r.tallystatsd.ReportHistogramDurationSamples(r.taggedName(name, tags), map[string]string{}, buckets, bucketLowerBound, bucketUpperBound, samples) } func (r *temporalTallyStatsdReporter) Capabilities() tally.Capabilities { @@ -112,3 +116,31 @@ func (r *temporalTallyStatsdReporter) Capabilities() tally.Capabilities { func (r *temporalTallyStatsdReporter) Flush() { r.tallystatsd.Flush() } + +// https://github.com/influxdata/telegraf/blob/master/plugins/inputs/statsd/README.md#influx-statsd +func (r *temporalTallyStatsdReporter) taggedName(name string, tags map[string]string) string { + var b strings.Builder + b.WriteString(name) + for k, v := range tags { + b.WriteString(r.separator) + b.WriteString(replaceChars(k)) + b.WriteByte('=') + b.WriteString(replaceChars(v)) + } + return b.String() +} + +// Replace problematic characters in tags. +func replaceChars(s string) string { + var b strings.Builder + b.Grow(len(s)) + for i := 0; i < len(s); i++ { + switch s[i] { + case '.', ':', '|', '-', '=': + b.WriteByte('_') + default: + b.WriteByte(s[i]) + } + } + return b.String() +}