Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Tirth7545 committed Sep 19, 2024
1 parent 50a5cd2 commit eebed22
Show file tree
Hide file tree
Showing 14 changed files with 72 additions and 37 deletions.
6 changes: 6 additions & 0 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func New(
wholeConf any,
log log.Modular,
stats metrics.Type,
count *int,
opts ...OptFunc,
) (*Type, error) {
gMux := mux.NewRouter()
Expand Down Expand Up @@ -146,6 +147,10 @@ func New(
}
}

handleConfigAcknowledgement := func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "{\"success_reload_count\":\"%v\"}", *count)
}

if t.conf.DebugEndpoints {
t.RegisterEndpoint(
"/debug/config/json", "DEBUG: Returns the loaded config as JSON.",
Expand Down Expand Up @@ -200,6 +205,7 @@ func New(

t.RegisterEndpoint("/ping", "Ping me.", handlePing)
t.RegisterEndpoint("/version", "Returns the service version.", handleVersion)
t.RegisterEndpoint("/config/ack", "Returns the count of success watcher", handleConfigAcknowledgement)
t.RegisterEndpoint("/endpoints", "Returns this map of endpoints.", handleEndpoints)

// If we want to expose a stats endpoint we register the endpoints.
Expand Down
8 changes: 4 additions & 4 deletions internal/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestAPIEnableCORS(t *testing.T) {
conf.CORS.Enabled = true
conf.CORS.AllowedOrigins = []string{"*"}

s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop())
s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil)
require.NoError(t, err)

handler := s.Handler()
Expand All @@ -41,7 +41,7 @@ func TestAPIEnableCORSOrigins(t *testing.T) {
conf.CORS.Enabled = true
conf.CORS.AllowedOrigins = []string{"foo", "bar"}

s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop())
s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil)
require.NoError(t, err)

handler := s.Handler()
Expand Down Expand Up @@ -81,7 +81,7 @@ func TestAPIEnableCORSNoHeaders(t *testing.T) {
conf := api.NewConfig()
conf.CORS.Enabled = true

_, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop())
_, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil)
require.Error(t, err)
assert.Contains(t, err.Error(), "must specify at least one allowed origin")
}
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestAPIBasicAuth(t *testing.T) {
conf.BasicAuth.PasswordHash = tc.correctPass
conf.BasicAuth.Salt = "EzrwNJYw2wkErVVV1P36FQ=="

s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop())
s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil)
if ok := tc.expectedErr(t, err); !(ok && err == nil) {
return
}
Expand Down
3 changes: 2 additions & 1 deletion internal/cli/common/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func CreateManager(
logger log.Modular,
streamsMode bool,
conf config.Type,
count *int,
mgrOpts ...manager.OptFunc,
) (stoppableMgr *StoppableManager, err error) {
var stats *metrics.Namespaced
Expand Down Expand Up @@ -88,7 +89,7 @@ func CreateManager(
}

var httpServer *api.Type
if httpServer, err = api.New(cliOpts.Version, cliOpts.DateBuilt, conf.HTTP, sanitNode, logger, stats); err != nil {
if httpServer, err = api.New(cliOpts.Version, cliOpts.DateBuilt, conf.HTTP, sanitNode, logger, stats, count); err != nil {
err = fmt.Errorf("failed to initialise API: %w", err)
return
}
Expand Down
17 changes: 11 additions & 6 deletions internal/cli/common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error {
}

verLogger := logger.With("benthos_version", cliOpts.Version)

if mainPath == "" {
verLogger.Info("Running without a main config file")
} else if inferredMainPath {
Expand All @@ -73,8 +74,9 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error {
if strict && len(lints) > 0 {
return errors.New(cliOpts.ExecTemplate("shutting down due to linter errors, to prevent shutdown run {{.ProductName}} with --chilled"))
}

stoppableManager, err := CreateManager(c, cliOpts, logger, streamsMode, conf)
//Success Watcher Count Is Used to for to get count of the config which was updated with the watcher flag.
success_reload_count := 0

Check failure on line 78 in internal/cli/common/service.go

View workflow job for this annotation

GitHub Actions / golangci-lint

ST1003: should not use underscores in Go names; var success_reload_count should be successReloadCount (stylecheck)
stoppableManager, err := CreateManager(c, cliOpts, logger, streamsMode, conf, &success_reload_count)
if err != nil {
return err
}
Expand All @@ -90,9 +92,10 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error {
watching := cliOpts.RootFlags.GetWatcher(c)
if streamsMode {
enableStreamsAPI := !c.Bool("no-api")
stoppableStream, err = initStreamsMode(cliOpts, strict, watching, enableStreamsAPI, confReader, stoppableManager.Manager())
stoppableStream, err = initStreamsMode(cliOpts, strict, watching, enableStreamsAPI, confReader, stoppableManager.Manager(), &success_reload_count)
} else {
stoppableStream, dataStreamClosedChan, err = initNormalMode(cliOpts, conf, strict, watching, confReader, stoppableManager.Manager())
logger.Info("InitMode Get Initiated... strict:%v", strict)
stoppableStream, dataStreamClosedChan, err = initNormalMode(cliOpts, conf, strict, watching, confReader, stoppableManager.Manager(), &success_reload_count)
}
if err != nil {
return err
Expand Down Expand Up @@ -133,6 +136,7 @@ func initStreamsMode(
strict, watching, enableAPI bool,
confReader *config.Reader,
mgr *manager.Type,
success_reload_count *int,

Check failure on line 139 in internal/cli/common/service.go

View workflow job for this annotation

GitHub Actions / golangci-lint

ST1003: should not use underscores in Go names; func parameter success_reload_count should be successReloadCount (stylecheck)
) (RunningStream, error) {
logger := mgr.Logger()
streamMgr := strmmgr.New(mgr, strmmgr.OptAPIEnabled(enableAPI))
Expand Down Expand Up @@ -181,7 +185,7 @@ func initStreamsMode(
}

if watching {
if err := confReader.BeginFileWatching(mgr, strict); err != nil {
if err := confReader.BeginFileWatching(mgr, strict, success_reload_count); err != nil {
return nil, fmt.Errorf("failed to create stream config watcher: %w", err)
}
}
Expand All @@ -194,6 +198,7 @@ func initNormalMode(
strict, watching bool,
confReader *config.Reader,
mgr *manager.Type,
success_reload_count *int,

Check failure on line 201 in internal/cli/common/service.go

View workflow job for this annotation

GitHub Actions / golangci-lint

ST1003: should not use underscores in Go names; func parameter success_reload_count should be successReloadCount (stylecheck)
) (newStream RunningStream, stoppedChan chan struct{}, err error) {
logger := mgr.Logger()

Expand Down Expand Up @@ -231,7 +236,7 @@ func initNormalMode(
}

if watching {
if err := confReader.BeginFileWatching(mgr, strict); err != nil {
if err := confReader.BeginFileWatching(mgr, strict, success_reload_count); err != nil {
return nil, nil, fmt.Errorf("failed to create config file watcher: %w", err)
}
}
Expand Down
6 changes: 3 additions & 3 deletions internal/cli/studio/pull_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (r *PullRunner) bootstrapConfigReader(ctx context.Context) (bootstrapErr er
tmpTracingSummary.SetEnabled(false)

stopMgrTmp, err := common.CreateManager(
r.cliContext, r.cliOpts, r.logger, false, conf,
r.cliContext, r.cliOpts, r.logger, false, conf, nil,
manager.OptSetEnvironment(tmpEnv),
manager.OptSetBloblangEnvironment(bloblEnv),
manager.OptSetFS(sessFS))
Expand Down Expand Up @@ -413,13 +413,13 @@ func (r *PullRunner) Sync(ctx context.Context) {
}
}
for _, res := range diff.AddResources {
if err := r.confReader.TriggerResourceUpdate(r.mgr, r.strictMode, res.Name); err != nil {
if err := r.confReader.TriggerResourceUpdate(r.mgr, r.strictMode, res.Name, nil); err != nil {
r.logger.Error("Failed to reflect resource file '%v' update: %v", res.Name, err)
runErr = err
}
}
if diff.MainConfig != nil {
if err := r.confReader.TriggerMainUpdate(r.mgr, r.strictMode, diff.MainConfig.Name); err != nil {
if err := r.confReader.TriggerMainUpdate(r.mgr, r.strictMode, diff.MainConfig.Name, nil); err != nil {
r.logger.Error("Failed to reflect main config file '%v' update: %v", diff.MainConfig.Name, err)
runErr = err
}
Expand Down
9 changes: 8 additions & 1 deletion internal/config/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (r *Reader) readMain(mainPath string) (conf Type, pConf *docs.ParsedConfig,
// TriggerMainUpdate attempts to re-read the main configuration file, trigger
// the provided main update func, and apply changes to resources to the provided
// manager as appropriate.
func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPath string) error {
func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPath string, success_reload_count *int) error {

Check failure on line 360 in internal/config/reader.go

View workflow job for this annotation

GitHub Actions / golangci-lint

ST1003: should not use underscores in Go names; method parameter success_reload_count should be successReloadCount (stylecheck)
conf, _, lints, err := r.readMain(newPath)
if errors.Is(err, fs.ErrNotExist) {
if r.mainPath != newPath {
Expand Down Expand Up @@ -416,6 +416,13 @@ func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPat
mgr.Logger().Error("Failed to apply updated config: %v", err)
return err
}

// Success Watcher Count denotes if the configuration in the benthos gets updated with the watcher
// flag then success watcher count gets increased
if success_reload_count != nil {
*success_reload_count = *success_reload_count + 1
mgr.Logger().Info("Success Reload Count: %v, For Normal Config", *success_reload_count)
}
mgr.Logger().Info("Updated main config")
}
return nil
Expand Down
8 changes: 4 additions & 4 deletions internal/config/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ processor_resources:
assert.False(t, testMgr.ProbeProcessor("c"))
assert.False(t, testMgr.ProbeProcessor("d"))

require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "bar_main.yaml"))
require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "bar_main.yaml", nil))

// Wait for the config watcher to reload the config
select {
Expand Down Expand Up @@ -226,10 +226,10 @@ processor_resources:
return nil
}))

require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "a.yaml"))
require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "b.yaml"))
require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "a.yaml", nil))
require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "b.yaml", nil))

