Skip to content

Commit

Permalink
Add persistence layer and new storage interface
Browse files Browse the repository at this point in the history
This adds persistence options to RUN-DSP by doing the following:

- Bump Go to 1.23.3
- Add new storage interfaces
- Create badger implementation for new interfaces
- Make fields in contracts and transfer reqs public for storage.
- Add some helper methods to contracts and transfer request
  • Loading branch information
ainmosni committed Nov 28, 2024
1 parent 1fd0587 commit 40ab062
Show file tree
Hide file tree
Showing 22 changed files with 895 additions and 109 deletions.
2 changes: 1 addition & 1 deletion .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM docker.io/library/golang:1.22
FROM docker.io/library/golang:1.23.3

RUN apt-get update && \
# Go tools:
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/test-and-lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
strategy:
matrix:
go-version:
- 1.22.5
- 1.23.3
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
Expand All @@ -39,7 +39,7 @@ jobs:
strategy:
matrix:
go-version:
- 1.22.5
- 1.23.3
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
Expand All @@ -56,7 +56,7 @@ jobs:
strategy:
matrix:
go-version:
- 1.22.5
- 1.23.3
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
Expand All @@ -77,7 +77,7 @@ jobs:
strategy:
matrix:
go-version:
- 1.22.5
- 1.23.3
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
Expand Down
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ linters:
- asciicheck
- cyclop
- canonicalheader
- copyloopvar
- decorder
- dupl
- dupword
Expand All @@ -29,7 +30,6 @@ linters:
- errname
- errorlint
- exhaustive
- exportloopref
- fatcontext
- forbidigo
- forcetypeassert
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/golangci/golangci-lint
rev: v1.58.2
rev: v1.62.2
hooks:
- id: golangci-lint
- repo: https://github.com/pre-commit/pre-commit-hooks
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM docker.io/library/golang:1.22.9 AS builder
FROM docker.io/library/golang:1.23.3 AS builder
WORKDIR /app
COPY . ./
RUN make build
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.debug
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
###########################################
# DO NOT USE ANYWHERE NEAR SENSITIVE DATA #
###########################################
FROM docker.io/library/golang:1.22.9 AS builder
FROM docker.io/library/golang:1.23.3 AS builder
WORKDIR /app
COPY . ./
RUN make debug
Expand Down
46 changes: 46 additions & 0 deletions dsp/persistance/badger/agreement_saver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2024 go-dataspace
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package badger

import (
"context"

"github.com/go-dataspace/run-dsp/odrl"
"github.com/google/uuid"
)

func mkAgreementKey(id string) []byte {
return []byte("odrl-agreement-" + id)
}

// GetAgreement gets an agreement by ID.
func (sp *StorageProvider) GetAgreement(
ctx context.Context,
id uuid.UUID,
) (*odrl.Agreement, error) {
key := mkAgreementKey(id.String())
return get[*odrl.Agreement](sp.db, key)
}

// PutAgreement stores an agreement, but should return an error if the agreement ID already
// exists.
func (sp *StorageProvider) PutAgreement(ctx context.Context, agreement *odrl.Agreement) error {
id, err := uuid.Parse(agreement.ID)
if err != nil {
return err
}
key := mkAgreementKey(id.String())
return put(sp.db, key, agreement)
}
69 changes: 69 additions & 0 deletions dsp/persistance/badger/contract_saver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2024 go-dataspace
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//nolint:dupl // Bare minimum of duplicated code
package badger

import (
"context"
"fmt"

"github.com/go-dataspace/run-dsp/dsp/statemachine"
"github.com/go-dataspace/run-dsp/logging"
"github.com/google/uuid"
)

// GetContractR gets a contract and sets the read-only property.
// It does not check any locks, as the database transaction already freezes the view.
func (sp *StorageProvider) GetContractR(
ctx context.Context,
pid uuid.UUID,
role statemachine.DataspaceRole,
) (*statemachine.Contract, error) {
key := statemachine.MkContractKey(pid, role)
logger := logging.Extract(ctx).With("pid", pid, "role", role, "key", string(key))
contract, err := get[*statemachine.Contract](sp.db, key)
if err != nil {
logger.Error("Failed to get contract", "err", err)
return nil, fmt.Errorf("could not get contract: %w", err)
}
contract.SetReadOnly()
return contract, nil
}

