Skip to content

Commit

Permalink
remotecfg: wire calls to RegisterCollector (#1359)
Browse files Browse the repository at this point in the history
Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>
Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com>
  • Loading branch information
tpaschalis and clayton-cornell authored Aug 1, 2024
1 parent a014bda commit 69d1a38
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 7 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ Main (unreleased)
- Several bugfixes
- Full list of changes: https://github.com/grafana/beyla/releases/tag/v1.7.0

- Enable instances connected to remotecfg-compatible servers to Register
themselves to the remote service. (@tpaschalis)

### Bugfixes

- Fixed a clustering mode issue where a fatal startup failure of the clustering service
Expand Down
4 changes: 3 additions & 1 deletion docs/sources/reference/config-blocks/remotecfg.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ Name | Type | Description
`id` | `string` | A self-reported ID. | `see below` | no
`attributes` | `map(string)` | A set of self-reported attributes. | `{}` | no
`poll_frequency` | `duration` | How often to poll the API for new configuration. | `"1m"` | no
`name` | `string` | A human-readable name for the collector. | `""` | no

If the `url` is not set, then the service block is a no-op.

If not set, the self-reported `id` that {{< param "PRODUCT_NAME" >}} uses is a randomly generated, anonymous unique ID (UUID) that is stored as an `alloy_seed.json` file in {{< param "PRODUCT_NAME" >}}'s storage path so that it can persist across restarts.
If not set, the self-reported `id` that {{< param "PRODUCT_NAME" >}} uses is a randomly generated, anonymous unique ID (UUID) that is stored as an `alloy_seed.json` file in the {{< param "PRODUCT_NAME" >}} storage path so that it can persist across restarts.
You can use the `name` field to set another human-friendly identifier for the specific {{< param "PRODUCT_NAME" >}} instance.

The `id` and `attributes` fields are used in the periodic request sent to the
remote endpoint so that the API can decide what configuration to serve.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ require (
github.com/google/renameio/v2 v2.0.0
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.1
github.com/grafana/alloy-remote-config v0.0.6
github.com/grafana/alloy-remote-config v0.0.8
github.com/grafana/alloy/syntax v0.1.0
github.com/grafana/beyla v1.7.0
github.com/grafana/catchpoint-prometheus-exporter v0.0.0-20240606062944-e55f3668661d
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1008,8 +1008,8 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
github.com/gosnmp/gosnmp v1.37.0 h1:/Tf8D3b9wrnNuf/SfbvO+44mPrjVphBhRtcGg22V07Y=
github.com/gosnmp/gosnmp v1.37.0/go.mod h1:GDH9vNqpsD7f2HvZhKs5dlqSEcAS6s6Qp099oZRCR+M=
github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY=
github.com/grafana/alloy-remote-config v0.0.6 h1:/GzYu3/QPHRZFeqeeCZWnDMzXbXVkr7tAfrnHGZzreA=
github.com/grafana/alloy-remote-config v0.0.6/go.mod h1:kHE1usYo2WAVCikQkIXuoG1Clz8BSdiz3kF+DZSCQ4k=
github.com/grafana/alloy-remote-config v0.0.8 h1:bQTk7rkR1Hykss+bfMv7CucpF/fRsi2lixJHfIcOMnc=
github.com/grafana/alloy-remote-config v0.0.8/go.mod h1:kHE1usYo2WAVCikQkIXuoG1Clz8BSdiz3kF+DZSCQ4k=
github.com/grafana/beyla v1.7.0 h1:0BoIUTwIyXN2h1zrYm+/ffA9toezHJ5GQq4PqU3HrrU=
github.com/grafana/beyla v1.7.0/go.mod h1:n0dWKeQDxNY8qjgiG0ochjsSva94KPcDSXUBqwIrnVo=
github.com/grafana/cadvisor v0.0.0-20240729082359-1f04a91701e2 h1:ju6EcY2aEobeBg185ETtFCKj5WzaQ48qfkbsSRRQrF4=
Expand Down
10 changes: 10 additions & 0 deletions internal/service/remotecfg/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,13 @@ type noopClient struct{}
func (c noopClient) GetConfig(context.Context, *connect.Request[collectorv1.GetConfigRequest]) (*connect.Response[collectorv1.GetConfigResponse], error) {
return nil, errors.New("noop client")
}

// RegisterCollector checks in the current collector to the API on startup.
func (c noopClient) RegisterCollector(context.Context, *connect.Request[collectorv1.RegisterCollectorRequest]) (*connect.Response[collectorv1.RegisterCollectorResponse], error) {
return nil, errors.New("noop client")
}

// UnregisterCollector checks out the current collector to the API on shutdown.
func (c noopClient) UnregisterCollector(context.Context, *connect.Request[collectorv1.UnregisterCollectorRequest]) (*connect.Response[collectorv1.UnregisterCollectorResponse], error) {
return nil, errors.New("noop client")
}
19 changes: 19 additions & 0 deletions internal/service/remotecfg/remotecfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type Options struct {
type Arguments struct {
URL string `alloy:"url,attr,optional"`
ID string `alloy:"id,attr,optional"`
Name string `alloy:"name,attr,optional"`
Attributes map[string]string `alloy:"attributes,attr,optional"`
PollFrequency time.Duration `alloy:"poll_frequency,attr,optional"`
HTTPClientConfig *config.HTTPClientConfig `alloy:",squash"`
Expand Down Expand Up @@ -227,6 +228,7 @@ func (s *Service) Run(ctx context.Context, host service.Host) error {
s.ctrl = host.NewController(ServiceName)

s.fetch()
s.registerCollector()

// Run the service's own controller.
go func() {
Expand Down Expand Up @@ -288,6 +290,7 @@ func (s *Service) Update(newConfig any) error {

// Update the args as the last step to avoid polluting any comparisons
s.args = newArgs
s.registerCollector()
s.mut.Unlock()

// If we've already called Run, then immediately trigger an API call with
Expand All @@ -307,6 +310,22 @@ func (s *Service) fetch() {
s.fetchLocal()
}
}

func (s *Service) registerCollector() error {
req := connect.NewRequest(&collectorv1.RegisterCollectorRequest{
Id: s.args.ID,
Attributes: s.attrs,
Name: s.args.Name,
})
client := s.asClient

_, err := client.RegisterCollector(context.Background(), req)
if err != nil {
return err
}
return nil
}

func (s *Service) fetchRemote() error {
if !s.isEnabled() {
return nil
Expand Down
41 changes: 38 additions & 3 deletions internal/service/remotecfg/remotecfg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"os"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -43,6 +44,9 @@ func TestOnDiskCache(t *testing.T) {
client := &collectorClient{}
env.svc.asClient = client

var registerCalled atomic.Bool
client.registerCollectorFunc = buildRegisterCollectorFunc(&registerCalled)

// Mock client to return an unparseable response.
client.getConfigFunc = buildGetConfigHandler("unparseable config")

Expand All @@ -62,7 +66,7 @@ func TestOnDiskCache(t *testing.T) {
}

func TestAPIResponse(t *testing.T) {
ctx := componenttest.TestContext(t)
ctx, cancel := context.WithCancel(context.Background())
url := "https://example.com/"
cfg1 := `loki.process "default" { forward_to = [] }`
cfg2 := `loki.process "updated" { forward_to = [] }`
Expand All @@ -78,15 +82,19 @@ func TestAPIResponse(t *testing.T) {
env.svc.asClient = client

// Mock client to return a valid response.
var registerCalled atomic.Bool
client.mut.Lock()
client.getConfigFunc = buildGetConfigHandler(cfg1)
client.registerCollectorFunc = buildRegisterCollectorFunc(&registerCalled)
client.mut.Unlock()

// Run the service.
go func() {
require.NoError(t, env.Run(ctx))
}()

require.Eventually(t, func() bool { return registerCalled.Load() }, 1*time.Second, 10*time.Millisecond)

// As the API response was successful, verify that the service has loaded
// the valid response.
require.EventuallyWithT(t, func(c *assert.CollectT) {
Expand All @@ -102,6 +110,8 @@ func TestAPIResponse(t *testing.T) {
require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Equal(c, getHash([]byte(cfg2)), env.svc.getCfgHash())
}, 1*time.Second, 10*time.Millisecond)

cancel()
}

func buildGetConfigHandler(in string) func(context.Context, *connect.Request[collectorv1.GetConfigRequest]) (*connect.Response[collectorv1.GetConfigResponse], error) {
Expand All @@ -115,6 +125,15 @@ func buildGetConfigHandler(in string) func(context.Context, *connect.Request[col
}
}

func buildRegisterCollectorFunc(called *atomic.Bool) func(ctx context.Context, req *connect.Request[collectorv1.RegisterCollectorRequest]) (*connect.Response[collectorv1.RegisterCollectorResponse], error) {
return func(ctx context.Context, req *connect.Request[collectorv1.RegisterCollectorRequest]) (*connect.Response[collectorv1.RegisterCollectorResponse], error) {
called.Store(true)
return &connect.Response[collectorv1.RegisterCollectorResponse]{
Msg: &collectorv1.RegisterCollectorResponse{},
}, nil
}
}

type testEnvironment struct {
t *testing.T
svc *Service
Expand Down Expand Up @@ -185,8 +204,9 @@ func (f fakeHost) NewController(id string) service.Controller {
}

type collectorClient struct {
mut sync.RWMutex
getConfigFunc func(context.Context, *connect.Request[collectorv1.GetConfigRequest]) (*connect.Response[collectorv1.GetConfigResponse], error)
mut sync.RWMutex
getConfigFunc func(context.Context, *connect.Request[collectorv1.GetConfigRequest]) (*connect.Response[collectorv1.GetConfigResponse], error)
registerCollectorFunc func(ctx context.Context, req *connect.Request[collectorv1.RegisterCollectorRequest]) (*connect.Response[collectorv1.RegisterCollectorResponse], error)
}

func (ag *collectorClient) GetConfig(ctx context.Context, req *connect.Request[collectorv1.GetConfigRequest]) (*connect.Response[collectorv1.GetConfigResponse], error) {
Expand All @@ -200,6 +220,21 @@ func (ag *collectorClient) GetConfig(ctx context.Context, req *connect.Request[c
panic("getConfigFunc not set")
}

func (ag *collectorClient) RegisterCollector(ctx context.Context, req *connect.Request[collectorv1.RegisterCollectorRequest]) (*connect.Response[collectorv1.RegisterCollectorResponse], error) {
ag.mut.RLock()
defer ag.mut.RUnlock()

if ag.registerCollectorFunc != nil {
return ag.registerCollectorFunc(ctx, req)
}

panic("registerCollectorFunc not set")
}

func (ag *collectorClient) UnregisterCollector(ctx context.Context, req *connect.Request[collectorv1.UnregisterCollectorRequest]) (*connect.Response[collectorv1.UnregisterCollectorResponse], error) {
panic("unregisterCollector isn't wired yet")
}

type serviceController struct {
f *alloy_runtime.Runtime
}
Expand Down

0 comments on commit 69d1a38

Please sign in to comment.