require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "foo_main.yaml"))
require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "foo_main.yaml", nil))

assert.Equal(t, "fooin", conf.Input.Label)
assert.Equal(t, "fooout", conf.Output.Label)
Expand Down
7 changes: 6 additions & 1 deletion internal/config/resource_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (r *Reader) readResource(path string) (conf manager.ResourceConfig, lints [

// TriggerResourceUpdate attempts to re-read a resource configuration file and
// apply changes to the provided manager as appropriate.
func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, path string) error {
func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, path string, success_reload_count *int) error {

Check failure on line 243 in internal/config/resource_reader.go

View workflow job for this annotation

GitHub Actions / golangci-lint

ST1003: should not use underscores in Go names; method parameter success_reload_count should be successReloadCount (stylecheck)
newResConf, lints, err := r.readResource(path)
if errors.Is(err, fs.ErrNotExist) {
return r.TriggerResourceDelete(mgr, path)
Expand Down Expand Up @@ -273,6 +273,11 @@ func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, pa
}

r.resourceFileInfo[path] = newInfo

if success_reload_count != nil {
*success_reload_count = *success_reload_count + 1
mgr.Logger().Info("Success Reload Count: %v, For Stream Config", *success_reload_count)
}
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions internal/config/resource_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ processor_resources:
// Watch for configuration changes.
testMgr, err := manager.New(conf.ResourceConfig)
require.NoError(t, err)
require.NoError(t, rdr.BeginFileWatching(testMgr, true))
require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil))

