Skip to content

Commit

Permalink
Add contract hooks.
Browse files Browse the repository at this point in the history
This PR adds a GRPC client for the contract service while also
adding control functions to send contract negotiation messages.
This makes it possible to have fine grained control over contract
negotiation approval. This is lacking the termination implementation
but that will be implemented soon in a separate PR.

Changes:

- Split up receive and send steps in state machine.
- Only do receive steps in http handlers.
- Add new control GRPC methods in charge of sending steps.
- Add new mocks for contract service.
- Upgrade GRPC bindings.
- Update mockery.
- Add token verification and storage for the token.
  • Loading branch information
ainmosni committed Jan 22, 2025
1 parent 55abe33 commit bca1a9e
Show file tree
Hide file tree
Showing 28 changed files with 2,111 additions and 907 deletions.
8 changes: 7 additions & 1 deletion .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@
# limitations under the License.

with-expecter: true
issue-845-fix: true
packages:
github.com/go-dataspace/run-dsrpc/gen/go/dsp/v1alpha1:
github.com/go-dataspace/run-dsrpc/gen/go/dsp/v1alpha2:
config:
resolve-type-alias: false
interfaces:
ProviderServiceClient:
config:
ContractServiceClient:
config:
10 changes: 5 additions & 5 deletions dsp/catalog_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

"github.com/go-dataspace/run-dsp/dsp/shared"
"github.com/go-dataspace/run-dsp/logging"
providerv1alpha1 "github.com/go-dataspace/run-dsrpc/gen/go/dsp/v1alpha1"
provider "github.com/go-dataspace/run-dsrpc/gen/go/dsp/v1alpha2"
)

// CatalogError implements HTTPError for catalog requests.
Expand Down Expand Up @@ -80,7 +80,7 @@ func (ch *dspHandlers) catalogRequestHandler(w http.ResponseWriter, req *http.Re
}
logger.Debug("Got catalog request", "req", catalogReq)
// As the filter option is undefined, we will not fill anything
resp, err := ch.provider.GetCatalogue(req.Context(), &providerv1alpha1.GetCatalogueRequest{})
resp, err := ch.provider.GetCatalogue(req.Context(), &provider.GetCatalogueRequest{})
if err != nil {
return grpcErrorHandler(err)
}
Expand Down Expand Up @@ -117,7 +117,7 @@ func (ch *dspHandlers) datasetRequestHandler(w http.ResponseWriter, req *http.Re
return catalogError(err.Error(), http.StatusBadRequest, "400", "Invalid dataset request")
}
logger.Debug("Got dataset request", "req", datasetReq)
resp, err := ch.provider.GetDataset(ctx, &providerv1alpha1.GetDatasetRequest{
resp, err := ch.provider.GetDataset(ctx, &provider.GetDatasetRequest{
DatasetId: paramID,
})
if err != nil {
Expand All @@ -131,7 +131,7 @@ func (ch *dspHandlers) datasetRequestHandler(w http.ResponseWriter, req *http.Re
return nil
}

func processProviderDataset(pds *providerv1alpha1.Dataset, service shared.DataService) shared.Dataset {
func processProviderDataset(pds *provider.Dataset, service shared.DataService) shared.Dataset {
var checksum *shared.Checksum
cs := pds.GetChecksum()
if cs != nil {
Expand Down Expand Up @@ -180,7 +180,7 @@ func processProviderDataset(pds *providerv1alpha1.Dataset, service shared.DataSe
return ds
}

func processProviderCatalogue(gdc []*providerv1alpha1.Dataset, service shared.DataService) []shared.Dataset {
func processProviderCatalogue(gdc []*provider.Dataset, service shared.DataService) []shared.Dataset {
datasets := make([]shared.Dataset, len(gdc))
for i, f := range gdc {
datasets[i] = processProviderDataset(f, service)
Expand Down
5 changes: 3 additions & 2 deletions dsp/common_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ import (
"github.com/go-dataspace/run-dsp/dsp/shared"
"github.com/go-dataspace/run-dsp/dsp/statemachine"
"github.com/go-dataspace/run-dsp/internal/constants"
providerv1 "github.com/go-dataspace/run-dsrpc/gen/go/dsp/v1alpha1"
provider "github.com/go-dataspace/run-dsrpc/gen/go/dsp/v1alpha2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type dspHandlers struct {
store persistence.StorageProvider
provider providerv1.ProviderServiceClient
provider provider.ProviderServiceClient
contractService provider.ContractServiceClient
reconciler statemachine.Reconciler
selfURL *url.URL
dataserviceID string
Expand Down
19 changes: 10 additions & 9 deletions dsp/contract_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,30 +175,31 @@ func processMessage[T any](
req.Context(),
contract,
dh.provider,
dh.contractService,
dh.reconciler,
)

ctx, nextState, err := pState.Recv(ctx, msg)
ctx, apply, err := pState.Recv(ctx, msg)
if err != nil {
return contractError(fmt.Sprintf("invalid request: %s", err),
http.StatusBadRequest, "400", "Invalid request", pState.GetContract())
}

apply, err := nextState.Send(ctx)
if err != nil {
return contractError(fmt.Sprintf("couldn't progress to next state: %s", err.Error()),
http.StatusInternalServerError, "500", "Not able to progress state", nextState.GetContract())
}
err = storeNegotiation(ctx, dh.store, nextState.GetContract())
err = storeNegotiation(ctx, dh.store, pState.GetContract())
if err != nil {
return err
}

if err := shared.EncodeValid(w, req, http.StatusOK, nextState.GetContractNegotiation()); err != nil {
if err := apply(); err != nil {
return contractError(fmt.Sprintf("failed to propagate: %s", err),
http.StatusInternalServerError, "500", "Internal error", pState.GetContract(),
)
}

if err := shared.EncodeValid(w, req, http.StatusOK, pState.GetContractNegotiation()); err != nil {
logger.Error("Couldn't serve response", "err", err)
}

go apply()
return nil
}

Expand Down
Loading

0 comments on commit bca1a9e

Please sign in to comment.