diff --git a/internal/api/api.go b/internal/api/api.go index 1b651ef27..8f0136276 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -62,6 +62,7 @@ func New( wholeConf any, log log.Modular, stats metrics.Type, + count *int, opts ...OptFunc, ) (*Type, error) { gMux := mux.NewRouter() @@ -146,6 +147,10 @@ func New( } } + handleConfigAcknowledgement := func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "{\"success_reload_count\":\"%v\"}", *count) + } + if t.conf.DebugEndpoints { t.RegisterEndpoint( "/debug/config/json", "DEBUG: Returns the loaded config as JSON.", @@ -200,6 +205,7 @@ func New( t.RegisterEndpoint("/ping", "Ping me.", handlePing) t.RegisterEndpoint("/version", "Returns the service version.", handleVersion) + t.RegisterEndpoint("/config/ack", "Returns the count of success watcher", handleConfigAcknowledgement) t.RegisterEndpoint("/endpoints", "Returns this map of endpoints.", handleEndpoints) // If we want to expose a stats endpoint we register the endpoints. diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 97a4cc40d..a0a745dcf 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -20,7 +20,7 @@ func TestAPIEnableCORS(t *testing.T) { conf.CORS.Enabled = true conf.CORS.AllowedOrigins = []string{"*"} - s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop()) + s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil) require.NoError(t, err) handler := s.Handler() @@ -41,7 +41,7 @@ func TestAPIEnableCORSOrigins(t *testing.T) { conf.CORS.Enabled = true conf.CORS.AllowedOrigins = []string{"foo", "bar"} - s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop()) + s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil) require.NoError(t, err) handler := s.Handler() @@ -81,7 +81,7 @@ func TestAPIEnableCORSNoHeaders(t *testing.T) { conf := api.NewConfig() conf.CORS.Enabled = true - _, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop()) + _, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil) require.Error(t, err) assert.Contains(t, err.Error(), "must specify at least one allowed origin") } @@ -164,7 +164,7 @@ func TestAPIBasicAuth(t *testing.T) { conf.BasicAuth.PasswordHash = tc.correctPass conf.BasicAuth.Salt = "EzrwNJYw2wkErVVV1P36FQ==" - s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop()) + s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil) if ok := tc.expectedErr(t, err); !(ok && err == nil) { return } diff --git a/internal/cli/common/manager.go b/internal/cli/common/manager.go index be51a3921..0bc86a4c9 100644 --- a/internal/cli/common/manager.go +++ b/internal/cli/common/manager.go @@ -30,6 +30,7 @@ func CreateManager( logger log.Modular, streamsMode bool, conf config.Type, + count *int, mgrOpts ...manager.OptFunc, ) (stoppableMgr *StoppableManager, err error) { var stats *metrics.Namespaced @@ -88,7 +89,7 @@ func CreateManager( } var httpServer *api.Type - if httpServer, err = api.New(cliOpts.Version, cliOpts.DateBuilt, conf.HTTP, sanitNode, logger, stats); err != nil { + if httpServer, err = api.New(cliOpts.Version, cliOpts.DateBuilt, conf.HTTP, sanitNode, logger, stats, count); err != nil { err = fmt.Errorf("failed to initialise API: %w", err) return } diff --git a/internal/cli/common/service.go b/internal/cli/common/service.go index f0f91595d..93c3f6003 100644 --- a/internal/cli/common/service.go +++ b/internal/cli/common/service.go @@ -54,6 +54,7 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error { } verLogger := logger.With("benthos_version", cliOpts.Version) + if mainPath == "" { verLogger.Info("Running without a main config file") } else if inferredMainPath { @@ -73,8 +74,9 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error { if strict && len(lints) > 0 { return errors.New(cliOpts.ExecTemplate("shutting down due to linter errors, to prevent shutdown run {{.ProductName}} with --chilled")) } - - stoppableManager, err := CreateManager(c, cliOpts, logger, streamsMode, conf) + //Success Watcher Count Is Used to for to get count of the config which was updated with the watcher flag. + success_reload_count := 0 + stoppableManager, err := CreateManager(c, cliOpts, logger, streamsMode, conf, &success_reload_count) if err != nil { return err } @@ -90,9 +92,10 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error { watching := cliOpts.RootFlags.GetWatcher(c) if streamsMode { enableStreamsAPI := !c.Bool("no-api") - stoppableStream, err = initStreamsMode(cliOpts, strict, watching, enableStreamsAPI, confReader, stoppableManager.Manager()) + stoppableStream, err = initStreamsMode(cliOpts, strict, watching, enableStreamsAPI, confReader, stoppableManager.Manager(), &success_reload_count) } else { - stoppableStream, dataStreamClosedChan, err = initNormalMode(cliOpts, conf, strict, watching, confReader, stoppableManager.Manager()) + logger.Info("InitMode Get Initiated... strict:%v", strict) + stoppableStream, dataStreamClosedChan, err = initNormalMode(cliOpts, conf, strict, watching, confReader, stoppableManager.Manager(), &success_reload_count) } if err != nil { return err @@ -133,6 +136,7 @@ func initStreamsMode( strict, watching, enableAPI bool, confReader *config.Reader, mgr *manager.Type, + success_reload_count *int, ) (RunningStream, error) { logger := mgr.Logger() streamMgr := strmmgr.New(mgr, strmmgr.OptAPIEnabled(enableAPI)) @@ -181,7 +185,7 @@ func initStreamsMode( } if watching { - if err := confReader.BeginFileWatching(mgr, strict); err != nil { + if err := confReader.BeginFileWatching(mgr, strict, success_reload_count); err != nil { return nil, fmt.Errorf("failed to create stream config watcher: %w", err) } } @@ -194,6 +198,7 @@ func initNormalMode( strict, watching bool, confReader *config.Reader, mgr *manager.Type, + success_reload_count *int, ) (newStream RunningStream, stoppedChan chan struct{}, err error) { logger := mgr.Logger() @@ -231,7 +236,7 @@ func initNormalMode( } if watching { - if err := confReader.BeginFileWatching(mgr, strict); err != nil { + if err := confReader.BeginFileWatching(mgr, strict, success_reload_count); err != nil { return nil, nil, fmt.Errorf("failed to create config file watcher: %w", err) } } diff --git a/internal/cli/studio/pull_runner.go b/internal/cli/studio/pull_runner.go index a773ec8a3..c705046e6 100644 --- a/internal/cli/studio/pull_runner.go +++ b/internal/cli/studio/pull_runner.go @@ -297,7 +297,7 @@ func (r *PullRunner) bootstrapConfigReader(ctx context.Context) (bootstrapErr er tmpTracingSummary.SetEnabled(false) stopMgrTmp, err := common.CreateManager( - r.cliContext, r.cliOpts, r.logger, false, conf, + r.cliContext, r.cliOpts, r.logger, false, conf, nil, manager.OptSetEnvironment(tmpEnv), manager.OptSetBloblangEnvironment(bloblEnv), manager.OptSetFS(sessFS)) @@ -413,13 +413,13 @@ func (r *PullRunner) Sync(ctx context.Context) { } } for _, res := range diff.AddResources { - if err := r.confReader.TriggerResourceUpdate(r.mgr, r.strictMode, res.Name); err != nil { + if err := r.confReader.TriggerResourceUpdate(r.mgr, r.strictMode, res.Name, nil); err != nil { r.logger.Error("Failed to reflect resource file '%v' update: %v", res.Name, err) runErr = err } } if diff.MainConfig != nil { - if err := r.confReader.TriggerMainUpdate(r.mgr, r.strictMode, diff.MainConfig.Name); err != nil { + if err := r.confReader.TriggerMainUpdate(r.mgr, r.strictMode, diff.MainConfig.Name, nil); err != nil { r.logger.Error("Failed to reflect main config file '%v' update: %v", diff.MainConfig.Name, err) runErr = err } diff --git a/internal/config/reader.go b/internal/config/reader.go index 98e7d014c..095717277 100644 --- a/internal/config/reader.go +++ b/internal/config/reader.go @@ -357,7 +357,7 @@ func (r *Reader) readMain(mainPath string) (conf Type, pConf *docs.ParsedConfig, // TriggerMainUpdate attempts to re-read the main configuration file, trigger // the provided main update func, and apply changes to resources to the provided // manager as appropriate. -func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPath string) error { +func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPath string, success_reload_count *int) error { conf, _, lints, err := r.readMain(newPath) if errors.Is(err, fs.ErrNotExist) { if r.mainPath != newPath { @@ -416,6 +416,13 @@ func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPat mgr.Logger().Error("Failed to apply updated config: %v", err) return err } + + // Success Watcher Count denotes if the configuration in the benthos gets updated with the watcher + // flag then success watcher count gets increased + if success_reload_count != nil { + *success_reload_count = *success_reload_count + 1 + mgr.Logger().Info("Success Reload Count: %v, For Normal Config", *success_reload_count) + } mgr.Logger().Info("Updated main config") } return nil diff --git a/internal/config/reader_test.go b/internal/config/reader_test.go index 96589cd78..230d1631b 100644 --- a/internal/config/reader_test.go +++ b/internal/config/reader_test.go @@ -161,7 +161,7 @@ processor_resources: assert.False(t, testMgr.ProbeProcessor("c")) assert.False(t, testMgr.ProbeProcessor("d")) - require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "bar_main.yaml")) + require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "bar_main.yaml", nil)) // Wait for the config watcher to reload the config select { @@ -226,10 +226,10 @@ processor_resources: return nil })) - require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "a.yaml")) - require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "b.yaml")) + require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "a.yaml", nil)) + require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "b.yaml", nil)) - require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "foo_main.yaml")) + require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "foo_main.yaml", nil)) assert.Equal(t, "fooin", conf.Input.Label) assert.Equal(t, "fooout", conf.Output.Label) diff --git a/internal/config/resource_reader.go b/internal/config/resource_reader.go index 26323caa4..dab995c63 100644 --- a/internal/config/resource_reader.go +++ b/internal/config/resource_reader.go @@ -240,7 +240,7 @@ func (r *Reader) readResource(path string) (conf manager.ResourceConfig, lints [ // TriggerResourceUpdate attempts to re-read a resource configuration file and // apply changes to the provided manager as appropriate. -func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, path string) error { +func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, path string, success_reload_count *int) error { newResConf, lints, err := r.readResource(path) if errors.Is(err, fs.ErrNotExist) { return r.TriggerResourceDelete(mgr, path) @@ -273,6 +273,11 @@ func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, pa } r.resourceFileInfo[path] = newInfo + + if success_reload_count != nil { + *success_reload_count = *success_reload_count + 1 + mgr.Logger().Info("Success Reload Count: %v, For Stream Config", *success_reload_count) + } return nil } diff --git a/internal/config/resource_reader_test.go b/internal/config/resource_reader_test.go index 9c1ea280a..f0764335c 100644 --- a/internal/config/resource_reader_test.go +++ b/internal/config/resource_reader_test.go @@ -59,7 +59,7 @@ processor_resources: // Watch for configuration changes. testMgr, err := manager.New(conf.ResourceConfig) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) tCtx, done := context.WithTimeout(context.Background(), time.Second*30) defer done() @@ -175,7 +175,7 @@ processor_resources: // Watch for configuration changes. testMgr, err := manager.New(conf.ResourceConfig) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) tCtx, done := context.WithTimeout(context.Background(), time.Second*30) defer done() diff --git a/internal/config/stream_reader.go b/internal/config/stream_reader.go index 673a0583f..a2a7ed6f6 100644 --- a/internal/config/stream_reader.go +++ b/internal/config/stream_reader.go @@ -184,7 +184,7 @@ func (r *Reader) findStreamPathWalkedDir(streamPath string) (dir string) { // TriggerStreamUpdate attempts to re-read a stream configuration file, and // trigger the provided stream update func. -func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path string) error { +func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path string, success_reload_count *int) error { if r.streamUpdateFn == nil { return nil } @@ -236,5 +236,10 @@ func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path return err } mgr.Logger().Info("Updated stream %v config from file.", info.id) + + if success_reload_count != nil { + *success_reload_count = *success_reload_count + 1 + mgr.Logger().Info("Success Reload Count: %v, For Stream Config", *success_reload_count) + } return nil } diff --git a/internal/config/watcher.go b/internal/config/watcher.go index d0be14862..26b2d390c 100644 --- a/internal/config/watcher.go +++ b/internal/config/watcher.go @@ -68,7 +68,7 @@ func (r *Reader) modifiedSinceLastRead(name string) bool { // WARNING: Either SubscribeConfigChanges or SubscribeStreamChanges must be // called before this, as otherwise it is unsafe to register them during // watching. -func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool) error { +func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool, success_reload_count *int) error { if r.watcher != nil { return errors.New("a file watcher has already been started") } @@ -102,9 +102,11 @@ func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool) error } refreshFiles := func() error { + mgr.Logger().Info("Inside the Refresh Files") if !r.streamsMode && r.mainPath != "" { if _, err := r.fs.Stat(r.mainPath); err == nil { if err := addNotWatching([]string{r.mainPath}); err != nil { + mgr.Logger().Error("addNotWatching Error") return err } } @@ -173,13 +175,14 @@ func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool) error } var succeeded bool if nameClean == r.mainPath { - succeeded = !ShouldReread(r.TriggerMainUpdate(mgr, strict, r.mainPath)) + succeeded = !ShouldReread(r.TriggerMainUpdate(mgr, strict, r.mainPath, success_reload_count)) } else if _, exists := r.streamFileInfo[nameClean]; exists { - succeeded = !ShouldReread(r.TriggerStreamUpdate(mgr, strict, nameClean)) + succeeded = !ShouldReread(r.TriggerStreamUpdate(mgr, strict, nameClean, success_reload_count)) } else { - succeeded = !ShouldReread(r.TriggerResourceUpdate(mgr, strict, nameClean)) + succeeded = !ShouldReread(r.TriggerResourceUpdate(mgr, strict, nameClean, success_reload_count)) } if succeeded { + mgr.Logger().Info("This is the collaps changes %v", collapsedChanges) delete(collapsedChanges, nameClean) } else { change.at = time.Now() diff --git a/internal/config/watcher_test.go b/internal/config/watcher_test.go index 11ae49455..cc135ba6d 100644 --- a/internal/config/watcher_test.go +++ b/internal/config/watcher_test.go @@ -48,7 +48,7 @@ output: // Watch for configuration changes testMgr, err := manager.New(manager.ResourceConfig{}) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) // Overwrite original config require.NoError(t, os.WriteFile(confFilePath, dummyConfig, 0o644)) @@ -91,16 +91,19 @@ output: changeChan := make(chan struct{}) var updatedConf stream.Config + var once sync.Once require.NoError(t, rdr.SubscribeConfigChanges(func(conf *Type) error { updatedConf = conf.Config - close(changeChan) + once.Do(func() { + close(changeChan) + }) return nil })) // Watch for configuration changes testMgr, err := manager.New(manager.ResourceConfig{}) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) // Create a new config folder and place in it a new copy of the config file newConfDir := filepath.Join(rootDir, "config_new") @@ -184,7 +187,7 @@ func TestReaderStreamDirectWatching(t *testing.T) { // Watch for configuration changes testMgr, err := manager.New(manager.ResourceConfig{}) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) require.NoError(t, os.WriteFile(confAPath, []byte(`output: { label: a2, drop: {} }`), 0o644)) require.NoError(t, os.WriteFile(confBPath, []byte(`output: { label: b2, drop: {} }`), 0o644)) @@ -268,7 +271,7 @@ func TestReaderStreamWildcardWatching(t *testing.T) { // Watch for configuration changes testMgr, err := manager.New(manager.ResourceConfig{}) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) require.NoError(t, os.WriteFile(confAPath, []byte(`output: { label: a2, drop: {} }`), 0o644)) require.NoError(t, os.WriteFile(confBPath, []byte(`output: { label: b2, drop: {} }`), 0o644)) @@ -352,7 +355,7 @@ func TestReaderStreamDirWatching(t *testing.T) { // Watch for configuration changes testMgr, err := manager.New(manager.ResourceConfig{}) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) require.NoError(t, os.WriteFile(confAPath, []byte(`output: { label: a2, drop: {} }`), 0o644)) require.NoError(t, os.WriteFile(confBPath, []byte(`output: { label: b2, drop: {} }`), 0o644)) @@ -443,7 +446,7 @@ func TestReaderWatcherRace(t *testing.T) { // Watch for configuration changes testMgr, err := manager.New(manager.ResourceConfig{}) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) for i := 0; i < 2; i++ { // Wait for the config watcher to reload each config @@ -523,7 +526,7 @@ processor_resources: // Watch for configuration changes testMgr, err := manager.New(manager.ResourceConfig{}) require.NoError(t, err) - require.NoError(t, rdr.BeginFileWatching(testMgr, true)) + require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil)) require.NoError(t, os.WriteFile(confAPath, procConfig("a", "a2"), 0o644)) require.NoError(t, os.WriteFile(confBPath, procConfig("b", "b2"), 0o644)) diff --git a/internal/impl/io/input_http_server_test.go b/internal/impl/io/input_http_server_test.go index bc98acd11..bd11d7066 100644 --- a/internal/impl/io/input_http_server_test.go +++ b/internal/impl/io/input_http_server_test.go @@ -222,7 +222,7 @@ func TestHTTPServerLifecycle(t *testing.T) { testURL := fmt.Sprintf("http://localhost:%v/foo/bar", freePort) - apiImpl, err := api.New("", "", apiConf, nil, log.Noop(), metrics.Noop()) + apiImpl, err := api.New("", "", apiConf, nil, log.Noop(), metrics.Noop(), nil) require.NoError(t, err) go func() { diff --git a/public/service/stream_builder.go b/public/service/stream_builder.go index b272eb41e..42bba91e8 100644 --- a/public/service/stream_builder.go +++ b/public/service/stream_builder.go @@ -880,7 +880,7 @@ func (s *StreamBuilder) buildWithEnv(env *bundle.Environment) (*Stream, error) { sanitConf.DocsProvider = env _ = s.configSpec.SanitiseYAML(&sanitNode, sanitConf) } - if apiType, err = api.New("", "", s.http, sanitNode, logger, stats); err != nil { + if apiType, err = api.New("", "", s.http, sanitNode, logger, stats, nil); err != nil { return nil, fmt.Errorf("unable to create stream HTTP server due to: %w. Tip: you can disable the server with `http.enabled` set to `false`, or override the configured server with SetHTTPMux", err) } apiMut = apiType