tCtx, done := context.WithTimeout(context.Background(), time.Second*30)
defer done()
Expand Down Expand Up @@ -175,7 +175,7 @@ processor_resources:
// Watch for configuration changes.
testMgr, err := manager.New(conf.ResourceConfig)
require.NoError(t, err)
require.NoError(t, rdr.BeginFileWatching(testMgr, true))
require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil))

tCtx, done := context.WithTimeout(context.Background(), time.Second*30)
defer done()
Expand Down
7 changes: 6 additions & 1 deletion internal/config/stream_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (r *Reader) findStreamPathWalkedDir(streamPath string) (dir string) {

// TriggerStreamUpdate attempts to re-read a stream configuration file, and
// trigger the provided stream update func.
func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path string) error {
func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path string, success_reload_count *int) error {

Check failure on line 187 in internal/config/stream_reader.go

View workflow job for this annotation

GitHub Actions / golangci-lint

ST1003: should not use underscores in Go names; method parameter success_reload_count should be successReloadCount (stylecheck)
if r.streamUpdateFn == nil {
return nil
}
Expand Down Expand Up @@ -236,5 +236,10 @@ func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path
return err
}
mgr.Logger().Info("Updated stream %v config from file.", info.id)

if success_reload_count != nil {
*success_reload_count = *success_reload_count + 1
mgr.Logger().Info("Success Reload Count: %v, For Stream Config", *success_reload_count)
}
return nil
}
11 changes: 7 additions & 4 deletions internal/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (r *Reader) modifiedSinceLastRead(name string) bool {
// WARNING: Either SubscribeConfigChanges or SubscribeStreamChanges must be
// called before this, as otherwise it is unsafe to register them during
// watching.
func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool) error {
func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool, success_reload_count *int) error {

Check failure on line 71 in internal/config/watcher.go

View workflow job for this annotation

GitHub Actions / golangci-lint

ST1003: should not use underscores in Go names; method parameter success_reload_count should be successReloadCount (stylecheck)
if r.watcher != nil {
return errors.New("a file watcher has already been started")
}
Expand Down Expand Up @@ -102,9 +102,11 @@ func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool) error
}

