Skip to content

Commit

Permalink
add orchestrator
Browse files Browse the repository at this point in the history
  • Loading branch information
mcalhoun committed Dec 24, 2023
1 parent 48b776c commit 87f2a5e
Show file tree
Hide file tree
Showing 12 changed files with 250 additions and 5 deletions.
20 changes: 20 additions & 0 deletions fixtures/get-parameters-by-path-output.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[
{
"Name": "/terraform/config/listeners/us-east-2/123456789012",
"Type": "String",
"Value": "https://sqs.us-east-1.amazonaws.com/123456789012/mcal-core-use2-acct1-tf-ssm-sync-orchestrator.fifo",
"Version": 1,
"LastModifiedDate": "2023-12-08T23:59:13.856000-05:00",
"ARN": "arn:aws:ssm:us-east-2:112233445566:parameter/terraform/config/us-east-2/123456789012",
"DataType": "text"
},
{
"Name": "/terraform/config/listeners/us-east-2/112233445566",
"Type": "String",
"Value": "https://sqs.us-east-1.amazonaws.com/112233445566/mcal-core-use2-acct2-tf-ssm-sync-orchestrator.fifo",
"Version": 1,
"LastModifiedDate": "2023-12-08T23:59:13.856000-05:00",
"ARN": "arn:aws:ssm:us-east-2:112233445566:parameter/terraform/config/us-east-2/112233445566",
"DataType": "text"
}
]
1 change: 1 addition & 0 deletions internal/model/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ package model
type Config struct {
OrchestratorQueueURL string `json:"orchestrator_queue_url"`
SSMBasePath string `json:"ssm_base_path"`
SSMConfigPath string `json:"ssm_config_path"`
SSMSharedPath string `json:"ssm_shared_config_path"`
}
1 change: 1 addition & 0 deletions internal/service/aws/awssvciface/sqs_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ package awssvciface

type SQSService interface {
SendMessage(message string, messageGroupId string) error
SetQueueUrl(queueUrl string) error
}
1 change: 1 addition & 0 deletions internal/service/aws/awssvciface/ssm_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ type SSMService interface {
SetRegion(name string) error
DeleteParameter(name string) (*ssm.DeleteParameterOutput, error)
GetParameter(name string) (*ssm.GetParameterOutput, error)
GetParametersByPathPages(input *ssm.GetParametersByPathInput) ([]*ssm.Parameter, error)
SetParameter(name string, paramType string, value string) (*ssm.PutParameterOutput, error)
}
5 changes: 5 additions & 0 deletions internal/service/aws/sqs_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,8 @@ func (s *SQSService) SendMessage(message string, messageGroupId string) error {

return nil
}

func (s *SQSService) SetQueueUrl(url string) error {
s.QueueUrl = url
return nil
}
15 changes: 15 additions & 0 deletions internal/service/aws/ssm_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ func (s *SSMService) GetParameter(name string) (*ssm.GetParameterOutput, error)
})
}

func (s *SSMService) GetParametersByPathPages(input *ssm.GetParametersByPathInput) ([]*ssm.Parameter, error) {
var params []*ssm.Parameter

err := s.client.GetParametersByPathPages(input, func(page *ssm.GetParametersByPathOutput, lastPage bool) bool {
params = append(params, page.Parameters...)
return !lastPage
})

if err != nil {
return nil, err
}

return params, nil
}