// GetContractRW gets a contract but does NOT set the read-only property, allowing changes to be saved.
// It will try to acquire a lock, and if it can't it will panic. The panic will be replaced once
// RUN-DSP reaches beta, but right now we want contract problems to be extremely visible.
func (sp *StorageProvider) GetContractRW(
ctx context.Context,
pid uuid.UUID,
role statemachine.DataspaceRole,
) (*statemachine.Contract, error) {
key := statemachine.MkContractKey(pid, role)
ctx, _ = logging.InjectLabels(ctx, "type", "contract", "pid", pid, "role", role, "key", string(key))
return getLocked[*statemachine.Contract](ctx, sp, key)
}

// PutContract saves a contract to the database.
// If the contract is set to read-only, it will panic as this is a bug in the code.
// It will release the lock after it has saved.
func (sp *StorageProvider) PutContract(ctx context.Context, contract *statemachine.Contract) error {
ctx, _ = logging.InjectLabels(
ctx,
"consumer_pid", contract.ConsumerPID,
"provider_pid", contract.ProviderPID,
"role", contract.Role,
)
return putUnlock(ctx, sp, contract)
}
20 changes: 20 additions & 0 deletions dsp/persistance/badger/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2024 go-dataspace
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package badger contains an implementation of the persistance.StorageProvider interface.
// Badger is a pure-go key-value database, not unlike redis. It is made to be embeddable in
// go applications, and offers both on-disk and in-memory backends.
//
// This is intended to be the default storage backend for RUN-DSP.
package badger
129 changes: 129 additions & 0 deletions dsp/persistance/badger/locking.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright 2024 go-dataspace
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package badger

import (
"context"
"errors"
"fmt"
"time"

"github.com/dgraph-io/badger/v4"
"github.com/go-dataspace/run-dsp/logging"
)

const (
lockTTL = 15 * time.Minute
maxWaitTime = 10 * time.Minute
lockCheckTime = 10 * time.Millisecond

logKey = "lock_key"
)

type lockKey struct {
k []byte
}

func newLockKey(key []byte) lockKey {
return lockKey{
k: append([]byte("lock-"), key...),
}
}

func (l lockKey) key() []byte {
return l.k
}

func (l lockKey) String() string {
return string(l.k)
}

func (sp *StorageProvider) AcquireLock(ctx context.Context, k lockKey) error {
err := sp.waitLock(ctx, k)
if err != nil {
return err
}
return sp.setLock(ctx, k)
}

func (sp *StorageProvider) ReleaseLock(ctx context.Context, k lockKey) error {
logger := logging.Extract(ctx).With(logKey, k.String())
return sp.db.Update(func(txn *badger.Txn) error {
logger.Debug("Attempting to release lock")
err := txn.Delete(k.key())
if err != nil {
logger.Error("Could not release lock", "err", err)
}
return err
})
}

func (sp *StorageProvider) isLocked(ctx context.Context, k lockKey) bool {
logger := logging.Extract(ctx).With(logKey, k.String())
err := sp.db.View(func(txn *badger.Txn) error {
logger.Debug("Checking if lock set")
_, err := txn.Get(k.key())
return err
})
if err != nil {
if errors.Is(err, badger.ErrKeyNotFound) {
logger.Debug("No key found, reporting unlocked")
return false
}
logger.Error("Got an error, reporting locked", "err", err)
return true
}
logger.Debug("No error, reporting locked")
return true
}

func (sp *StorageProvider) setLock(ctx context.Context, k lockKey) error {
logger := logging.Extract(ctx).With("key", k.String())
err := sp.db.Update(func(txn *badger.Txn) error {
logger.Debug("Setting lock")
entry := badger.NewEntry(k.key(), []byte{1}).WithTTL(lockTTL)
return txn.SetEntry(entry)
})
if err != nil {
logger.Error("Couldn't set lock", "err", err)
return err
}
logger.Debug("Lock set")
return nil
}

func (sp *StorageProvider) waitLock(ctx context.Context, k lockKey) error {
logger := logging.Extract(ctx).With("key", k.String())
ticker := time.NewTicker(lockCheckTime)
defer ticker.Stop()
timer := time.NewTicker(maxWaitTime)
defer timer.Stop()
logger.Debug("Starting to wait for lock")
for {
select {
case <-ticker.C:
if sp.isLocked(ctx, k) {
continue
}
return nil
case <-timer.C:
logger.Error("Timeout reached, exiting with error")
return fmt.Errorf("timed out waiting for lock")
case <-ctx.Done():
logger.Info("Shutting down waiting for lock")
return fmt.Errorf("context cancelled")
}
}
}
Loading

0 comments on commit 40ab062

Please sign in to comment.