Skip to content

Commit

Permalink
ok
Browse files Browse the repository at this point in the history
  • Loading branch information
smonero committed Mar 1, 2024
1 parent 4ab7d40 commit 8f4893b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 184 deletions.
3 changes: 0 additions & 3 deletions cmd/terraformadmin.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,12 @@ func (t *TerraformAdmin) NewServer(userConfig legacy.UserConfig, config legacy.C
"parsing atlantis url %q", userConfig.AtlantisURL)
}

// TODO: we should just supply a yaml file with this info and load it directly into the
// app config struct
appConfig, err := createGHAppConfig(userConfig)
if err != nil {
return nil, err
}

// we don't need the feature config

cfg := &neptune.Config{
AuthCfg: neptune.AuthConfig{
SslCertFile: userConfig.SSLCertFile,
Expand Down
199 changes: 18 additions & 181 deletions server/neptune/terraformadmin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,8 @@ import (
"github.com/runatlantis/atlantis/server/metrics"
neptune_http "github.com/runatlantis/atlantis/server/neptune/http"
lyftActivities "github.com/runatlantis/atlantis/server/neptune/lyft/activities"
"github.com/runatlantis/atlantis/server/neptune/lyft/executor"
"github.com/runatlantis/atlantis/server/neptune/lyft/notifier"
lyftWorkflows "github.com/runatlantis/atlantis/server/neptune/lyft/workflows"
"github.com/runatlantis/atlantis/server/neptune/temporal"
"github.com/runatlantis/atlantis/server/neptune/temporalworker/config"
"github.com/runatlantis/atlantis/server/neptune/temporalworker/controllers"
"github.com/runatlantis/atlantis/server/neptune/temporalworker/job"
"github.com/runatlantis/atlantis/server/neptune/workflows"
"github.com/runatlantis/atlantis/server/neptune/workflows/activities"
"github.com/runatlantis/atlantis/server/neptune/workflows/plugins"
Expand All @@ -41,7 +36,6 @@ import (
"github.com/urfave/negroni"
"go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)

const (
Expand All @@ -65,17 +59,14 @@ const (
)

type Server struct {
Logger logging.Logger
HTTPServerProxy *neptune_http.ServerProxy
Port int
StatsScope tally.Scope
StatsCloser io.Closer
TemporalClient *temporal.ClientWrapper
JobStreamHandler *job.StreamHandler
DeployActivities *activities.Deploy
TerraformActivities *activities.Terraform
GithubActivities *activities.Github
RevisionSetterActivities *lyftActivities.RevisionSetter
Logger logging.Logger
HTTPServerProxy *neptune_http.ServerProxy
Port int
StatsScope tally.Scope
StatsCloser io.Closer
TemporalClient *temporal.ClientWrapper
TerraformActivities *activities.Terraform
GithubActivities *activities.Github
// Temporary until we move this into our private code
PRRevisionGithubActivities *lyftActivities.Github
TerraformTaskQueue string
Expand All @@ -96,20 +87,9 @@ func NewServer(config *config.Config) (*Server, error) {
}

scope = scope.Tagged(map[string]string{
"mode": "worker",
"mode": "terraformadmin",
})

// Build dependencies required for output handler and jobs controller
jobStore, err := job.NewStorageBackendStore(config.JobConfig, scope.SubScope("job.store"), config.CtxLogger)
if err != nil {
return nil, errors.Wrapf(err, "initializing job store")
}
receiverRegistry := job.NewReceiverRegistry()

// terraform job output handler
jobStreamHandler := job.NewStreamHandler(jobStore, receiverRegistry, config.TerraformCfg.LogFilters, config.CtxLogger)
jobsController := controllers.NewJobsController(jobStore, receiverRegistry, config.ServerCfg, scope, config.CtxLogger)

// temporal client + worker initialization
opts := &temporal.Options{
StatsReporter: statsReporter,
Expand All @@ -124,8 +104,6 @@ func NewServer(config *config.Config) (*Server, error) {
router := mux.NewRouter()
router.HandleFunc("/healthz", Healthz).Methods(http.MethodGet)
router.PathPrefix("/static/").Handler(http.FileServer(&assetfs.AssetFS{Asset: static.Asset, AssetDir: static.AssetDir, AssetInfo: static.AssetInfo}))
router.HandleFunc("/jobs/{job-id}", jobsController.GetProjectJobs).Methods(http.MethodGet).Name(ProjectJobsViewRouteName)
router.HandleFunc("/jobs/{job-id}/ws", jobsController.GetProjectJobsWS).Methods(http.MethodGet)
n := negroni.New(&negroni.Recovery{
Logger: log.New(os.Stdout, "", log.LstdFlags),
PrintStack: false,
Expand All @@ -140,13 +118,6 @@ func NewServer(config *config.Config) (*Server, error) {
Logger: config.CtxLogger,
}

// we don't need audit activities

deployActivities, err := activities.NewDeploy(config.DeploymentConfig)
if err != nil {
return nil, errors.Wrap(err, "initializing deploy activities")
}

terraformActivities, err := activities.NewTerraform(
config.TerraformCfg,
config.ValidationConfig,
Expand All @@ -155,7 +126,7 @@ func NewServer(config *config.Config) (*Server, error) {
config.ServerCfg.URL,
config.TemporalCfg.TerraformTaskQueue,
config.GithubCfg.TemporalAppInstallationID,
jobStreamHandler,
nil,
)
if err != nil {
return nil, errors.Wrap(err, "initializing terraform activities")
Expand Down Expand Up @@ -200,78 +171,26 @@ func NewServer(config *config.Config) (*Server, error) {
return nil, errors.Wrap(err, "initializing github activities")
}

revisionSetterActivities, err := lyftActivities.NewRevisionSetter(config.RevisionSetter)
if err != nil {
return nil, errors.Wrap(err, "initializing revision setter activities")
}

prRevisionGithubActivities := &lyftActivities.Github{
ClientCreator: clientCreator,
InstallationID: config.GithubCfg.TemporalAppInstallationID,
}

server := Server{
Logger: config.CtxLogger,
HTTPServerProxy: httpServerProxy,
Port: config.ServerCfg.Port,
StatsScope: scope,
StatsCloser: statsCloser,
TemporalClient: temporalClient,
JobStreamHandler: jobStreamHandler,
DeployActivities: deployActivities,
TerraformActivities: terraformActivities,
GithubActivities: githubActivities,
RevisionSetterActivities: revisionSetterActivities,
TerraformTaskQueue: config.TemporalCfg.TerraformTaskQueue,
RevisionSetterConfig: config.RevisionSetter,
PRRevisionGithubActivities: prRevisionGithubActivities,
Logger: config.CtxLogger,
HTTPServerProxy: httpServerProxy,
Port: config.ServerCfg.Port,
StatsScope: scope,
StatsCloser: statsCloser,
TemporalClient: temporalClient,
TerraformActivities: terraformActivities,
GithubActivities: githubActivities,
}
return &server, nil
}

/*
* BIG TODO FOR ATLANTIS DEPRECATION:
* Figure out what workers / activities / stuff and things we don't need and remove them
*/

func (s Server) Start() error {
defer s.shutdown()

ctx := context.Background()

var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
deployWorker := s.buildDeployWorker()
if err := deployWorker.Run(worker.InterruptCh()); err != nil {
log.Fatalln("unable to start deploy worker", err)
}

s.Logger.InfoContext(ctx, "Shutting down deploy worker, resource clean up may still be occurring in the background")
}()

wg.Add(1)
go func() {
defer wg.Done()
prWorker := worker.New(s.TemporalClient.Client, workflows.PRTaskQueue, worker.Options{
WorkerStopTimeout: TemporalWorkerTimeout,
Interceptors: []interceptor.WorkerInterceptor{
temporal.NewWorkerInterceptor(),
},
})
prWorker.RegisterActivity(s.GithubActivities)
prWorker.RegisterActivity(s.TerraformActivities)
prWorker.RegisterWorkflow(workflows.PR)
prWorker.RegisterWorkflow(workflows.Terraform)
if err := prWorker.Run(worker.InterruptCh()); err != nil {
log.Fatalln("unable to start pr worker", err)
}

s.Logger.InfoContext(ctx, "Shutting down pr worker, resource clean up may still be occurring in the background")
}()

wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -284,50 +203,6 @@ func (s Server) Start() error {
s.Logger.InfoContext(ctx, "Shutting down terraform worker, resource clean up may still be occurring in the background")
}()

// Spinning up a new worker process here adds complexity to the shutdown logic for this worker
// TODO: Investigate the feasibility of deploying this worker process in it's own worker
wg.Add(1)
go func() {
defer wg.Done()

prRevisionWorker := worker.New(s.TemporalClient.Client, lyftWorkflows.PRRevisionTaskQueue, worker.Options{
WorkerStopTimeout: PRRevisionWorkerTimeout,
Interceptors: []interceptor.WorkerInterceptor{
temporal.NewWorkerInterceptor(),
},
TaskQueueActivitiesPerSecond: s.RevisionSetterConfig.DefaultTaskQueue.ActivitiesPerSecond,
})
prRevisionWorker.RegisterWorkflow(lyftWorkflows.PRRevision)
prRevisionWorker.RegisterActivity(s.PRRevisionGithubActivities)
prRevisionWorker.RegisterActivity(s.RevisionSetterActivities)

if err := prRevisionWorker.Run(worker.InterruptCh()); err != nil {
log.Fatalln("unable to start pr revision default worker", err)
}

s.Logger.InfoContext(ctx, "Shutting down pr revision default worker, resource clean up may still be occurring in the background")
}()

wg.Add(1)
go func() {
defer wg.Done()

prRevisionWorker := worker.New(s.TemporalClient.Client, lyftWorkflows.PRRevisionSlowTaskQueue, worker.Options{
WorkerStopTimeout: PRRevisionWorkerTimeout,
Interceptors: []interceptor.WorkerInterceptor{
temporal.NewWorkerInterceptor(),
},
TaskQueueActivitiesPerSecond: s.RevisionSetterConfig.SlowTaskQueue.ActivitiesPerSecond,
})
prRevisionWorker.RegisterActivity(s.PRRevisionGithubActivities)

if err := prRevisionWorker.Run(worker.InterruptCh()); err != nil {
log.Fatalln("unable to start pr revision slow worker", err)
}

s.Logger.InfoContext(ctx, "Shutting down pr revision slow worker, resource clean up may still be occurring in the background")
}()

// Ensure server gracefully drains connections when stopped.
stop := make(chan os.Signal, 1)
// Stop on SIGINTs and SIGTERMs.
Expand All @@ -353,9 +228,6 @@ func (s Server) shutdown() {
// On cleanup, stream handler closes all active receivers and persists in memory jobs to storage
ctx, cancel := context.WithTimeout(context.Background(), StreamHandlerTimeout)

Check failure on line 229 in server/neptune/terraformadmin/server.go

View workflow job for this annotation

GitHub Actions / runner / golangci-lint

ineffectual assignment to ctx (ineffassign)
defer cancel()
if err := s.JobStreamHandler.CleanUp(ctx); err != nil {
s.Logger.Error(err.Error())
}

ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand All @@ -373,41 +245,6 @@ func (s Server) shutdown() {
s.Logger.Close()
}

// TODO: consider building these before initializing the server so that the server is just responsible
// for running the workers and has no knowledge of their dependencies.
func (s Server) buildDeployWorker() worker.Worker {
// pass the underlying client otherwise this will panic()
deployWorker := worker.New(s.TemporalClient.Client, workflows.DeployTaskQueue, worker.Options{
WorkerStopTimeout: TemporalWorkerTimeout,
Interceptors: []interceptor.WorkerInterceptor{
temporal.NewWorkerInterceptor(),
},
})
deployWorker.RegisterActivity(s.DeployActivities)
deployWorker.RegisterActivity(s.GithubActivities)
deployWorker.RegisterActivity(s.TerraformActivities)
deployWorker.RegisterWorkflowWithOptions(workflows.GetDeployWithPlugins(
func(ctx workflow.Context, dr workflows.DeployRequest) (plugins.Deploy, error) {
var a *lyftActivities.Audit

return plugins.Deploy{
Notifiers: []plugins.TerraformWorkflowNotifier{
&notifier.SNSNotifier{
Activity: a,
},
},
PostDeployExecutors: []plugins.PostDeployExecutor{
&executor.PRRevisionWorkflowExecutor{TaskQueue: lyftWorkflows.PRRevisionTaskQueue},
},
}, nil
},
), workflow.RegisterOptions{
Name: workflows.Deploy,
})
deployWorker.RegisterWorkflow(workflows.Terraform)
return deployWorker
}

func (s Server) buildTerraformWorker() worker.Worker {
// pass the underlying client otherwise this will panic()
terraformWorker := worker.New(s.TemporalClient.Client, s.TerraformTaskQueue, worker.Options{
Expand Down

0 comments on commit 8f4893b

Please sign in to comment.