diff --git a/cmd/terraformadmin.go b/cmd/terraformadmin.go
index 735645a6c..357db9687 100644
--- a/cmd/terraformadmin.go
+++ b/cmd/terraformadmin.go
@@ -36,15 +36,12 @@ func (t *TerraformAdmin) NewServer(userConfig legacy.UserConfig, config legacy.C
 			"parsing atlantis url %q", userConfig.AtlantisURL)
 	}
 
-	// TODO: we should just supply a yaml file with this info and load it directly into the
-	// app config struct
 	appConfig, err := createGHAppConfig(userConfig)
 	if err != nil {
 		return nil, err
 	}
 
 	// we don't need the feature config
-
 	cfg := &neptune.Config{
 		AuthCfg: neptune.AuthConfig{
 			SslCertFile: userConfig.SSLCertFile,
diff --git a/server/neptune/terraformadmin/server.go b/server/neptune/terraformadmin/server.go
index 2a8fe6255..3ea5bed6f 100644
--- a/server/neptune/terraformadmin/server.go
+++ b/server/neptune/terraformadmin/server.go
@@ -26,13 +26,8 @@ import (
 	"github.com/runatlantis/atlantis/server/metrics"
 	neptune_http "github.com/runatlantis/atlantis/server/neptune/http"
 	lyftActivities "github.com/runatlantis/atlantis/server/neptune/lyft/activities"
-	"github.com/runatlantis/atlantis/server/neptune/lyft/executor"
-	"github.com/runatlantis/atlantis/server/neptune/lyft/notifier"
-	lyftWorkflows "github.com/runatlantis/atlantis/server/neptune/lyft/workflows"
 	"github.com/runatlantis/atlantis/server/neptune/temporal"
 	"github.com/runatlantis/atlantis/server/neptune/temporalworker/config"
-	"github.com/runatlantis/atlantis/server/neptune/temporalworker/controllers"
-	"github.com/runatlantis/atlantis/server/neptune/temporalworker/job"
 	"github.com/runatlantis/atlantis/server/neptune/workflows"
 	"github.com/runatlantis/atlantis/server/neptune/workflows/activities"
 	"github.com/runatlantis/atlantis/server/neptune/workflows/plugins"
@@ -41,7 +36,6 @@ import (
 	"github.com/urfave/negroni"
 	"go.temporal.io/sdk/interceptor"
 	"go.temporal.io/sdk/worker"
-	"go.temporal.io/sdk/workflow"
 )
 
 const (
@@ -65,17 +59,14 @@ const (
 )
 
 type Server struct {
-	Logger                   logging.Logger
-	HTTPServerProxy          *neptune_http.ServerProxy
-	Port                     int
-	StatsScope               tally.Scope
-	StatsCloser              io.Closer
-	TemporalClient           *temporal.ClientWrapper
-	JobStreamHandler         *job.StreamHandler
-	DeployActivities         *activities.Deploy
-	TerraformActivities      *activities.Terraform
-	GithubActivities         *activities.Github
-	RevisionSetterActivities *lyftActivities.RevisionSetter
+	Logger              logging.Logger
+	HTTPServerProxy     *neptune_http.ServerProxy
+	Port                int
+	StatsScope          tally.Scope
+	StatsCloser         io.Closer
+	TemporalClient      *temporal.ClientWrapper
+	TerraformActivities *activities.Terraform
+	GithubActivities    *activities.Github
 	// Temporary until we move this into our private code
 	PRRevisionGithubActivities *lyftActivities.Github
 	TerraformTaskQueue         string
@@ -96,20 +87,9 @@ func NewServer(config *config.Config) (*Server, error) {
 	}
 
 	scope = scope.Tagged(map[string]string{
-		"mode": "worker",
+		"mode": "terraformadmin",
 	})
 
-	// Build dependencies required for output handler and jobs controller
-	jobStore, err := job.NewStorageBackendStore(config.JobConfig, scope.SubScope("job.store"), config.CtxLogger)
-	if err != nil {
-		return nil, errors.Wrapf(err, "initializing job store")
-	}
-	receiverRegistry := job.NewReceiverRegistry()
-
-	// terraform job output handler
-	jobStreamHandler := job.NewStreamHandler(jobStore, receiverRegistry, config.TerraformCfg.LogFilters, config.CtxLogger)
-	jobsController := controllers.NewJobsController(jobStore, receiverRegistry, config.ServerCfg, scope, config.CtxLogger)
-
 	// temporal client + worker initialization
 	opts := &temporal.Options{
 		StatsReporter: statsReporter,
@@ -124,8 +104,6 @@ func NewServer(config *config.Config) (*Server, error) {
 	router := mux.NewRouter()
 	router.HandleFunc("/healthz", Healthz).Methods(http.MethodGet)
 	router.PathPrefix("/static/").Handler(http.FileServer(&assetfs.AssetFS{Asset: static.Asset, AssetDir: static.AssetDir, AssetInfo: static.AssetInfo}))
-	router.HandleFunc("/jobs/{job-id}", jobsController.GetProjectJobs).Methods(http.MethodGet).Name(ProjectJobsViewRouteName)
-	router.HandleFunc("/jobs/{job-id}/ws", jobsController.GetProjectJobsWS).Methods(http.MethodGet)
 	n := negroni.New(&negroni.Recovery{
 		Logger:     log.New(os.Stdout, "", log.LstdFlags),
 		PrintStack: false,
@@ -140,13 +118,6 @@ func NewServer(config *config.Config) (*Server, error) {
 		Logger: config.CtxLogger,
 	}
 
-	// we don't need audit activities
-
-	deployActivities, err := activities.NewDeploy(config.DeploymentConfig)
-	if err != nil {
-		return nil, errors.Wrap(err, "initializing deploy activities")
-	}
-
 	terraformActivities, err := activities.NewTerraform(
 		config.TerraformCfg,
 		config.ValidationConfig,
@@ -155,7 +126,7 @@ func NewServer(config *config.Config) (*Server, error) {
 		config.ServerCfg.URL,
 		config.TemporalCfg.TerraformTaskQueue,
 		config.GithubCfg.TemporalAppInstallationID,
-		jobStreamHandler,
+		nil,
 	)
 	if err != nil {
 		return nil, errors.Wrap(err, "initializing terraform activities")
@@ -200,40 +171,19 @@ func NewServer(config *config.Config) (*Server, error) {
 		return nil, errors.Wrap(err, "initializing github activities")
 	}
 
-	revisionSetterActivities, err := lyftActivities.NewRevisionSetter(config.RevisionSetter)
-	if err != nil {
-		return nil, errors.Wrap(err, "initializing revision setter activities")
-	}
-
-	prRevisionGithubActivities := &lyftActivities.Github{
-		ClientCreator:  clientCreator,
-		InstallationID: config.GithubCfg.TemporalAppInstallationID,
-	}
-
 	server := Server{
-		Logger:                     config.CtxLogger,
-		HTTPServerProxy:            httpServerProxy,
-		Port:                       config.ServerCfg.Port,
-		StatsScope:                 scope,
-		StatsCloser:                statsCloser,
-		TemporalClient:             temporalClient,
-		JobStreamHandler:           jobStreamHandler,
-		DeployActivities:           deployActivities,
-		TerraformActivities:        terraformActivities,
-		GithubActivities:           githubActivities,
-		RevisionSetterActivities:   revisionSetterActivities,
-		TerraformTaskQueue:         config.TemporalCfg.TerraformTaskQueue,
-		RevisionSetterConfig:       config.RevisionSetter,
-		PRRevisionGithubActivities: prRevisionGithubActivities,
+		Logger:              config.CtxLogger,
+		HTTPServerProxy:     httpServerProxy,
+		Port:                config.ServerCfg.Port,
+		StatsScope:          scope,
+		StatsCloser:         statsCloser,
+		TemporalClient:      temporalClient,
+		TerraformActivities: terraformActivities,
+		GithubActivities:    githubActivities,
 	}
 	return &server, nil
 }
 
-/*
- * BIG TODO FOR ATLANTIS DEPRECATION:
- * Figure out what workers / activities / stuff and things we don't need and remove them
- */
-
 func (s Server) Start() error {
 	defer s.shutdown()
 
@@ -241,37 +191,6 @@ func (s Server) Start() error {
 
 	var wg sync.WaitGroup
 
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		deployWorker := s.buildDeployWorker()
-		if err := deployWorker.Run(worker.InterruptCh()); err != nil {
-			log.Fatalln("unable to start deploy worker", err)
-		}
-
-		s.Logger.InfoContext(ctx, "Shutting down deploy worker, resource clean up may still be occurring in the background")
-	}()
-
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		prWorker := worker.New(s.TemporalClient.Client, workflows.PRTaskQueue, worker.Options{
-			WorkerStopTimeout: TemporalWorkerTimeout,
-			Interceptors: []interceptor.WorkerInterceptor{
-				temporal.NewWorkerInterceptor(),
-			},
-		})
-		prWorker.RegisterActivity(s.GithubActivities)
-		prWorker.RegisterActivity(s.TerraformActivities)
-		prWorker.RegisterWorkflow(workflows.PR)
-		prWorker.RegisterWorkflow(workflows.Terraform)
-		if err := prWorker.Run(worker.InterruptCh()); err != nil {
-			log.Fatalln("unable to start pr worker", err)
-		}
-
-		s.Logger.InfoContext(ctx, "Shutting down pr worker, resource clean up may still be occurring in the background")
-	}()
-
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
@@ -284,50 +203,6 @@ func (s Server) Start() error {
 		s.Logger.InfoContext(ctx, "Shutting down terraform worker, resource clean up may still be occurring in the background")
 	}()
 
-	// Spinning up a new worker process here adds complexity to the shutdown logic for this worker
-	// TODO: Investigate the feasibility of deploying this worker process in it's own worker
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-
-		prRevisionWorker := worker.New(s.TemporalClient.Client, lyftWorkflows.PRRevisionTaskQueue, worker.Options{
-			WorkerStopTimeout: PRRevisionWorkerTimeout,
-			Interceptors: []interceptor.WorkerInterceptor{
-				temporal.NewWorkerInterceptor(),
-			},
-			TaskQueueActivitiesPerSecond: s.RevisionSetterConfig.DefaultTaskQueue.ActivitiesPerSecond,
-		})
-		prRevisionWorker.RegisterWorkflow(lyftWorkflows.PRRevision)
-		prRevisionWorker.RegisterActivity(s.PRRevisionGithubActivities)
-		prRevisionWorker.RegisterActivity(s.RevisionSetterActivities)
-
-		if err := prRevisionWorker.Run(worker.InterruptCh()); err != nil {
-			log.Fatalln("unable to start pr revision default worker", err)
-		}
-
-		s.Logger.InfoContext(ctx, "Shutting down pr revision default worker, resource clean up may still be occurring in the background")
-	}()
-
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-
-		prRevisionWorker := worker.New(s.TemporalClient.Client, lyftWorkflows.PRRevisionSlowTaskQueue, worker.Options{
-			WorkerStopTimeout: PRRevisionWorkerTimeout,
-			Interceptors: []interceptor.WorkerInterceptor{
-				temporal.NewWorkerInterceptor(),
-			},
-			TaskQueueActivitiesPerSecond: s.RevisionSetterConfig.SlowTaskQueue.ActivitiesPerSecond,
-		})
-		prRevisionWorker.RegisterActivity(s.PRRevisionGithubActivities)
-
-		if err := prRevisionWorker.Run(worker.InterruptCh()); err != nil {
-			log.Fatalln("unable to start pr revision slow worker", err)
-		}
-
-		s.Logger.InfoContext(ctx, "Shutting down pr revision slow worker, resource clean up may still be occurring in the background")
-	}()
-
 	// Ensure server gracefully drains connections when stopped.
 	stop := make(chan os.Signal, 1)
 	// Stop on SIGINTs and SIGTERMs.
@@ -353,9 +228,6 @@ func (s Server) shutdown() {
 	// On cleanup, stream handler closes all active receivers and persists in memory jobs to storage
 	ctx, cancel := context.WithTimeout(context.Background(), StreamHandlerTimeout)
 	defer cancel()
-	if err := s.JobStreamHandler.CleanUp(ctx); err != nil {
-		s.Logger.Error(err.Error())
-	}
 
 	ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
@@ -373,41 +245,6 @@ func (s Server) shutdown() {
 	s.Logger.Close()
 }
 
-// TODO: consider building these before initializing the server so that the server is just responsible
-// for running the workers and has no knowledge of their dependencies.
-func (s Server) buildDeployWorker() worker.Worker {
-	// pass the underlying client otherwise this will panic()
-	deployWorker := worker.New(s.TemporalClient.Client, workflows.DeployTaskQueue, worker.Options{
-		WorkerStopTimeout: TemporalWorkerTimeout,
-		Interceptors: []interceptor.WorkerInterceptor{
-			temporal.NewWorkerInterceptor(),
-		},
-	})
-	deployWorker.RegisterActivity(s.DeployActivities)
-	deployWorker.RegisterActivity(s.GithubActivities)
-	deployWorker.RegisterActivity(s.TerraformActivities)
-	deployWorker.RegisterWorkflowWithOptions(workflows.GetDeployWithPlugins(
-		func(ctx workflow.Context, dr workflows.DeployRequest) (plugins.Deploy, error) {
-			var a *lyftActivities.Audit
-
-			return plugins.Deploy{
-				Notifiers: []plugins.TerraformWorkflowNotifier{
-					&notifier.SNSNotifier{
-						Activity: a,
-					},
-				},
-				PostDeployExecutors: []plugins.PostDeployExecutor{
-					&executor.PRRevisionWorkflowExecutor{TaskQueue: lyftWorkflows.PRRevisionTaskQueue},
-				},
-			}, nil
-		},
-	), workflow.RegisterOptions{
-		Name: workflows.Deploy,
-	})
-	deployWorker.RegisterWorkflow(workflows.Terraform)
-	return deployWorker
-}
-
 func (s Server) buildTerraformWorker() worker.Worker {
 	// pass the underlying client otherwise this will panic()
 	terraformWorker := worker.New(s.TemporalClient.Client, s.TerraformTaskQueue, worker.Options{