Skip to content

Commit

Permalink
[CHORE] processing discovered targets async (#3517)
Browse files Browse the repository at this point in the history
* Revert "Support configuring java runtime from configmap or secret (env.valueFrom)" (#3510)

* Revert "Support configuring java runtime from configmap or secret (env.valueF…"

This reverts commit 2b36f0d.

* chlog (#3511)

* [CHORE] changing log level

Signed-off-by: Nicolas Takashi <nicolas.takashi@coralogix.com>

* [CHORE] renaming method

Signed-off-by: Nicolas Takashi <nicolas.takashi@coralogix.com>

* [CHORE] adding change log entry

Signed-off-by: Nicolas Takashi <nicolas.takashi@coralogix.com>

* [CHORE] locking targets per job

Signed-off-by: Nicolas Takashi <nicolas.takashi@coralogix.com>

* Update .chloggen/discovering-target-async.yaml

Co-authored-by: Mikołaj Świątek <mail@mikolajswiatek.com>

* [REFACTORY] applying comments

Signed-off-by: Nicolas Takashi <nicolas.takashi@coralogix.com>

* [CHORE] adding mutex back

Signed-off-by: Nicolas Takashi <nicolas.takashi@coralogix.com>

---------

Signed-off-by: Nicolas Takashi <nicolas.takashi@coralogix.com>
Co-authored-by: Jacob Aronoff <jaronoff97@users.noreply.github.com>
Co-authored-by: Mikołaj Świątek <mail@mikolajswiatek.com>
  • Loading branch information
3 people authored Dec 19, 2024
1 parent 5eefae8 commit b387afd
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 67 deletions.
21 changes: 21 additions & 0 deletions .chloggen/discovering-target-async.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
component: target allocator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Process discovered targets asyncchronously

# One or more tracking issues related to the change
issues: [1842]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
This change enables the target allocator to process discovered targets asynchronously.
This is a significant performance improvement for the target allocator, as it allows it to process targets in parallel, rather than sequentially.
This change also introduces new metrics to track the performance of the target allocator.
- opentelemetry_allocator_process_targets_duration_seconds: The duration of the process targets operation.
- opentelemetry_allocator_process_target_groups_duration_seconds: The duration of the process target groups operation.
28 changes: 13 additions & 15 deletions cmd/otel-allocator/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/stretchr/testify/require"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -45,18 +44,17 @@ import (
// the HTTP server afterward. Test data is chosen to be reasonably representative of what the Prometheus service discovery
// outputs in the real world.
func BenchmarkProcessTargets(b *testing.B) {
numTargets := 10000
numTargets := 800000
targetsPerGroup := 5
groupsPerJob := 20
tsets := prepareBenchmarkData(numTargets, targetsPerGroup, groupsPerJob)
labelsBuilder := labels.NewBuilder(labels.EmptyLabels())

b.ResetTimer()
for _, strategy := range allocation.GetRegisteredAllocatorNames() {
b.Run(strategy, func(b *testing.B) {
targetDiscoverer, allocator := createTestDiscoverer(strategy, map[string][]*relabel.Config{})
targetDiscoverer := createTestDiscoverer(strategy, map[string][]*relabel.Config{})
targetDiscoverer.UpdateTsets(tsets)
b.ResetTimer()
for i := 0; i < b.N; i++ {
targetDiscoverer.ProcessTargets(labelsBuilder, tsets, allocator.SetTargets)
targetDiscoverer.Reload()
}
})
}
Expand All @@ -65,11 +63,10 @@ func BenchmarkProcessTargets(b *testing.B) {
// BenchmarkProcessTargetsWithRelabelConfig is BenchmarkProcessTargets with a relabel config set. The relabel config
// does not actually modify any records, but does force the prehook to perform any necessary conversions along the way.
func BenchmarkProcessTargetsWithRelabelConfig(b *testing.B) {
numTargets := 10000
numTargets := 800000
targetsPerGroup := 5
groupsPerJob := 20
tsets := prepareBenchmarkData(numTargets, targetsPerGroup, groupsPerJob)
labelsBuilder := labels.NewBuilder(labels.EmptyLabels())
prehookConfig := make(map[string][]*relabel.Config, len(tsets))
for jobName := range tsets {
// keep all targets in half the jobs, drop the rest
Expand All @@ -91,12 +88,13 @@ func BenchmarkProcessTargetsWithRelabelConfig(b *testing.B) {
}
}

b.ResetTimer()
for _, strategy := range allocation.GetRegisteredAllocatorNames() {
b.Run(strategy, func(b *testing.B) {
targetDiscoverer, allocator := createTestDiscoverer(strategy, prehookConfig)
targetDiscoverer := createTestDiscoverer(strategy, prehookConfig)
targetDiscoverer.UpdateTsets(tsets)
b.ResetTimer()
for i := 0; i < b.N; i++ {
targetDiscoverer.ProcessTargets(labelsBuilder, tsets, allocator.SetTargets)
targetDiscoverer.Reload()
}
})
}
Expand Down Expand Up @@ -172,7 +170,7 @@ func prepareBenchmarkData(numTargets, targetsPerGroup, groupsPerJob int) map[str
return tsets
}

func createTestDiscoverer(allocationStrategy string, prehookConfig map[string][]*relabel.Config) (*target.Discoverer, allocation.Allocator) {
func createTestDiscoverer(allocationStrategy string, prehookConfig map[string][]*relabel.Config) *target.Discoverer {
ctx := context.Background()
logger := ctrl.Log.WithName(fmt.Sprintf("bench-%s", allocationStrategy))
ctrl.SetLogger(logr.New(log.NullLogSink{}))
Expand All @@ -187,6 +185,6 @@ func createTestDiscoverer(allocationStrategy string, prehookConfig map[string][]
registry := prometheus.NewRegistry()
sdMetrics, _ := discovery.CreateAndRegisterSDMetrics(registry)
discoveryManager := discovery.NewManager(ctx, gokitlog.NewNopLogger(), registry, sdMetrics)
targetDiscoverer := target.NewDiscoverer(logger, discoveryManager, allocatorPrehook, srv)
return targetDiscoverer, allocator
targetDiscoverer := target.NewDiscoverer(logger, discoveryManager, allocatorPrehook, srv, allocator.SetTargets)
return targetDiscoverer
}
4 changes: 2 additions & 2 deletions cmd/otel-allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func main() {
}
discoveryManager = discovery.NewManager(discoveryCtx, gokitlog.NewNopLogger(), prometheus.DefaultRegisterer, sdMetrics)

targetDiscoverer = target.NewDiscoverer(log, discoveryManager, allocatorPrehook, srv)
targetDiscoverer = target.NewDiscoverer(log, discoveryManager, allocatorPrehook, srv, allocator.SetTargets)
collectorWatcher, collectorWatcherErr := collector.NewCollectorWatcher(log, cfg.ClusterConfig)
if collectorWatcherErr != nil {
setupLog.Error(collectorWatcherErr, "Unable to initialize collector watcher")
Expand Down Expand Up @@ -175,7 +175,7 @@ func main() {
setupLog.Info("Prometheus config empty, skipping initial discovery configuration")
}

err := targetDiscoverer.Watch(allocator.SetTargets)
err := targetDiscoverer.Run()
setupLog.Info("Target discoverer exited")
return err
},
Expand Down
177 changes: 139 additions & 38 deletions cmd/otel-allocator/target/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package target
import (
"hash"
"hash/fnv"
"sync"
"time"

"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -27,6 +29,7 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"go.uber.org/zap/zapcore"
"gopkg.in/yaml.v3"

allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher"
Expand All @@ -37,16 +40,33 @@ var (
Name: "opentelemetry_allocator_targets",
Help: "Number of targets discovered.",
}, []string{"job_name"})

processTargetsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "opentelemetry_allocator_process_targets_duration_seconds",
Help: "Duration of processing targets.",
Buckets: []float64{1, 5, 10, 30, 60, 120},
})

processTargetGroupsDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "opentelemetry_allocator_process_target_groups_duration_seconds",
Help: "Duration of processing target groups.",
Buckets: []float64{1, 5, 10, 30, 60, 120},
}, []string{"job_name"})
)

type Discoverer struct {
log logr.Logger
manager *discovery.Manager
close chan struct{}
configsMap map[allocatorWatcher.EventSource][]*promconfig.ScrapeConfig
hook discoveryHook
scrapeConfigsHash hash.Hash
scrapeConfigsUpdater scrapeConfigsUpdater
log logr.Logger
manager *discovery.Manager
close chan struct{}
mtxScrape sync.Mutex // Guards the fields below.
configsMap map[allocatorWatcher.EventSource][]*promconfig.ScrapeConfig
hook discoveryHook
scrapeConfigsHash hash.Hash
scrapeConfigsUpdater scrapeConfigsUpdater
targetSets map[string][]*targetgroup.Group
triggerReload chan struct{}
processTargetsCallBack func(targets map[string]*Item)
mtxTargets sync.Mutex
}

type discoveryHook interface {
Expand All @@ -57,15 +77,17 @@ type scrapeConfigsUpdater interface {
UpdateScrapeConfigResponse(map[string]*promconfig.ScrapeConfig) error
}

func NewDiscoverer(log logr.Logger, manager *discovery.Manager, hook discoveryHook, scrapeConfigsUpdater scrapeConfigsUpdater) *Discoverer {
func NewDiscoverer(log logr.Logger, manager *discovery.Manager, hook discoveryHook, scrapeConfigsUpdater scrapeConfigsUpdater, setTargets func(targets map[string]*Item)) *Discoverer {
return &Discoverer{
log: log,
manager: manager,
close: make(chan struct{}),
configsMap: make(map[allocatorWatcher.EventSource][]*promconfig.ScrapeConfig),
hook: hook,
scrapeConfigsHash: nil, // we want the first update to succeed even if the config is empty
scrapeConfigsUpdater: scrapeConfigsUpdater,
log: log,
manager: manager,
close: make(chan struct{}),
triggerReload: make(chan struct{}, 1),
configsMap: make(map[allocatorWatcher.EventSource][]*promconfig.ScrapeConfig),
hook: hook,
scrapeConfigsHash: nil, // we want the first update to succeed even if the config is empty
scrapeConfigsUpdater: scrapeConfigsUpdater,
processTargetsCallBack: setTargets,
}
}

Expand Down Expand Up @@ -105,43 +127,122 @@ func (m *Discoverer) ApplyConfig(source allocatorWatcher.EventSource, scrapeConf
return m.manager.ApplyConfig(discoveryCfg)
}

func (m *Discoverer) Watch(fn func(targets map[string]*Item)) error {
labelsBuilder := labels.NewBuilder(labels.EmptyLabels())
func (m *Discoverer) Run() error {
err := m.run(m.manager.SyncCh())
if err != nil {
m.log.Error(err, "Service Discovery watch event failed")
return err
}
<-m.close
m.log.Info("Service Discovery watch event stopped: discovery manager closed")
return nil
}

// UpdateTsets updates the target sets to be scraped.
func (m *Discoverer) UpdateTsets(tsets map[string][]*targetgroup.Group) {
m.mtxScrape.Lock()
m.targetSets = tsets
m.mtxScrape.Unlock()
}

// reloader triggers a reload of the scrape configs at regular intervals.
// The time between reloads is defined by reloadIntervalDuration to avoid overloading the system
// with too many reloads, because some service discovery mechanisms can be quite chatty.
func (m *Discoverer) reloader() {
reloadIntervalDuration := model.Duration(5 * time.Second)
ticker := time.NewTicker(time.Duration(reloadIntervalDuration))

defer ticker.Stop()

for {
select {
case <-m.close:
m.log.Info("Service Discovery watch event stopped: discovery manager closed")
return nil
case tsets := <-m.manager.SyncCh():
m.ProcessTargets(labelsBuilder, tsets, fn)
return
case <-ticker.C:
select {
case <-m.triggerReload:
m.Reload()
case <-m.close:
return
}
}
}
}

func (m *Discoverer) ProcessTargets(builder *labels.Builder, tsets map[string][]*targetgroup.Group, fn func(targets map[string]*Item)) {
// Reload triggers a reload of the scrape configs.
// This will process the target groups and update the targets concurrently.
func (m *Discoverer) Reload() {
m.mtxScrape.Lock()
var wg sync.WaitGroup
targets := map[string]*Item{}
timer := prometheus.NewTimer(processTargetsDuration)
defer timer.ObserveDuration()

for jobName, groups := range m.targetSets {
wg.Add(1)
// Run the sync in parallel as these take a while and at high load can't catch up.
go func(jobName string, groups []*targetgroup.Group) {
processedTargets := m.processTargetGroups(jobName, groups)
m.mtxTargets.Lock()
for k, v := range processedTargets {
targets[k] = v
}
m.mtxTargets.Unlock()
wg.Done()
}(jobName, groups)
}
m.mtxScrape.Unlock()
wg.Wait()
m.processTargetsCallBack(targets)
}

for jobName, tgs := range tsets {
var count float64 = 0
for _, tg := range tgs {
builder.Reset(labels.EmptyLabels())
for ln, lv := range tg.Labels {
// processTargetGroups processes the target groups and returns a map of targets.
func (m *Discoverer) processTargetGroups(jobName string, groups []*targetgroup.Group) map[string]*Item {
builder := labels.NewBuilder(labels.Labels{})
timer := prometheus.NewTimer(processTargetGroupsDuration.WithLabelValues(jobName))
targets := map[string]*Item{}
defer timer.ObserveDuration()
var count float64 = 0
for _, tg := range groups {
builder.Reset(labels.EmptyLabels())
for ln, lv := range tg.Labels {
builder.Set(string(ln), string(lv))
}
groupLabels := builder.Labels()
for _, t := range tg.Targets {
count++
builder.Reset(groupLabels)
for ln, lv := range t {
builder.Set(string(ln), string(lv))
}
groupLabels := builder.Labels()
for _, t := range tg.Targets {
count++
builder.Reset(groupLabels)
for ln, lv := range t {
builder.Set(string(ln), string(lv))
}
item := NewItem(jobName, string(t[model.AddressLabel]), builder.Labels(), "")
targets[item.Hash()] = item
item := NewItem(jobName, string(t[model.AddressLabel]), builder.Labels(), "")
targets[item.Hash()] = item
}
}
targetsDiscovered.WithLabelValues(jobName).Set(count)
return targets
}

// Run receives and saves target set updates and triggers the scraping loops reloading.
// Reloading happens in the background so that it doesn't block receiving targets updates.
func (m *Discoverer) run(tsets <-chan map[string][]*targetgroup.Group) error {
go m.reloader()
for {
select {
case ts := <-tsets:
m.log.V(int(zapcore.DebugLevel)).Info("Service Discovery watch event received", "targets groups", len(ts))
m.UpdateTsets(ts)

select {
case m.triggerReload <- struct{}{}:
default:
}

case <-m.close:
m.log.Info("Service Discovery watch event stopped: discovery manager closed")
return nil
}
targetsDiscovered.WithLabelValues(jobName).Set(count)
}
fn(targets)
}

func (m *Discoverer) Close() {
Expand Down
Loading

0 comments on commit b387afd

Please sign in to comment.