Skip to content

Commit

Permalink
Fixed DDB table ProvisionedThroughput, raised retry time during lock,…
Browse files Browse the repository at this point in the history
… deleted test tables after testing
  • Loading branch information
greg-szabo committed Sep 24, 2018
1 parent e8275c5 commit 2d25966
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 24 deletions.
40 changes: 21 additions & 19 deletions ddb/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,11 @@ import (
// A Mutex is a mutual exclusion lock.
// This version of a Mutex has extra properties for the AWS session and DynamoDB session details.
type Mutex struct {
initialized bool

// Name of the Mutex used in the DynamoDB table.
Name string
// Removed soon
Value string

// Amount of time before a locked mutex is considered abandoned.
Expiry time.Duration
id int64

// The AWS Region where the DynamoDB table resides.
AWSRegion string
Expand All @@ -40,8 +36,14 @@ type Mutex struct {
DDBSession *dynamodb.DynamoDB
// The DynamoDB Table name
DDBTableName string
timeout time.Duration
timeoutSet bool

initialized bool

timeout time.Duration
timeoutSet bool

value string
id int64
}

func (m *Mutex) initialization() (err error) {
Expand All @@ -61,8 +63,8 @@ func (m *Mutex) initialization() (err error) {
m.Name = "Lock"
}

if m.Value == "" {
m.Value = "0"
if m.GetValueString() == "" {
m.SetValueInt64(0)
}

// Create AWS session, if it does not exist
Expand Down Expand Up @@ -108,8 +110,8 @@ func (m *Mutex) initialization() (err error) {
},
// Todo: Make the capacity units configurable
ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(1),
WriteCapacityUnits: aws.Int64(1),
ReadCapacityUnits: aws.Int64(5),
WriteCapacityUnits: aws.Int64(5),
},
TableName: aws.String(m.DDBTableName),
})
Expand Down Expand Up @@ -199,7 +201,7 @@ func (m *Mutex) tryLock() (err error) {
}

if value, ok := result.Attributes["Value"]; ok {
m.Value = *value.S
m.SetValueString(*value.S)
}

return
Expand Down Expand Up @@ -228,7 +230,7 @@ func (m *Mutex) tryUnlock() (err error) {
N: aws.String("0"),
},
":value": {
S: aws.String(m.Value),
S: aws.String(m.GetValueString()),
},
},
Key: map[string]*dynamodb.AttributeValue{
Expand Down Expand Up @@ -271,7 +273,7 @@ func (m *Mutex) Lock() {
if started < time.Now().UnixNano()-m.timeout.Nanoseconds() {
panic(errors.New("could not lock mutex"))
} else {
time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
continue
}
}
Expand Down Expand Up @@ -306,11 +308,11 @@ func (m *Mutex) Unlock() {
// It does not check if the Mutex was locked beforehand. An unlocked Mutex will return an out-of-sync result.
func (m *Mutex) GetValueInt64() int64 {

if m.Value == "" {
if m.value == "" {
return 0
}

result, err := strconv.ParseInt(m.Value, 10, 64)
result, err := strconv.ParseInt(m.value, 10, 64)
if err != nil {
panic(err.Error())
}
Expand All @@ -323,22 +325,22 @@ func (m *Mutex) GetValueInt64() int64 {
//
// See example(s) at GetValueInt64
func (m *Mutex) SetValueInt64(value int64) {
m.Value = strconv.FormatInt(value, 10)
m.value = strconv.FormatInt(value, 10)
}

// GetValueString gets the value from the Mutex and returns it as a string.
//
// It does not check if the Mutex was locked beforehand. An unlocked Mutex will return an out-of-sync result.
func (m *Mutex) GetValueString() string {
return m.Value
return m.value
}

// SetValueString sets the string value in the Mutex. It does not check if the Mutex was locked beforehand. It does not write
// the value into the database. The value is written to the database during Unlock.
//
// See example(s) at GetValueString
func (m *Mutex) SetValueString(value string) {
m.Value = value
m.value = value
}

// LockAndGetValueString is shorthand for locking the Mutex and retrieving its string value.
Expand Down
24 changes: 19 additions & 5 deletions ddb/sync/sync_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package sync

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/stretchr/testify/assert"
"testing"

Expand All @@ -10,6 +12,12 @@ import (
"time"
)

func DeleteTable(m Mutex) {
m.DDBSession.DeleteTable(&dynamodb.DeleteTableInput{
TableName: aws.String(m.DDBTableName),
})
}

func Test_DDBLock_Default(t *testing.T) {
m := Mutex{}
assert.NotPanics(t, m.Lock)
Expand All @@ -24,6 +32,7 @@ func Test_DDBLock_InitSettings(t *testing.T) {
}
assert.NotPanics(t, m.Lock)
assert.NotPanics(t, m.Unlock)
DeleteTable(m)
}

func Test_DDBLock_ReuseConnection(t *testing.T) {
Expand All @@ -38,6 +47,7 @@ func Test_DDBLock_ReuseConnection(t *testing.T) {
assert.NotPanics(t, m.Unlock)
assert.NotPanics(t, n.Lock)
assert.NotPanics(t, n.Unlock)
DeleteTable(m)
}

func Test_DDBLock_TwoConnectionsAndADeadlock(t *testing.T) {
Expand All @@ -46,6 +56,7 @@ func Test_DDBLock_TwoConnectionsAndADeadlock(t *testing.T) {
n := Mutex{DDBTableName: TableName}
assert.NotPanics(t, m.Lock)
assert.Panics(t, n.Lock)
DeleteTable(m)
}

func Test_DDBLock_CannotUnlockOthers(t *testing.T) {
Expand All @@ -54,6 +65,7 @@ func Test_DDBLock_CannotUnlockOthers(t *testing.T) {
n := Mutex{DDBTableName: TableName}
assert.NotPanics(t, m.Lock)
assert.Panics(t, n.Unlock)
DeleteTable(m)
}

func Test_DDBLock_ParallelCount(t *testing.T) {
Expand All @@ -67,13 +79,14 @@ func Test_DDBLock_ParallelCount(t *testing.T) {
defer wg.Done()
assert.NotPanics(t, m.Lock)
defer assert.NotPanics(t, m.Unlock)
i, _ := strconv.Atoi(m.Value)
m.Value = strconv.Itoa(i + 1)
i := m.GetValueInt64()
m.SetValueInt64(i + 1)
}(m)
}
wg.Wait()
assert.Equal(t, strconv.Itoa(thisMany), m.LockAndGetValueString())
assert.NotPanics(t, m.Unlock)
DeleteTable(m)
}

func Test_ValueTests(t *testing.T) {
Expand All @@ -88,6 +101,7 @@ func Test_ValueTests(t *testing.T) {
assert.Equal(t, m.GetValueInt64(), testValueInt64+1)
assert.Equal(t, m.GetValueString(), strconv.FormatInt(testValueInt64+1, 10))
assert.NotPanics(t, m.Unlock)
DeleteTable(m)
}

func Test_Timeout(t *testing.T) {
Expand All @@ -104,12 +118,13 @@ func Test_Timeout(t *testing.T) {
assert.Equal(t, endTime.Sub(startTime) > timeout, true)
assert.Equal(t, endTime.Sub(startTime) < timeout+time.Second, true)
assert.NotPanics(t, m.Unlock)
DeleteTable(m)
}

func Test_Expiry(t *testing.T) {
timeout := 1 * time.Second
expiry := 3 * time.Second
TableName := fmt.Sprintf("Test-Values-%d", time.Now().Unix())
TableName := fmt.Sprintf("Test-Expiry-%d", time.Now().Unix())
m := Mutex{DDBTableName: TableName, Expiry: expiry}.WithTimeout(timeout)
n := Mutex{DDBTableName: TableName, Expiry: expiry}.WithTimeout(timeout)
assert.NotPanics(t, m.Lock)
Expand All @@ -121,10 +136,9 @@ func Test_Expiry(t *testing.T) {
assert.Equal(t, endTime.Sub(startTime) < 1*time.Second, true)
assert.Panics(t, m.Unlock)
assert.NotPanics(t, n.Unlock)
DeleteTable(m)
}

// Todo: Cleanup test tables at the end

func ExampleMutex_Lock() {
m := Mutex{}
m.Lock()
Expand Down

0 comments on commit 2d25966

Please sign in to comment.