refreshFiles := func() error {
mgr.Logger().Info("Inside the Refresh Files")
if !r.streamsMode && r.mainPath != "" {
if _, err := r.fs.Stat(r.mainPath); err == nil {
if err := addNotWatching([]string{r.mainPath}); err != nil {
mgr.Logger().Error("addNotWatching Error")
return err
}
}
Expand Down Expand Up @@ -173,13 +175,14 @@ func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool) error
}
var succeeded bool
if nameClean == r.mainPath {
succeeded = !ShouldReread(r.TriggerMainUpdate(mgr, strict, r.mainPath))
succeeded = !ShouldReread(r.TriggerMainUpdate(mgr, strict, r.mainPath, success_reload_count))
} else if _, exists := r.streamFileInfo[nameClean]; exists {
succeeded = !ShouldReread(r.TriggerStreamUpdate(mgr, strict, nameClean))
succeeded = !ShouldReread(r.TriggerStreamUpdate(mgr, strict, nameClean, success_reload_count))
} else {
succeeded = !ShouldReread(r.TriggerResourceUpdate(mgr, strict, nameClean))
succeeded = !ShouldReread(r.TriggerResourceUpdate(mgr, strict, nameClean, success_reload_count))
}
if succeeded {
mgr.Logger().Info("This is the collaps changes %v", collapsedChanges)
delete(collapsedChanges, nameClean)
} else {
change.at = time.Now()
Expand Down
19 changes: 11 additions & 8 deletions internal/config/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ output:
// Watch for configuration changes
testMgr, err := manager.New(manager.ResourceConfig{})
require.NoError(t, err)
require.NoError(t, rdr.BeginFileWatching(testMgr, true))
require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil))

// Overwrite original config
require.NoError(t, os.WriteFile(confFilePath, dummyConfig, 0o644))
Expand Down Expand Up @@ -91,16 +91,19 @@ output:

changeChan := make(chan struct{})
var updatedConf stream.Config
var once sync.Once
require.NoError(t, rdr.SubscribeConfigChanges(func(conf *Type) error {
updatedConf = conf.Config
close(changeChan)
once.Do(func() {
close(changeChan)
})
return nil
}))

// Watch for configuration changes
testMgr, err := manager.New(manager.ResourceConfig{})
require.NoError(t, err)
require.NoError(t, rdr.BeginFileWatching(testMgr, true))
require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil))

// Create a new config folder and place in it a new copy of the config file
newConfDir := filepath.Join(rootDir, "config_new")
Expand Down Expand Up @@ -184,7 +187,7 @@ func TestReaderStreamDirectWatching(t *testing.T) {
// Watch for configuration changes
testMgr, err := manager.New(manager.ResourceConfig{})
require.NoError(t, err)
require.NoError(t, rdr.BeginFileWatching(testMgr, true))
require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil))

require.NoError(t, os.WriteFile(confAPath, []byte(`output: { label: a2, drop: {} }`), 0o644))
require.NoError(t, os.WriteFile(confBPath, []byte(`output: { label: b2, drop: {} }`), 0o644))
Expand Down Expand Up @@ -268,7 +271,7 @@ func TestReaderStreamWildcardWatching(t *testing.T) {
// Watch for configuration changes
testMgr, err := manager.New(manager.ResourceConfig{})
require.NoError(t, err)
require.NoError(t, rdr.BeginFileWatching(testMgr, true))
require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil))

require.NoError(t, os.WriteFile(confAPath, []byte(`output: { label: a2, drop: {} }`), 0o644))
require.NoError(t, os.WriteFile(confBPath, []byte(`output: { label: b2, drop: {} }`), 0o644))
Expand Down Expand Up @@ -352,7 +355,7 @@ func TestReaderStreamDirWatching(t *testing.T) {
// Watch for configuration changes
testMgr, err := manager.New(manager.ResourceConfig{})
require.NoError(t, err)
require.NoError(t, rdr.BeginFileWatching(testMgr, true))
require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil))

require.NoError(t, os.WriteFile(confAPath, []byte(`output: { label: a2, drop: {} }`), 0o644))
require.NoError(t, os.WriteFile(confBPath, []byte(`output: { label: b2, drop: {} }`), 0o644))
Expand Down Expand Up @@ -443,7 +446,7 @@ func TestReaderWatcherRace(t *testing.T) {
// Watch for configuration changes
testMgr, err := manager.New(manager.ResourceConfig{})
require.NoError(t, err)
require.NoError(t, rdr.BeginFileWatching(testMgr, true))
require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil))

for i := 0; i < 2; i++ {
// Wait for the config watcher to reload each config
Expand Down Expand Up @@ -523,7 +526,7 @@ processor_resources:
// Watch for configuration changes
testMgr, err := manager.New(manager.ResourceConfig{})
require.NoError(t, err)
require.NoError(t, rdr.BeginFileWatching(testMgr, true))
require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil))

require.NoError(t, os.WriteFile(confAPath, procConfig("a", "a2"), 0o644))
require.NoError(t, os.WriteFile(confBPath, procConfig("b", "b2"), 0o644))
Expand Down
Loading

0 comments on commit eebed22

Please sign in to comment.