Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
tlin4194 committed Oct 30, 2024
1 parent f9f0431 commit 556a695
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type queue interface {
CanPop() bool
Pop() (terraform.DeploymentInfo, error)
SetLockForMergedItems(ctx workflow.Context, state LockState)
GetQueuedRevisionsSummary() string
}

type deployer interface {
Expand Down Expand Up @@ -96,7 +97,7 @@ func NewWorker(
},
}

tfWorkflowRunner := terraform.NewWorkflowRunner(tfWorkflow, notifiers, additionalNotifiers...)
tfWorkflowRunner := terraform.NewWorkflowRunner(q, tfWorkflow, notifiers, additionalNotifiers...)
deployer := &Deployer{
Activities: a,
TerraformWorkflowRunner: tfWorkflowRunner,
Expand Down
13 changes: 12 additions & 1 deletion server/neptune/workflows/internal/deploy/terraform/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,21 @@ type stateReceiver interface {
Receive(ctx workflow.Context, c workflow.ReceiveChannel, deploymentInfo DeploymentInfo)
}

func NewWorkflowRunner(w Workflow, internalNotifiers []WorkflowNotifier, additionalNotifiers ...plugins.TerraformWorkflowNotifier) *WorkflowRunner {
type deployQueue interface {
// IsEmpty() bool
// CanPop() bool
// Pop() (DeploymentInfo, error)
// Scan() []DeploymentInfo
// GetLockState() queue.LockState
// SetLockForMergedItems(ctx workflow.Context, state queue.LockState)
GetQueuedRevisionsSummary() string
}

func NewWorkflowRunner(queue deployQueue, w Workflow, internalNotifiers []WorkflowNotifier, additionalNotifiers ...plugins.TerraformWorkflowNotifier) *WorkflowRunner {
return &WorkflowRunner{
Workflow: w,
StateReceiver: &StateReceiver{
Queue: queue,
InternalNotifiers: internalNotifiers,
AdditionalNotifiers: additionalNotifiers,
},
Expand Down
8 changes: 7 additions & 1 deletion server/neptune/workflows/internal/deploy/terraform/state.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package terraform

import (
"reflect"

"github.com/pkg/errors"
"github.com/runatlantis/atlantis/server/metrics"
"github.com/runatlantis/atlantis/server/neptune/workflows/internal/notifier"

"github.com/runatlantis/atlantis/server/neptune/workflows/internal/terraform/state"
"github.com/runatlantis/atlantis/server/neptune/workflows/plugins"
"go.temporal.io/sdk/workflow"
Expand All @@ -18,6 +19,7 @@ type StateReceiver struct {

// We have separate classes of notifiers since we can be more flexible with our internal ones in terms of the data model
// What we support externally should be well thought out so for now this is kept to a minimum.
Queue deployQueue
InternalNotifiers []WorkflowNotifier
AdditionalNotifiers []plugins.TerraformWorkflowNotifier
}
Expand All @@ -39,6 +41,10 @@ func (n *StateReceiver) Receive(ctx workflow.Context, c workflow.ReceiveChannel,
}
}

if workflowState.Apply.Status == state.WaitingJobStatus && reflect.ValueOf(workflowState.Apply.OnWaitingActions).IsZero() {
// lock the queue item
}

for _, notifier := range n.InternalNotifiers {
if err := notifier.Notify(ctx, deploymentInfo.ToInternalInfo(), workflowState); err != nil {
workflow.GetMetricsHandler(ctx).Counter("notifier_failure").Inc(1)
Expand Down

0 comments on commit 556a695

Please sign in to comment.