diff --git a/server/legacy/lyft/gateway/events_controller.go b/server/legacy/lyft/gateway/events_controller.go index 0c5476224..335060a52 100644 --- a/server/legacy/lyft/gateway/events_controller.go +++ b/server/legacy/lyft/gateway/events_controller.go @@ -38,6 +38,7 @@ func NewVCSEventsController( scope tally.Scope, webhookSecret []byte, allowDraftPRs bool, + snsWriter gateway_handlers.Writer, commentParser events.CommentParsing, repoAllowlistChecker *events.RepoAllowlistChecker, vcsClient vcs.Client, @@ -60,10 +61,19 @@ func NewVCSEventsController( clientCreator githubapp.ClientCreator, defaultTFVersion string, ) *VCSEventsController { + pullEventSNSProxy := gateway_handlers.NewSNSWorkerProxy( + snsWriter, logger, + ) + legacyHandler := &gateway_handlers.LegacyPullHandler{ + Logger: logger, + WorkerProxy: pullEventSNSProxy, + VCSStatusUpdater: vcsStatusUpdater, + } prSignaler := &pr.WorkflowSignaler{TemporalClient: temporalClient, DefaultTFVersion: defaultTFVersion} prRequirementChecker := requirement.NewPRAggregate(globalCfg) - modifiedPullHandler := gateway_handlers.NewModifiedPullHandler(logger, asyncScheduler, rootConfigBuilder, globalCfg, prRequirementChecker, prSignaler) + modifiedPullHandler := gateway_handlers.NewModifiedPullHandler(logger, asyncScheduler, rootConfigBuilder, globalCfg, prRequirementChecker, prSignaler, legacyHandler) closedPullHandler := &gateway_handlers.ClosedPullRequestHandler{ + WorkerProxy: pullEventSNSProxy, Logger: logger, PRCloseSignaler: prSignaler, Scope: scope.SubScope("pull.closed"), @@ -108,6 +118,7 @@ func NewVCSEventsController( vcsClient, gateway_handlers.NewCommentEventWorkerProxy( logger, + snsWriter, asyncScheduler, prSignaler, deploySignaler, @@ -143,6 +154,7 @@ func NewVCSEventsController( pullRequestReviewHandler := &gateway_handlers.PullRequestReviewWorkerProxy{ Scheduler: asyncScheduler, + SnsWriter: snsWriter, Logger: logger, CheckRunFetcher: checkRunFetcher, WorkflowSignaler: prSignaler, diff --git a/server/neptune/gateway/event/closed_pull_request_handler.go b/server/neptune/gateway/event/closed_pull_request_handler.go index 0cbef91e3..bff466937 100644 --- a/server/neptune/gateway/event/closed_pull_request_handler.go +++ b/server/neptune/gateway/event/closed_pull_request_handler.go @@ -15,12 +15,17 @@ type prCloseSignaler interface { } type ClosedPullRequestHandler struct { + WorkerProxy workerProxy Logger logging.Logger PRCloseSignaler prCloseSignaler Scope tally.Scope } func (c *ClosedPullRequestHandler) Handle(ctx context.Context, request *http.BufferedRequest, event PullRequest) error { + if err := c.WorkerProxy.Handle(ctx, request, event); err != nil { + c.Logger.ErrorContext(ctx, err.Error()) + } + if err := c.handlePlatformMode(ctx, event); err != nil { return errors.Wrap(err, "handling platform mode") } diff --git a/server/neptune/gateway/event/closed_pull_request_handler_test.go b/server/neptune/gateway/event/closed_pull_request_handler_test.go index 6617f743f..fbf9dfc17 100644 --- a/server/neptune/gateway/event/closed_pull_request_handler_test.go +++ b/server/neptune/gateway/event/closed_pull_request_handler_test.go @@ -15,6 +15,7 @@ import ( ) func TestClosedPullHandler_Handle(t *testing.T) { + workerProxy := &mockWorkerProxy{} signaler := &testCloseSignaler{ t: t, expectedRepoName: "repo", @@ -22,6 +23,7 @@ func TestClosedPullHandler_Handle(t *testing.T) { } pullHandler := event.ClosedPullRequestHandler{ Logger: logging.NewNoopCtxLogger(t), + WorkerProxy: workerProxy, PRCloseSignaler: signaler, } pr := event.PullRequest{ @@ -35,10 +37,12 @@ func TestClosedPullHandler_Handle(t *testing.T) { } err := pullHandler.Handle(context.Background(), &http.BufferedRequest{}, pr) assert.True(t, signaler.called) + assert.True(t, workerProxy.called) assert.NoError(t, err) } func TestClosedPullHandler_Handle_SignalError(t *testing.T) { + workerProxy := &mockWorkerProxy{} signaler := &testCloseSignaler{ t: t, err: assert.AnError, @@ -47,6 +51,7 @@ func TestClosedPullHandler_Handle_SignalError(t *testing.T) { } pullHandler := event.ClosedPullRequestHandler{ Logger: logging.NewNoopCtxLogger(t), + WorkerProxy: workerProxy, PRCloseSignaler: signaler, } pr := event.PullRequest{ @@ -60,10 +65,12 @@ func TestClosedPullHandler_Handle_SignalError(t *testing.T) { } err := pullHandler.Handle(context.Background(), &http.BufferedRequest{}, pr) assert.True(t, signaler.called) + assert.True(t, workerProxy.called) assert.Error(t, err) } func TestClosedPullHandler_Handle_SignalNotFoundError(t *testing.T) { + workerProxy := &mockWorkerProxy{} signaler := &testCloseSignaler{ t: t, expectedRepoName: "repo", @@ -72,6 +79,7 @@ func TestClosedPullHandler_Handle_SignalNotFoundError(t *testing.T) { } pullHandler := event.ClosedPullRequestHandler{ Logger: logging.NewNoopCtxLogger(t), + WorkerProxy: workerProxy, PRCloseSignaler: signaler, Scope: tally.NewTestScope("", map[string]string{}), } @@ -86,6 +94,7 @@ func TestClosedPullHandler_Handle_SignalNotFoundError(t *testing.T) { } err := pullHandler.Handle(context.Background(), &http.BufferedRequest{}, pr) assert.True(t, signaler.called) + assert.True(t, workerProxy.called) assert.NoError(t, err) } diff --git a/server/neptune/gateway/event/comment_handler.go b/server/neptune/gateway/event/comment_handler.go index ea8a5f25f..5b224a4d5 100644 --- a/server/neptune/gateway/event/comment_handler.go +++ b/server/neptune/gateway/event/comment_handler.go @@ -67,10 +67,15 @@ func (c Comment) GetRepo() models.Repo { return c.BaseRepo } -func NewCommentEventWorkerProxy(logger logging.Logger, scheduler scheduler, prSignaler prSignaler, deploySignaler deploySignaler, commentCreator commentCreator, vcsStatusUpdater statusUpdater, globalCfg valid.GlobalCfg, rootConfigBuilder rootConfigBuilder, legacyErrorHandler errorHandler, neptuneErrorHandler errorHandler, requirementChecker requirementChecker) *CommentEventWorkerProxy { +func NewCommentEventWorkerProxy(logger logging.Logger, snsWriter Writer, scheduler scheduler, prSignaler prSignaler, deploySignaler deploySignaler, commentCreator commentCreator, vcsStatusUpdater statusUpdater, globalCfg valid.GlobalCfg, rootConfigBuilder rootConfigBuilder, legacyErrorHandler errorHandler, neptuneErrorHandler errorHandler, requirementChecker requirementChecker) *CommentEventWorkerProxy { return &CommentEventWorkerProxy{ logger: logger, scheduler: scheduler, + legacyHandler: &LegacyCommentHandler{ + logger: logger, + snsWriter: snsWriter, + globalCfg: globalCfg, + }, neptuneWorkerProxy: &NeptuneWorkerProxy{ logger: logger, deploySignaler: deploySignaler, @@ -172,6 +177,7 @@ type CommentEventWorkerProxy struct { scheduler scheduler vcsStatusUpdater statusUpdater rootConfigBuilder rootConfigBuilder + legacyHandler *LegacyCommentHandler neptuneWorkerProxy *NeptuneWorkerProxy neptuneErrorHandler errorHandler legacyErrorHandler errorHandler @@ -203,6 +209,9 @@ func (p *CommentEventWorkerProxy) handle(ctx context.Context, request *http.Buff } fxns := []sync.Executor{ + p.legacyErrorHandler.WrapWithHandling(ctx, event, cmd.CommandName().String(), func(ctx context.Context) error { + return p.legacyHandler.Handle(ctx, event, cmd, roots, request) + }), p.neptuneErrorHandler.WrapWithHandling(ctx, event, cmd.CommandName().String(), func(ctx context.Context) error { return p.neptuneWorkerProxy.Handle(ctx, event, cmd, roots, request) }), diff --git a/server/neptune/gateway/event/comment_handler_test.go b/server/neptune/gateway/event/comment_handler_test.go index 5c83873d8..c4a034a2b 100644 --- a/server/neptune/gateway/event/comment_handler_test.go +++ b/server/neptune/gateway/event/comment_handler_test.go @@ -160,6 +160,7 @@ func TestCommentEventWorkerProxy_HandleForceApply(t *testing.T) { }, }, } + writer := &mockSnsWriter{} scheduler := &sync.SynchronousScheduler{Logger: logger} commentCreator := &mockCommentCreator{ expectedT: t, @@ -172,7 +173,7 @@ func TestCommentEventWorkerProxy_HandleForceApply(t *testing.T) { prSignaler := &mockPRSignaler{ expectedT: t, } - commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{}) + commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, writer, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{}) bufReq := buildRequest(t) cmd := &command.Comment{ Name: command.Apply, @@ -182,6 +183,7 @@ func TestCommentEventWorkerProxy_HandleForceApply(t *testing.T) { assert.NoError(t, err) assert.True(t, commentCreator.isCalled) assert.True(t, testSignaler.called()) + assert.False(t, writer.isCalled) assert.False(t, statusUpdater.isCalled) } @@ -220,11 +222,12 @@ func TestCommentEventWorkerProxy_HandleApplyComment_RequirementsFailed(t *testin expectedT: t, } + writer := &mockSnsWriter{} scheduler := &sync.SynchronousScheduler{Logger: logger} commentCreator := &mockCommentCreator{} statusUpdater := &mockStatusUpdater{} cfg := valid.NewGlobalCfg("somedir") - commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{ + commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, writer, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{ err: assert.AnError, }) bufReq := buildRequest(t) @@ -236,6 +239,7 @@ func TestCommentEventWorkerProxy_HandleApplyComment_RequirementsFailed(t *testin assert.False(t, statusUpdater.isCalled) assert.False(t, commentCreator.isCalled) assert.False(t, testSignaler.called) + assert.False(t, writer.isCalled) } func TestCommentEventWorkerProxy_HandleApplyComment(t *testing.T) { @@ -293,6 +297,8 @@ func TestCommentEventWorkerProxy_HandleApplyComment(t *testing.T) { }, }, } + + writer := &mockSnsWriter{} scheduler := &sync.SynchronousScheduler{Logger: logger} commentCreator := &mockCommentCreator{} statusUpdater := &mockStatusUpdater{} @@ -300,7 +306,7 @@ func TestCommentEventWorkerProxy_HandleApplyComment(t *testing.T) { prSignaler := &mockPRSignaler{ expectedT: t, } - commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{}) + commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, writer, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{}) bufReq := buildRequest(t) cmd := &command.Comment{ Name: command.Apply, @@ -310,6 +316,7 @@ func TestCommentEventWorkerProxy_HandleApplyComment(t *testing.T) { assert.False(t, statusUpdater.isCalled) assert.False(t, commentCreator.isCalled) assert.True(t, testSignaler.called()) + assert.False(t, writer.isCalled) } func TestCommentEventWorkerProxy_HandlePlanComment_NoCmds(t *testing.T) { @@ -335,6 +342,7 @@ func TestCommentEventWorkerProxy_HandlePlanComment_NoCmds(t *testing.T) { }, InstallationToken: 123, } + writer := &mockSnsWriter{} scheduler := &sync.SynchronousScheduler{Logger: logger} commentCreator := &mockCommentCreator{} statusUpdater := &multiMockStatusUpdater{ @@ -369,7 +377,7 @@ func TestCommentEventWorkerProxy_HandlePlanComment_NoCmds(t *testing.T) { prSignaler := &mockPRSignaler{ expectedT: t, } - commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{}) + commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, writer, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{}) bufReq := buildRequest(t) cmd := &command.Comment{ Name: command.Plan, @@ -379,6 +387,7 @@ func TestCommentEventWorkerProxy_HandlePlanComment_NoCmds(t *testing.T) { assert.False(t, statusUpdater.AllCalled()) assert.False(t, commentCreator.isCalled) assert.False(t, testSignaler.called) + assert.False(t, writer.isCalled) } func TestCommentEventWorkerProxy_HandleApplyComment_NoCmds(t *testing.T) { @@ -404,6 +413,7 @@ func TestCommentEventWorkerProxy_HandleApplyComment_NoCmds(t *testing.T) { }, InstallationToken: 123, } + writer := &mockSnsWriter{} scheduler := &sync.SynchronousScheduler{Logger: logger} commentCreator := &mockCommentCreator{} statusUpdater := &multiMockStatusUpdater{ @@ -422,7 +432,7 @@ func TestCommentEventWorkerProxy_HandleApplyComment_NoCmds(t *testing.T) { prSignaler := &mockPRSignaler{ expectedT: t, } - commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{}) + commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, writer, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{}) bufReq := buildRequest(t) cmd := &command.Comment{ Name: command.Apply, @@ -432,6 +442,7 @@ func TestCommentEventWorkerProxy_HandleApplyComment_NoCmds(t *testing.T) { assert.False(t, statusUpdater.AllCalled()) assert.False(t, commentCreator.isCalled) assert.False(t, testSignaler.called) + assert.False(t, writer.isCalled) } func TestCommentEventWorkerProxy_HandlePlanComment(t *testing.T) { @@ -479,6 +490,7 @@ func TestCommentEventWorkerProxy_HandlePlanComment(t *testing.T) { }, InstallationToken: 123, } + writer := &mockSnsWriter{} scheduler := &sync.SynchronousScheduler{Logger: logger} commentCreator := &mockCommentCreator{} statusUpdater := &mockStatusUpdater{} @@ -488,7 +500,7 @@ func TestCommentEventWorkerProxy_HandlePlanComment(t *testing.T) { expectedRoots: roots, expectedPRRequest: prRequest, } - commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, scheduler, prSignaler, deploySignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{}) + commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, writer, scheduler, prSignaler, deploySignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{}) bufReq := buildRequest(t) cmd := &command.Comment{ Name: command.Plan, @@ -499,6 +511,7 @@ func TestCommentEventWorkerProxy_HandlePlanComment(t *testing.T) { assert.False(t, commentCreator.isCalled) assert.False(t, deploySignaler.called) assert.True(t, prSignaler.called) + assert.True(t, writer.isCalled) } func TestCommentEventWorkerProxy_HandlePlanCommentAllocatorEnabled(t *testing.T) { @@ -546,6 +559,7 @@ func TestCommentEventWorkerProxy_HandlePlanCommentAllocatorEnabled(t *testing.T) }, InstallationToken: 123, } + writer := &mockSnsWriter{} scheduler := &sync.SynchronousScheduler{Logger: logger} commentCreator := &mockCommentCreator{} statusUpdater := &mockStatusUpdater{} @@ -555,7 +569,7 @@ func TestCommentEventWorkerProxy_HandlePlanCommentAllocatorEnabled(t *testing.T) expectedRoots: roots, expectedPRRequest: prRequest, } - commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{}) + commentEventWorkerProxy := event.NewCommentEventWorkerProxy(logger, writer, scheduler, prSignaler, testSignaler, commentCreator, statusUpdater, cfg, rootConfigBuilder, noopErrorHandler{}, noopErrorHandler{}, &requirementsChecker{}) bufReq := buildRequest(t) cmd := &command.Comment{ Name: command.Plan, @@ -565,6 +579,7 @@ func TestCommentEventWorkerProxy_HandlePlanCommentAllocatorEnabled(t *testing.T) assert.False(t, statusUpdater.isCalled) assert.False(t, commentCreator.isCalled) assert.False(t, testSignaler.called) + assert.True(t, writer.isCalled) assert.True(t, prSignaler.called) } diff --git a/server/neptune/gateway/event/legacy_comment_handler.go b/server/neptune/gateway/event/legacy_comment_handler.go new file mode 100644 index 000000000..eef09dad7 --- /dev/null +++ b/server/neptune/gateway/event/legacy_comment_handler.go @@ -0,0 +1,43 @@ +package event + +import ( + "bytes" + "context" + "github.com/pkg/errors" + "github.com/runatlantis/atlantis/server/config/valid" + "github.com/runatlantis/atlantis/server/legacy/events/command" + "github.com/runatlantis/atlantis/server/legacy/http" + "github.com/runatlantis/atlantis/server/logging" +) + +type LegacyCommentHandler struct { + logger logging.Logger + snsWriter Writer + globalCfg valid.GlobalCfg +} + +func (p *LegacyCommentHandler) Handle(ctx context.Context, event Comment, cmd *command.Comment, roots []*valid.MergedProjectCfg, request *http.BufferedRequest) error { + // legacy mode should not be handling any type of apply command anymore + if cmd.Name == command.Apply { + return nil + } + // forward everything to sns for now since platform mode doesn't do anything w.r.t to comments atm. + if err := p.ForwardToSns(ctx, request); err != nil { + return errors.Wrap(err, "forwarding request through sns") + } + return nil +} + +func (p *LegacyCommentHandler) ForwardToSns(ctx context.Context, request *http.BufferedRequest) error { + buffer := bytes.NewBuffer([]byte{}) + if err := request.GetRequestWithContext(ctx).Write(buffer); err != nil { + return errors.Wrap(err, "writing request to buffer") + } + + if err := p.snsWriter.WriteWithContext(ctx, buffer.Bytes()); err != nil { + return errors.Wrap(err, "writing buffer to sns") + } + p.logger.InfoContext(ctx, "proxied request to sns") + + return nil +} diff --git a/server/neptune/gateway/event/legacy_pull_handler.go b/server/neptune/gateway/event/legacy_pull_handler.go new file mode 100644 index 000000000..e8c30e7de --- /dev/null +++ b/server/neptune/gateway/event/legacy_pull_handler.go @@ -0,0 +1,48 @@ +package event + +import ( + "context" + "fmt" + + "github.com/pkg/errors" + "github.com/runatlantis/atlantis/server/config/valid" + "github.com/runatlantis/atlantis/server/legacy/events/command" + "github.com/runatlantis/atlantis/server/legacy/http" + "github.com/runatlantis/atlantis/server/logging" + "github.com/runatlantis/atlantis/server/models" +) + +const PlatformModeApplyStatusMessage = "THIS IS A LEGACY STATUS CHECK AND IS NOT RELEVANT PLEASE LOOK AT atlantis/deploy status checks" + +type vcsStatusUpdater interface { + UpdateCombined(ctx context.Context, repo models.Repo, pull models.PullRequest, status models.VCSStatus, cmdName fmt.Stringer, statusID string, output string) (string, error) + UpdateCombinedCount(ctx context.Context, repo models.Repo, pull models.PullRequest, status models.VCSStatus, cmdName fmt.Stringer, numSuccess int, numTotal int, statusID string) (string, error) +} + +type workerProxy interface { + Handle(ctx context.Context, request *http.BufferedRequest, event PullRequest) error +} + +type LegacyPullHandler struct { + VCSStatusUpdater vcsStatusUpdater + WorkerProxy workerProxy + Logger logging.Logger +} + +func (l *LegacyPullHandler) Handle(ctx context.Context, request *http.BufferedRequest, event PullRequest, allRoots []*valid.MergedProjectCfg) error { + // mark legacy statuses as successful if there are no roots in general + // this is processed here to make it easy to clean up when we deprecate legacy mode + if len(allRoots) == 0 { + if _, statusErr := l.VCSStatusUpdater.UpdateCombinedCount(ctx, event.Pull.HeadRepo, event.Pull, models.SuccessVCSStatus, command.Plan, 0, 0, ""); statusErr != nil { + l.Logger.WarnContext(ctx, fmt.Sprintf("unable to update commit status: %s", statusErr)) + } + return nil + } + + // forward to sns + err := l.WorkerProxy.Handle(ctx, request, event) + if err != nil { + return errors.Wrap(err, "proxying request to sns") + } + return nil +} diff --git a/server/neptune/gateway/event/legacy_pull_handler_test.go b/server/neptune/gateway/event/legacy_pull_handler_test.go new file mode 100644 index 000000000..95c0a58e2 --- /dev/null +++ b/server/neptune/gateway/event/legacy_pull_handler_test.go @@ -0,0 +1,94 @@ +package event_test + +import ( + "context" + "fmt" + "testing" + + "github.com/runatlantis/atlantis/server/config/valid" + "github.com/runatlantis/atlantis/server/legacy/http" + "github.com/runatlantis/atlantis/server/logging" + "github.com/runatlantis/atlantis/server/models" + "github.com/runatlantis/atlantis/server/neptune/gateway/event" + "github.com/stretchr/testify/assert" +) + +func TestLegacyHandler_Handle_NoRoots(t *testing.T) { + logger := logging.NewNoopCtxLogger(t) + statusUpdater := &mockVCSStatusUpdater{} + workerProxy := &mockWorkerProxy{} + legacyHandler := event.LegacyPullHandler{ + Logger: logger, + VCSStatusUpdater: statusUpdater, + WorkerProxy: workerProxy, + } + err := legacyHandler.Handle(context.Background(), &http.BufferedRequest{}, event.PullRequest{}, []*valid.MergedProjectCfg{}) + assert.NoError(t, err) + assert.False(t, workerProxy.called) + assert.Equal(t, statusUpdater.combinedCountCalls, 1) + assert.Equal(t, statusUpdater.combinedCalls, 0) +} + +func TestLegacyHandler_Handle_WorkerProxyFailure(t *testing.T) { + logger := logging.NewNoopCtxLogger(t) + statusUpdater := &mockVCSStatusUpdater{} + legacyRoot := &valid.MergedProjectCfg{ + Name: "legacy", + } + legacyHandler := event.LegacyPullHandler{ + Logger: logger, + VCSStatusUpdater: statusUpdater, + WorkerProxy: &mockWorkerProxy{err: assert.AnError}, + } + err := legacyHandler.Handle(context.Background(), &http.BufferedRequest{}, event.PullRequest{}, []*valid.MergedProjectCfg{legacyRoot}) + assert.ErrorIs(t, err, assert.AnError) + assert.Equal(t, 0, statusUpdater.combinedCountCalls) + assert.Equal(t, 0, statusUpdater.combinedCalls) +} + +func TestLegacyHandler_Handle_WorkerProxySuccess(t *testing.T) { + logger := logging.NewNoopCtxLogger(t) + statusUpdater := &mockVCSStatusUpdater{} + workerProxy := &mockWorkerProxy{} + legacyRoot := &valid.MergedProjectCfg{ + Name: "legacy", + } + legacyHandler := event.LegacyPullHandler{ + Logger: logger, + VCSStatusUpdater: statusUpdater, + WorkerProxy: workerProxy, + } + err := legacyHandler.Handle(context.Background(), &http.BufferedRequest{}, event.PullRequest{}, []*valid.MergedProjectCfg{legacyRoot}) + assert.NoError(t, err) + assert.True(t, workerProxy.called) + assert.Equal(t, 0, statusUpdater.combinedCountCalls) + assert.Equal(t, 0, statusUpdater.combinedCalls) +} + +type mockVCSStatusUpdater struct { + combinedCalls int + combinedError error + + combinedCountError error + combinedCountCalls int +} + +func (m *mockVCSStatusUpdater) UpdateCombined(ctx context.Context, repo models.Repo, pull models.PullRequest, status models.VCSStatus, cmdName fmt.Stringer, statusID string, output string) (string, error) { + m.combinedCalls++ + return "", m.combinedError +} + +func (m *mockVCSStatusUpdater) UpdateCombinedCount(ctx context.Context, repo models.Repo, pull models.PullRequest, status models.VCSStatus, cmdName fmt.Stringer, numSuccess int, numTotal int, statusID string) (string, error) { + m.combinedCountCalls++ + return "", m.combinedCountError +} + +type mockWorkerProxy struct { + called bool + err error +} + +func (w *mockWorkerProxy) Handle(ctx context.Context, request *http.BufferedRequest, event event.PullRequest) error { + w.called = true + return w.err +} diff --git a/server/neptune/gateway/event/modified_pull_request_handler.go b/server/neptune/gateway/event/modified_pull_request_handler.go index 67598b581..5280b3b4d 100644 --- a/server/neptune/gateway/event/modified_pull_request_handler.go +++ b/server/neptune/gateway/event/modified_pull_request_handler.go @@ -2,9 +2,8 @@ package event import ( "context" - "time" - "github.com/hashicorp/go-multierror" + "time" "github.com/runatlantis/atlantis/server/config/valid" "github.com/runatlantis/atlantis/server/neptune/gateway/config" @@ -19,6 +18,10 @@ import ( "github.com/runatlantis/atlantis/server/models" ) +type legacyHandler interface { + Handle(ctx context.Context, request *http.BufferedRequest, event PullRequest, allRoots []*valid.MergedProjectCfg) error +} + type prSignaler interface { SignalWithStartWorkflow(ctx context.Context, rootCfgs []*valid.MergedProjectCfg, prRequest pr.Request) (client.WorkflowRun, error) } @@ -29,6 +32,7 @@ type ModifiedPullHandler struct { RootConfigBuilder rootConfigBuilder GlobalCfg valid.GlobalCfg RequirementChecker requirementChecker + LegacyHandler legacyHandler PRSignaler prSignaler } @@ -41,13 +45,14 @@ type PullRequest struct { InstallationToken int64 } -func NewModifiedPullHandler(logger logging.Logger, scheduler scheduler, rootConfigBuilder rootConfigBuilder, globalCfg valid.GlobalCfg, requirementChecker requirementChecker, prSignaler prSignaler) *ModifiedPullHandler { +func NewModifiedPullHandler(logger logging.Logger, scheduler scheduler, rootConfigBuilder rootConfigBuilder, globalCfg valid.GlobalCfg, requirementChecker requirementChecker, prSignaler prSignaler, legacyHandler legacyHandler) *ModifiedPullHandler { return &ModifiedPullHandler{ Logger: logger, Scheduler: scheduler, RootConfigBuilder: rootConfigBuilder, GlobalCfg: globalCfg, RequirementChecker: requirementChecker, + LegacyHandler: legacyHandler, PRSignaler: prSignaler, } } @@ -97,6 +102,7 @@ func (p *ModifiedPullHandler) handle(ctx context.Context, request *http.Buffered } fxns := []func(ctx context.Context, request *http.BufferedRequest, event PullRequest, allRoots []*valid.MergedProjectCfg) error{ + p.LegacyHandler.Handle, // TODO: remove when we deprecate legacy mode p.handlePlatformMode, } var combinedErrors *multierror.Error diff --git a/server/neptune/gateway/event/modified_pull_request_handler_test.go b/server/neptune/gateway/event/modified_pull_request_handler_test.go index 97f13593f..33a888d7b 100644 --- a/server/neptune/gateway/event/modified_pull_request_handler_test.go +++ b/server/neptune/gateway/event/modified_pull_request_handler_test.go @@ -61,6 +61,10 @@ func TestModifiedPullHandler_Handle_SignalerFailure(t *testing.T) { expectedT: t, rootConfigs: []*valid.MergedProjectCfg{root}, }, + LegacyHandler: &mockLegacyHandler{ + expectedAllRoots: []*valid.MergedProjectCfg{root}, + expectedT: t, + }, PRSignaler: &mockPRSignaler{ error: assert.AnError, expectedRoots: []*valid.MergedProjectCfg{root}, @@ -106,6 +110,11 @@ func TestModifiedPullHandler_Handle_BranchStrategy(t *testing.T) { pull := event.PullRequest{ Pull: pullRequest, } + legacyHandler := &mockLegacyHandler{ + expectedEvent: pull, + expectedAllRoots: []*valid.MergedProjectCfg{legacyRoot}, + expectedT: t, + } prRequest := pr.Request{ Revision: "sha", Repo: testRepo, @@ -134,10 +143,12 @@ func TestModifiedPullHandler_Handle_BranchStrategy(t *testing.T) { expectedT: t, rootConfigs: []*valid.MergedProjectCfg{legacyRoot}, }, - PRSignaler: signaler, + LegacyHandler: legacyHandler, + PRSignaler: signaler, } err := pullHandler.Handle(context.Background(), &http.BufferedRequest{}, pull) assert.NoError(t, err) + assert.True(t, legacyHandler.called) assert.True(t, signaler.called) } @@ -185,6 +196,11 @@ func TestModifiedPullHandler_Handle_MergeStrategy(t *testing.T) { pr := event.PullRequest{ Pull: pullRequest, } + legacyHandler := &mockLegacyHandler{ + expectedEvent: pr, + expectedAllRoots: []*valid.MergedProjectCfg{root}, + expectedT: t, + } pullHandler := event.ModifiedPullHandler{ Logger: logger, Scheduler: &sync.SynchronousScheduler{Logger: logger}, @@ -195,10 +211,12 @@ func TestModifiedPullHandler_Handle_MergeStrategy(t *testing.T) { expectedT: t, rootConfigs: []*valid.MergedProjectCfg{root}, }, - PRSignaler: signaler, + LegacyHandler: legacyHandler, + PRSignaler: signaler, } err := pullHandler.Handle(context.Background(), &http.BufferedRequest{}, pr) assert.NoError(t, err) + assert.True(t, legacyHandler.called) assert.True(t, signaler.called) } @@ -219,6 +237,21 @@ func (r *mockConfigBuilder) Build(_ context.Context, commit *config.RepoCommit, return r.rootConfigs, r.error } +type mockLegacyHandler struct { + expectedEvent event.PullRequest + expectedAllRoots []*valid.MergedProjectCfg + expectedT *testing.T + error error + called bool +} + +func (l *mockLegacyHandler) Handle(ctx context.Context, _ *http.BufferedRequest, event event.PullRequest, allRoots []*valid.MergedProjectCfg) error { + l.called = true + assert.Equal(l.expectedT, l.expectedEvent, event) + assert.Equal(l.expectedT, l.expectedAllRoots, allRoots) + return l.error +} + type mockPRSignaler struct { called bool error error diff --git a/server/neptune/gateway/event/pull_request_review_handler.go b/server/neptune/gateway/event/pull_request_review_handler.go index 45a644d8c..6b7b85a5d 100644 --- a/server/neptune/gateway/event/pull_request_review_handler.go +++ b/server/neptune/gateway/event/pull_request_review_handler.go @@ -1,12 +1,12 @@ package event import ( + "bytes" "context" - "time" - "github.com/runatlantis/atlantis/server/config/valid" "github.com/runatlantis/atlantis/server/neptune/gateway/config" "github.com/runatlantis/atlantis/server/vcs/provider/github" + "time" "github.com/runatlantis/atlantis/server/neptune/gateway/pr" "github.com/uber-go/tally/v4" @@ -46,6 +46,7 @@ type workflowSignaler interface { type PullRequestReviewWorkerProxy struct { Scheduler scheduler + SnsWriter Writer Logger logging.Logger CheckRunFetcher fetcher WorkflowSignaler workflowSignaler @@ -84,8 +85,23 @@ func (p *PullRequestReviewWorkerProxy) handleLegacyMode(ctx context.Context, req if len(failedPolicyCheckRuns) == 0 { return nil } + // Forward to SNS to further process in the worker + return p.forwardToSns(ctx, request) +} + +func (p *PullRequestReviewWorkerProxy) forwardToSns(ctx context.Context, request *http.BufferedRequest) error { + buffer := bytes.NewBuffer([]byte{}) + if err := request.GetRequestWithContext(ctx).Write(buffer); err != nil { + return errors.Wrap(err, "writing request to buffer") + } + + if err := p.SnsWriter.WriteWithContext(ctx, buffer.Bytes()); err != nil { + return errors.Wrap(err, "writing buffer to sns") + } + p.Logger.InfoContext(ctx, "proxied request to sns") return nil } + func (p *PullRequestReviewWorkerProxy) handlePlatformMode(ctx context.Context, request *http.BufferedRequest, event PullRequestReview) error { // Ignore events that are neither approved nor changes requested if event.State != Approved && event.State != ChangesRequested { diff --git a/server/neptune/gateway/event/pull_request_review_handler_test.go b/server/neptune/gateway/event/pull_request_review_handler_test.go index 2ca37ccf3..3d3fdcc18 100644 --- a/server/neptune/gateway/event/pull_request_review_handler_test.go +++ b/server/neptune/gateway/event/pull_request_review_handler_test.go @@ -3,13 +3,12 @@ package event_test import ( "bytes" "context" - "io" - "net/http" - "testing" - "github.com/runatlantis/atlantis/server/config/valid" "github.com/runatlantis/atlantis/server/neptune/gateway/config" "github.com/runatlantis/atlantis/server/neptune/gateway/pr" + "io" + "net/http" + "testing" buffered "github.com/runatlantis/atlantis/server/legacy/http" "github.com/runatlantis/atlantis/server/logging" @@ -34,6 +33,38 @@ func buildRequest(t *testing.T) *buffered.BufferedRequest { return r } +func TestPullRequestReviewWorkerProxy_HandleApprovalWithFailedPolicies(t *testing.T) { + writer := &mockSnsWriter{} + mockFetcher := &mockCheckRunFetcher{ + failedPolicies: []string{"failed policy"}, + } + logger := logging.NewNoopCtxLogger(t) + signaler := &reviewSignaler{ + t: t, + expectedRepoName: "repo", + expectedPullNum: 0, + expectedRevision: ref, + } + proxy := event.PullRequestReviewWorkerProxy{ + Scheduler: &sync.SynchronousScheduler{Logger: logger}, + SnsWriter: writer, + Logger: logger, + CheckRunFetcher: mockFetcher, + WorkflowSignaler: signaler, + Scope: tally.NewTestScope("", map[string]string{}), + } + prrEvent := event.PullRequestReview{ + State: event.Approved, + Repo: models.Repo{FullName: repoFullName}, + Ref: "ref", + } + err := proxy.Handle(context.Background(), prrEvent, buildRequest(t)) + assert.NoError(t, err) + assert.True(t, writer.isCalled) + assert.True(t, mockFetcher.called) + assert.True(t, signaler.called) +} + func TestPullRequestReviewWorkerProxy_HandleChangesRequestedWithFailedPolicies(t *testing.T) { logger := logging.NewNoopCtxLogger(t) signaler := &reviewSignaler{ @@ -95,6 +126,7 @@ func TestPullRequestReviewWorkerProxy_HandleSuccessNoFailedPolicies(t *testing.T } proxy := event.PullRequestReviewWorkerProxy{ Scheduler: &sync.SynchronousScheduler{Logger: logger}, + SnsWriter: writer, Logger: logger, CheckRunFetcher: mockFetcher, WorkflowSignaler: signaler, @@ -112,6 +144,129 @@ func TestPullRequestReviewWorkerProxy_HandleSuccessNoFailedPolicies(t *testing.T assert.True(t, signaler.called) } +func TestPullRequestReviewWorkerProxy_InvalidEvent(t *testing.T) { + writer := &mockSnsWriter{} + mockFetcher := &mockCheckRunFetcher{} + logger := logging.NewNoopCtxLogger(t) + signaler := &reviewSignaler{} + proxy := event.PullRequestReviewWorkerProxy{ + Scheduler: &sync.SynchronousScheduler{Logger: logger}, + SnsWriter: writer, + Logger: logger, + CheckRunFetcher: mockFetcher, + WorkflowSignaler: signaler, + Scope: tally.NewTestScope("", map[string]string{}), + } + prrEvent := event.PullRequestReview{ + State: "something else", + Repo: models.Repo{FullName: repoFullName}, + } + err := proxy.Handle(context.Background(), prrEvent, buildRequest(t)) + assert.NoError(t, err) + assert.False(t, writer.isCalled) + assert.False(t, mockFetcher.called) + assert.False(t, signaler.called) +} + +func TestPullRequestReviewWorkerProxy_FetcherError(t *testing.T) { + writer := &mockSnsWriter{} + mockFetcher := &mockCheckRunFetcher{ + err: assert.AnError, + } + logger := logging.NewNoopCtxLogger(t) + signaler := &reviewSignaler{ + t: t, + expectedRepoName: "repo", + expectedPullNum: 0, + expectedRevision: ref, + } + proxy := event.PullRequestReviewWorkerProxy{ + Scheduler: &sync.SynchronousScheduler{Logger: logger}, + SnsWriter: writer, + Logger: logger, + CheckRunFetcher: mockFetcher, + WorkflowSignaler: signaler, + Scope: tally.NewTestScope("", map[string]string{}), + } + prrEvent := event.PullRequestReview{ + State: event.Approved, + Repo: models.Repo{FullName: repoFullName}, + Ref: ref, + } + err := proxy.Handle(context.Background(), prrEvent, buildRequest(t)) + assert.Error(t, err) + assert.False(t, writer.isCalled) + assert.True(t, mockFetcher.called) + assert.True(t, signaler.called) +} + +func TestPullRequestReviewWorkerProxy_SNSError(t *testing.T) { + writer := &mockSnsWriter{} + mockFetcher := &mockCheckRunFetcher{ + failedPolicies: []string{"failed policy"}, + } + logger := logging.NewNoopCtxLogger(t) + signaler := &reviewSignaler{ + t: t, + expectedRepoName: "repo", + expectedPullNum: 0, + expectedRevision: ref, + } + proxy := event.PullRequestReviewWorkerProxy{ + Scheduler: &sync.SynchronousScheduler{Logger: logger}, + SnsWriter: writer, + Logger: logger, + CheckRunFetcher: mockFetcher, + WorkflowSignaler: signaler, + Scope: tally.NewTestScope("", map[string]string{}), + } + prrEvent := event.PullRequestReview{ + State: event.Approved, + Repo: models.Repo{FullName: repoFullName}, + Ref: ref, + } + + err := proxy.Handle(context.Background(), prrEvent, buildRequest(t)) + assert.NoError(t, err) + assert.True(t, writer.isCalled) + assert.True(t, mockFetcher.called) + assert.True(t, signaler.called) +} + +func TestPullRequestReviewWorkerProxy_SignalerError(t *testing.T) { + writer := &mockSnsWriter{} + mockFetcher := &mockCheckRunFetcher{ + failedPolicies: []string{"failed policy"}, + } + logger := logging.NewNoopCtxLogger(t) + signaler := &reviewSignaler{ + t: t, + expectedRepoName: "repo", + expectedPullNum: 0, + expectedRevision: ref, + err: assert.AnError, + } + proxy := event.PullRequestReviewWorkerProxy{ + Scheduler: &sync.SynchronousScheduler{Logger: logger}, + SnsWriter: writer, + Logger: logger, + CheckRunFetcher: mockFetcher, + WorkflowSignaler: signaler, + Scope: tally.NewTestScope("", map[string]string{}), + } + prrEvent := event.PullRequestReview{ + State: event.Approved, + Repo: models.Repo{FullName: repoFullName}, + Ref: ref, + } + + err := proxy.Handle(context.Background(), prrEvent, buildRequest(t)) + assert.Error(t, err) + assert.True(t, writer.isCalled) + assert.True(t, mockFetcher.called) + assert.True(t, signaler.called) +} + type mockSnsWriter struct { err error isCalled bool diff --git a/server/neptune/gateway/event/sns_worker_proxy.go b/server/neptune/gateway/event/sns_worker_proxy.go new file mode 100644 index 000000000..3e0d102fc --- /dev/null +++ b/server/neptune/gateway/event/sns_worker_proxy.go @@ -0,0 +1,42 @@ +package event + +// TODO: delete when legacy mode is deprecated +import ( + "bytes" + "context" + + "github.com/pkg/errors" + "github.com/runatlantis/atlantis/server/legacy/http" + "github.com/runatlantis/atlantis/server/logging" +) + +type Writer interface { + WriteWithContext(ctx context.Context, payload []byte) error +} + +type PullSNSWorkerProxy struct { + snsWriter Writer + logger logging.Logger +} + +func NewSNSWorkerProxy(snsWriter Writer, logger logging.Logger) *PullSNSWorkerProxy { + return &PullSNSWorkerProxy{ + snsWriter: snsWriter, + logger: logger, + } +} + +func (p *PullSNSWorkerProxy) Handle(ctx context.Context, request *http.BufferedRequest, event PullRequest) error { + buffer := bytes.NewBuffer([]byte{}) + + if err := request.GetRequestWithContext(ctx).Write(buffer); err != nil { + return errors.Wrap(err, "writing request to buffer") + } + + if err := p.snsWriter.WriteWithContext(ctx, buffer.Bytes()); err != nil { + return errors.Wrap(err, "writing buffer to sns") + } + + p.logger.InfoContext(ctx, "proxied request to sns") + return nil +} diff --git a/server/neptune/gateway/server.go b/server/neptune/gateway/server.go index 53e5fb466..bf0db4c56 100644 --- a/server/neptune/gateway/server.go +++ b/server/neptune/gateway/server.go @@ -19,6 +19,8 @@ import ( "github.com/runatlantis/atlantis/server/legacy/events" "github.com/runatlantis/atlantis/server/legacy/events/command" "github.com/runatlantis/atlantis/server/legacy/events/vcs" + "github.com/runatlantis/atlantis/server/legacy/lyft/aws" + "github.com/runatlantis/atlantis/server/legacy/lyft/aws/sns" lyft_gateway "github.com/runatlantis/atlantis/server/legacy/lyft/gateway" "github.com/runatlantis/atlantis/server/logging" "github.com/runatlantis/atlantis/server/metrics" @@ -172,6 +174,8 @@ func NewServer(config Config) (*Server, error) { } vcsClient := vcs.NewInstrumentedGithubClient(rawGithubClient, statsScope, ctxLogger) + + session, err := aws.NewSession() if err != nil { return nil, errors.Wrap(err, "initializing new aws session") } @@ -191,6 +195,8 @@ func NewServer(config Config) (*Server, error) { PanicRecoveryEnabled: true, } asyncScheduler := sync.NewAsyncScheduler(ctxLogger, syncScheduler) + + gatewaySnsWriter := sns.NewWriterWithStats(session, config.SNSTopicArn, statsScope.SubScope("aws.sns.gateway")) vcsStatusUpdater := &command.VCSStatusUpdater{Client: vcsClient, TitleBuilder: vcs.StatusTitleBuilder{TitlePrefix: config.GithubStatusName}} repoConverter := github_converter.RepoConverter{} @@ -257,6 +263,7 @@ func NewServer(config Config) (*Server, error) { statsScope, []byte(config.GithubWebhookSecret), false, + gatewaySnsWriter, commentParser, repoAllowlist, vcsClient,