func (s *SSMService) SetParameter(name string, paramType string, value string) (*ssm.PutParameterOutput, error) {
return s.client.PutParameter(&ssm.PutParameterInput{
Name: &name,
Expand Down
7 changes: 7 additions & 0 deletions internal/service/config_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ func GetConfig() (*model.Config, error) {
basePath = "/terraform"
}

configSubPath := os.Getenv("SSM_CONFIG_PATH")
if configSubPath == "" {
configSubPath = "config"
}
configPath := fmt.Sprintf("%s/%s", basePath, configSubPath)

sharedSubPath := os.Getenv("SSM_SHARED_PATH")
if sharedSubPath == "" {
sharedSubPath = "shared"
Expand All @@ -27,6 +33,7 @@ func GetConfig() (*model.Config, error) {

return &model.Config{
SSMBasePath: basePath,
SSMConfigPath: configPath,
SSMSharedPath: sharedPath,
OrchestratorQueueURL: queueUrl,
}, nil
Expand Down
87 changes: 87 additions & 0 deletions internal/service/orchestrator_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package service

import (
"fmt"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ssm"
"github.com/cloudposse/ssm-syncronizer/internal/model"
"github.com/cloudposse/ssm-syncronizer/internal/service/aws/awssvciface"
"github.com/cloudposse/ssm-syncronizer/internal/util"
)

type OrchestratorService struct {
AccountService awssvciface.AccountInfoService
ConfigPath string
SSMService awssvciface.SSMService
SQSService awssvciface.SQSService
}

type enabledAccount struct {
Account string `json:"account"`
QueueUrl string `json:"queue_url"`
}

func NewOrchestratorService(accountService awssvciface.AccountInfoService, configPath string, ssmService awssvciface.SSMService, sqsService awssvciface.SQSService) *OrchestratorService {
return &OrchestratorService{
AccountService: accountService,
ConfigPath: configPath,
SSMService: ssmService,
SQSService: sqsService,
}
}

func (s *OrchestratorService) getEnabledListenerAccounts() ([]enabledAccount, error) {
region, err := s.AccountService.GetRegion()
if err != nil {
return nil, err
}

accountsEnabledPath := strings.Join([]string{s.ConfigPath, "listeners", region}, "/")

var accounts []enabledAccount

input := &ssm.GetParametersByPathInput{
Path: aws.String(accountsEnabledPath),
}

params, err := s.SSMService.GetParametersByPathPages(input)
for _, param := range params {
account := strings.Split(*param.Name, fmt.Sprintf("%s/", accountsEnabledPath))[1]
accounts = append(accounts, enabledAccount{Account: account, QueueUrl: *param.Value})
}

if err != nil {
return nil, err
}

return accounts, nil
}

func (s *OrchestratorService) sendToRemoteListener(account string, queue string, event model.ParameterStoreChangeEvent) error {
message, err := util.Marshal(event)
if err != nil {
return err
}

s.SQSService.SetQueueUrl(queue)

Check failure on line 68 in internal/service/orchestrator_service.go

View workflow job for this annotation

GitHub Actions / build

Error return value of `s.SQSService.SetQueueUrl` is not checked (errcheck)
s.SQSService.SendMessage(string(message), event.Detail.Name)

Check failure on line 69 in internal/service/orchestrator_service.go

View workflow job for this annotation

GitHub Actions / build

Error return value of `s.SQSService.SendMessage` is not checked (errcheck)
return nil
}

func (s *OrchestratorService) Sync(event model.ParameterStoreChangeEvent) error {
accounts, err := s.getEnabledListenerAccounts()
if err != nil {
return err
}

for _, account := range accounts {
err := s.sendToRemoteListener(account.Account, account.QueueUrl, event)
if err != nil {
return err
}
}

return nil
}
65 changes: 65 additions & 0 deletions internal/service/orchestrator_service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package service

import (
"testing"

"github.com/aws/aws-sdk-go/service/ssm"
"github.com/cloudposse/ssm-syncronizer/internal/model"
"github.com/cloudposse/ssm-syncronizer/internal/util"
"github.com/cloudposse/ssm-syncronizer/mocks"
"github.com/stretchr/testify/assert"
)

func getParametersValue(t *testing.T) []*ssm.Parameter {
file := "../../fixtures/get-parameters-by-path-output.json"
param, err := util.UnmarshalFile[[]*ssm.Parameter](file)
if err != nil {
t.Fatalf("Failed to unmarshal %s", file)
}
return param
}

func TestOrchestratorService_Sync(t *testing.T) {
tests := []struct {
name string
eventFile string
currentAccount string
currentRegion string
expectError bool
}{
{
name: "Foo",
eventFile: "../../fixtures/param-store-event-same-account-and-region.json",
currentAccount: "111111222222",
currentRegion: "us-east-2",
expectError: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

event, err := util.UnmarshalFile[model.ParameterStoreChangeEvent](tt.eventFile)
if err != nil {
t.Fatalf("Failed to unmarshal event %s", tt.eventFile)
}

getParametersValue := getParametersValue(t)

accountSvc := &mocks.AccountServiceMock{GetAccountResponse: tt.currentAccount, GetRegionResponse: tt.currentRegion}
ssmSvc := &mocks.SSMServiceMock{GetParametersByPathPagesOutput: getParametersValue}
sqsSvc := &mocks.SQSServiceMock{}

svc := &OrchestratorService{AccountService: accountSvc, ConfigPath: "/terraform/config", SSMService: ssmSvc, SQSService: sqsSvc}

err = svc.Sync(event)
if (err != nil) != tt.expectError {
t.Errorf("parseArn() error = %v, expectedErr %v", err, tt.expectError)
return
}

assert.Equal(t, 2, sqsSvc.SendMessageCalls)
assert.Equal(t, 2, sqsSvc.SetQueueUrlCalls)
})
}
}
31 changes: 31 additions & 0 deletions internal/util/marshaller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package util

import (
"testing"

"github.com/stretchr/testify/assert"
)

type testStruct struct {
Name string `json:"name"`
Count int `json:"count"`
}

func TestMarshal(t *testing.T) {
test := testStruct{
Name: "test",
Count: 1,
}

output, err := Marshal(test)
assert.NoError(t, err)
assert.Equal(t, `{"name":"test","count":1}`, string(output))
}

func TestUnmarshal(t *testing.T) {
input := []byte(`{"name":"test","count":1}`)

output, err := Unmarshal(input)
assert.NoError(t, err)
assert.Equal(t, map[string]interface{}{"name": "test", "count": 1.0}, output)
}
9 changes: 8 additions & 1 deletion mocks/sqs_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,15 @@ package mocks

type SQSServiceMock struct {
SendMessageCalls int
SetQueueUrlCalls int
}

func (m *SQSServiceMock) SendMessage(message string, messageGroupId string) error {
func (s *SQSServiceMock) SendMessage(message string, messageGroupId string) error {
s.SendMessageCalls = s.SendMessageCalls + 1
return nil
}

func (s *SQSServiceMock) SetQueueUrl(url string) error {
s.SetQueueUrlCalls = s.SetQueueUrlCalls + 1
return nil
}
13 changes: 9 additions & 4 deletions mocks/ssm_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ type SetParameterCall struct {
}

type SSMServiceMock struct {
PutParameterOutput ssm.PutParameterOutput
GetParameterOutput ssm.GetParameterOutput
SetParameterCalls []SetParameterCall
SetRegionCalls []string
PutParameterOutput ssm.PutParameterOutput
GetParameterOutput ssm.GetParameterOutput
GetParametersByPathPagesOutput []*ssm.Parameter
SetParameterCalls []SetParameterCall
SetRegionCalls []string
}

func (m *SSMServiceMock) SetRegion(name string) error {
Expand All @@ -30,6 +31,10 @@ func (m *SSMServiceMock) GetParameter(name string) (*ssm.GetParameterOutput, err
return &ssm.GetParameterOutput{}, nil
}

func (s *SSMServiceMock) GetParametersByPathPages(input *ssm.GetParametersByPathInput) ([]*ssm.Parameter, error) {
return s.GetParametersByPathPagesOutput, nil
}

func (m *SSMServiceMock) SetParameter(name string, paramType string, value string) (*ssm.PutParameterOutput, error) {
m.SetParameterCalls = append(m.SetParameterCalls, SetParameterCall{Name: name, ParamType: paramType, Value: value})
return &ssm.PutParameterOutput{}, nil
Expand Down

0 comments on commit 87f2a5e

Please sign in to comment.