Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Statsd separator #2

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions common/headers/versionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ const (
ClientNameTypeScriptSDK = "temporal-typescript"
ClientNameCLI = "temporal-cli"

ServerVersion = "1.13.0"
CLIVersion = "1.13.0"
ServerVersion = "1.13.1"
CLIVersion = "1.13.1"

// SupportedServerVersions is used by CLI and inter role communication.
SupportedServerVersions = ">=1.0.0 <2.0.0"
Expand Down
10 changes: 10 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ func Error(err error) ZapTag {
return NewErrorTag(err)
}

// IsRetryable returns tag for IsRetryable
func IsRetryable(isRetryable bool) ZapTag {
return NewBoolTag("is-retryable", isRetryable)
}

// ClusterName returns tag for ClusterName
func ClusterName(clusterName string) ZapTag {
return NewStringTag("cluster-name", clusterName)
Expand Down Expand Up @@ -435,6 +440,11 @@ func Counter(c int) ZapTag {
return NewInt("counter", c)
}

// RequestCount returns tag for RequestCount
func RequestCount(c int) ZapTag {
return NewInt("request-count", c)
}

// Number returns tag for Number
func Number(n int64) ZapTag {
return NewInt64("number", n)
Expand Down
52 changes: 42 additions & 10 deletions common/metrics/tally/statsd/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package statsd
import (
"bytes"
"sort"
"strings"
"time"

"github.com/cactus/go-statsd-client/statsd"
Expand All @@ -37,6 +38,8 @@ import (
type temporalTallyStatsdReporter struct {
//Wrapper on top of "github.com/uber-go/tally/statsd"
tallystatsd tally.StatsReporter

separator string
}

func (r *temporalTallyStatsdReporter) metricNameWithTags(originalName string, tags map[string]string) string {
Expand All @@ -63,22 +66,23 @@ func (r *temporalTallyStatsdReporter) metricNameWithTags(originalName string, ta
func NewReporter(statsd statsd.Statter, opts tallystatsdreporter.Options) tally.StatsReporter {
return &temporalTallyStatsdReporter{
tallystatsd: tallystatsdreporter.NewReporter(statsd, opts),
separator: ".__",
}
}

func (r *temporalTallyStatsdReporter) ReportCounter(name string, tags map[string]string, value int64) {
newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportCounter(newName, map[string]string{}, value)
// newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportCounter(r.taggedName(name, tags), map[string]string{}, value)
}

func (r *temporalTallyStatsdReporter) ReportGauge(name string, tags map[string]string, value float64) {
newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportGauge(newName, map[string]string{}, value)
// newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportGauge(r.taggedName(name, tags), map[string]string{}, value)
}

func (r *temporalTallyStatsdReporter) ReportTimer(name string, tags map[string]string, interval time.Duration) {
newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportTimer(newName, map[string]string{}, interval)
// newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportTimer(r.taggedName(name, tags), map[string]string{}, interval)
}

func (r *temporalTallyStatsdReporter) ReportHistogramValueSamples(
Expand All @@ -89,8 +93,8 @@ func (r *temporalTallyStatsdReporter) ReportHistogramValueSamples(
bucketUpperBound float64,
samples int64,
) {
newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportHistogramValueSamples(newName, map[string]string{}, buckets, bucketLowerBound, bucketUpperBound, samples)
// newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportHistogramValueSamples(r.taggedName(name, tags), map[string]string{}, buckets, bucketLowerBound, bucketUpperBound, samples)
}

func (r *temporalTallyStatsdReporter) ReportHistogramDurationSamples(
Expand All @@ -101,8 +105,8 @@ func (r *temporalTallyStatsdReporter) ReportHistogramDurationSamples(
bucketUpperBound time.Duration,
samples int64,
) {
newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportHistogramDurationSamples(newName, map[string]string{}, buckets, bucketLowerBound, bucketUpperBound, samples)
// newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportHistogramDurationSamples(r.taggedName(name, tags), map[string]string{}, buckets, bucketLowerBound, bucketUpperBound, samples)
}

func (r *temporalTallyStatsdReporter) Capabilities() tally.Capabilities {
Expand All @@ -112,3 +116,31 @@ func (r *temporalTallyStatsdReporter) Capabilities() tally.Capabilities {
func (r *temporalTallyStatsdReporter) Flush() {
r.tallystatsd.Flush()
}

// https://github.com/influxdata/telegraf/blob/master/plugins/inputs/statsd/README.md#influx-statsd
func (r *temporalTallyStatsdReporter) taggedName(name string, tags map[string]string) string {
var b strings.Builder
b.WriteString(name)
for k, v := range tags {
b.WriteString(r.separator)
b.WriteString(replaceChars(k))
b.WriteByte('=')
b.WriteString(replaceChars(v))
}
return b.String()
}

// Replace problematic characters in tags.
func replaceChars(s string) string {
var b strings.Builder
b.Grow(len(s))
for i := 0; i < len(s); i++ {
switch s[i] {
case '.', ':', '|', '-', '=':
b.WriteByte('_')
default:
b.WriteByte(s[i])
}
}
return b.String()
}
15 changes: 9 additions & 6 deletions common/persistence/visibility/store/elasticsearch/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -216,13 +217,14 @@ func (p *processorImpl) bulkBeforeAction(_ int64, requests []elastic.BulkableReq
// bulkAfterAction is triggered after bulk processor commit
func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
if err != nil {
// This happens after configured retry, which means something bad happens on cluster or index
// When cluster back to live, processor will re-commit those failure requests

const logFirstNRequests = 5
isRetryable := client.IsRetryableError(err)
p.logger.Error("Unable to commit bulk ES request.", tag.Error(err), tag.Bool(isRetryable))
for _, request := range requests {
p.logger.Error("ES request failed.", tag.ESRequest(request.String()))
var logRequests strings.Builder
for i, request := range requests {
if i < logFirstNRequests {
logRequests.WriteString(request.String())
logRequests.WriteRune('\n')
}
p.metricsClient.IncCounter(metrics.ElasticsearchBulkProcessor, metrics.ElasticsearchBulkProcessorFailures)

if !isRetryable {
Expand All @@ -233,6 +235,7 @@ func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequ
p.sendToAckChan(visibilityTaskKey, false)
}
}
p.logger.Error("Unable to commit bulk ES request.", tag.Error(err), tag.IsRetryable(isRetryable), tag.RequestCount(len(requests)), tag.ESRequest(logRequests.String()))
return
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"errors"
"fmt"
"io"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -173,11 +174,25 @@ func (s *visibilityStore) DeleteWorkflowExecution(request *manager.VisibilityDel
}

func getDocID(workflowID string, runID string) string {
return fmt.Sprintf("%s%s%s", workflowID, delimiter, runID)
// From Elasticsearch doc: _id is limited to 512 bytes in size and larger values will be rejected.
const maxDocIDLength = 512
// Generally runID is guid and this should never be the case.
if len(runID)+len(delimiter) >= maxDocIDLength {
if len(runID) >= maxDocIDLength {
return runID[0:maxDocIDLength]
}
return runID[0 : maxDocIDLength-len(delimiter)]
}

if len(workflowID)+len(runID)+len(delimiter) > maxDocIDLength {
workflowID = workflowID[0 : maxDocIDLength-len(runID)-len(delimiter)]
}

return workflowID + delimiter + runID
}

func getVisibilityTaskKey(shardID int32, taskID int64) string {
return fmt.Sprintf("%d%s%d", shardID, delimiter, taskID)
return strconv.FormatInt(int64(shardID), 10) + delimiter + strconv.FormatInt(taskID, 10)
}

func (s *visibilityStore) addBulkIndexRequestAndWait(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package elasticsearch

import (
"fmt"
"strings"
"time"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -269,3 +270,28 @@ func (s *ESVisibilitySuite) TestDeleteExecution_EmptyRequest() {
err := s.visibilityStore.DeleteWorkflowExecution(request)
s.NoError(err)
}

func (s *ESVisibilitySuite) Test_getDocID() {
s.Equal("wid~rid", getDocID("wid", "rid"))

s.Equal(strings.Repeat("a", 512), getDocID("wid", strings.Repeat("a", 1000)))
s.Equal(strings.Repeat("a", 512), getDocID("wid", strings.Repeat("a", 513)))
s.Equal(strings.Repeat("a", 512), getDocID("wid", strings.Repeat("a", 512)))
s.Equal(strings.Repeat("a", 511), getDocID("wid", strings.Repeat("a", 511)))
s.Equal("w~"+strings.Repeat("a", 510), getDocID("wid", strings.Repeat("a", 510)))
s.Equal("wi~"+strings.Repeat("a", 509), getDocID("wid", strings.Repeat("a", 509)))
s.Equal("wid~"+strings.Repeat("a", 508), getDocID("wid", strings.Repeat("a", 508)))
s.Equal("wid~"+strings.Repeat("a", 507), getDocID("wid", strings.Repeat("a", 507)))

s.Equal(strings.Repeat("a", 512-1-36)+"~fd86a520-741e-4fd3-a788-165c445ea6f3", getDocID(strings.Repeat("a", 1000), "fd86a520-741e-4fd3-a788-165c445ea6f3"))
s.Equal(strings.Repeat("a", 512-1-36)+"~fd86a520-741e-4fd3-a788-165c445ea6f3", getDocID(strings.Repeat("a", 477), "fd86a520-741e-4fd3-a788-165c445ea6f3"))
s.Equal(strings.Repeat("a", 512-1-36)+"~fd86a520-741e-4fd3-a788-165c445ea6f3", getDocID(strings.Repeat("a", 476), "fd86a520-741e-4fd3-a788-165c445ea6f3"))
s.Equal(strings.Repeat("a", 475)+"~fd86a520-741e-4fd3-a788-165c445ea6f3", getDocID(strings.Repeat("a", 475), "fd86a520-741e-4fd3-a788-165c445ea6f3"))
s.Equal(strings.Repeat("a", 474)+"~fd86a520-741e-4fd3-a788-165c445ea6f3", getDocID(strings.Repeat("a", 474), "fd86a520-741e-4fd3-a788-165c445ea6f3"))
s.Equal(strings.Repeat("a", 400)+"~fd86a520-741e-4fd3-a788-165c445ea6f3", getDocID(strings.Repeat("a", 400), "fd86a520-741e-4fd3-a788-165c445ea6f3"))
}

func (s *ESVisibilitySuite) Test_getVisibilityTaskKey() {
s.Equal("22~8", getVisibilityTaskKey(22, 8))
s.Equal("228~1978", getVisibilityTaskKey(228, 1978))
}
19 changes: 19 additions & 0 deletions host/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/server/common/config"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/payloads"
Expand Down Expand Up @@ -1378,6 +1379,24 @@ func (s *elasticsearchIntegrationSuite) TestUpsertWorkflowExecution_InvalidKey()
s.NotNil(failedEventAttr.GetFailure())
}

func (s *elasticsearchIntegrationSuite) Test_LongWorkflowID() {
if s.testClusterConfig.Persistence.StoreType == config.StoreTypeSQL {
// TODO: remove this when workflow_id field size is increased from varchar(255) in SQL schema.
return
}

id := strings.Repeat("a", 1000)
wt := "es-integration-long-workflow-id-test-type"
tl := "es-integration-long-workflow-id-test-taskqueue"
request := s.createStartWorkflowExecutionRequest(id, wt, tl)

we, err := s.engine.StartWorkflowExecution(NewContext(), request)
s.NoError(err)

query := fmt.Sprintf(`WorkflowId = "%s"`, id)
s.testHelperForReadOnce(we.GetRunId(), query, false)
}

func (s *elasticsearchIntegrationSuite) putIndexSettings(indexName string, maxResultWindowSize int) {
acknowledged, err := s.esClient.IndexPutSettings(
context.Background(),
Expand Down