From cefdbd94b2ccaf67c9df807bc180c04ccc470207 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Wed, 11 Dec 2024 00:07:27 +0300 Subject: [PATCH] ir: maintain container placement in container contract After https://github.com/nspcc-dev/neofs-contract/pull/438 Container contract now has API for container-side placement/verification operations. But building placement vectors and updating when needed is still a complex operation that should be done on a Go application side. This commit adds such a responsibility for Alphabet nodes. For simplicity, updating is done every epoch and once for _every_ container if netmap has been changed. Additional optimization can be considered. Signed-off-by: Pavel Karpy --- pkg/innerring/processors/container.go | 37 ++++++++++++ .../processors/container/process_container.go | 43 +++++++++++--- .../processors/container/processor.go | 4 ++ .../processors/netmap/cleanup_table.go | 16 +++++- .../processors/netmap/process_epoch.go | 56 ++++++++++++++++++- 5 files changed, 146 insertions(+), 10 deletions(-) create mode 100644 pkg/innerring/processors/container.go diff --git a/pkg/innerring/processors/container.go b/pkg/innerring/processors/container.go new file mode 100644 index 0000000000..59d78e7755 --- /dev/null +++ b/pkg/innerring/processors/container.go @@ -0,0 +1,37 @@ +package processors + +import ( + "fmt" + + cnrcli "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/netmap" +) + +// UpdatePlacementVectors updates placement vectors after a container placement +// change in Container contract. Empty vectors drops container vectors from the +// contract. +func UpdatePlacementVectors(cID cid.ID, cnrCli *cnrcli.Client, vectors [][]netmap.NodeInfo, replicas []uint32) error { + for i, vector := range vectors { + err := cnrCli.AddNextEpochNodes(cID, i, pubKeys(vector)) + if err != nil { + return fmt.Errorf("can't add %d placement vector to Container contract: %w", i, err) + } + } + + err := cnrCli.CommitContainerListUpdate(cID, replicas) + if err != nil { + return fmt.Errorf("can't commit container list to Container contract: %w", err) + } + + return nil +} + +func pubKeys(nodes []netmap.NodeInfo) [][]byte { + res := make([][]byte, 0, len(nodes)) + for _, node := range nodes { + res = append(res, node.PublicKey()) + } + + return res +} diff --git a/pkg/innerring/processors/container/process_container.go b/pkg/innerring/processors/container/process_container.go index 7a1d5ef0fc..fd2ee59c16 100644 --- a/pkg/innerring/processors/container/process_container.go +++ b/pkg/innerring/processors/container/process_container.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/nspcc-dev/neo-go/pkg/network/payload" + "github.com/nspcc-dev/neofs-node/pkg/innerring/processors" cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" "github.com/nspcc-dev/neofs-node/pkg/morph/event" containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container" @@ -26,7 +27,10 @@ type putEvent interface { type putContainerContext struct { e putEvent - d containerSDK.Domain + // must be filled when verifying raw data from e + cID cid.ID + cnr containerSDK.Container + d containerSDK.Domain } // Process a new container from the user by checking the container sanity @@ -55,15 +59,15 @@ func (cp *Processor) processContainerPut(put putEvent) { func (cp *Processor) checkPutContainer(ctx *putContainerContext) error { binCnr := ctx.e.Container() - var cnr containerSDK.Container + ctx.cID = cid.NewFromMarshalledContainer(binCnr) - err := cnr.Unmarshal(binCnr) + err := ctx.cnr.Unmarshal(binCnr) if err != nil { return fmt.Errorf("invalid binary container: %w", err) } err = cp.verifySignature(signatureVerificationData{ - ownerContainer: cnr.Owner(), + ownerContainer: ctx.cnr.Owner(), verb: session.VerbContainerPut, binTokenSession: ctx.e.SessionToken(), binPublicKey: ctx.e.PublicKey(), @@ -75,13 +79,13 @@ func (cp *Processor) checkPutContainer(ctx *putContainerContext) error { } // check homomorphic hashing setting - err = checkHomomorphicHashing(cp.netState, cnr) + err = checkHomomorphicHashing(cp.netState, ctx.cnr) if err != nil { return fmt.Errorf("incorrect homomorphic hashing setting: %w", err) } // check native name and zone - err = checkNNS(ctx, cnr) + err = checkNNS(ctx, ctx.cnr) if err != nil { return fmt.Errorf("NNS: %w", err) } @@ -104,12 +108,37 @@ func (cp *Processor) approvePutContainer(ctx *putContainerContext) { prm.SetZone(ctx.d.Zone()) nr := e.NotaryRequest() - err = cp.cnrClient.Morph().NotarySignAndInvokeTX(nr.MainTransaction, false) + err = cp.cnrClient.Morph().NotarySignAndInvokeTX(nr.MainTransaction, true) if err != nil { cp.log.Error("could not approve put container", zap.Error(err), ) + return + } + + nm, err := cp.netState.NetMap() + if err != nil { + cp.log.Error("could not get netmap for Container contract update", zap.Stringer("cid", ctx.cID), zap.Error(err)) + return + } + + policy := ctx.cnr.PlacementPolicy() + vectors, err := nm.ContainerNodes(policy, ctx.cID) + if err != nil { + cp.log.Error("could not build placement for Container contract update", zap.Stringer("cid", ctx.cID), zap.Error(err)) + return + } + + replicas := make([]uint32, 0, policy.NumberOfReplicas()) + for i := range vectors { + replicas = append(replicas, policy.ReplicaNumberByIndex(i)) + } + + err = processors.UpdatePlacementVectors(ctx.cID, cp.cnrClient, vectors, replicas) + if err != nil { + cp.log.Error("could not update Container contract", zap.Stringer("cid", ctx.cID), zap.Error(err)) + return } } diff --git a/pkg/innerring/processors/container/processor.go b/pkg/innerring/processors/container/processor.go index f27cd77bb4..fccb6128c3 100644 --- a/pkg/innerring/processors/container/processor.go +++ b/pkg/innerring/processors/container/processor.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/morph/client/neofsid" "github.com/nspcc-dev/neofs-node/pkg/morph/event" containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container" + "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -55,6 +56,9 @@ type NetworkState interface { // // which did not allow reading the value. HomomorphicHashDisabled() (bool, error) + + // NetMap must return actual network map. + NetMap() (*netmap.NetMap, error) } // New creates a container contract processor instance. diff --git a/pkg/innerring/processors/netmap/cleanup_table.go b/pkg/innerring/processors/netmap/cleanup_table.go index 226a7d4471..d3cd023146 100644 --- a/pkg/innerring/processors/netmap/cleanup_table.go +++ b/pkg/innerring/processors/netmap/cleanup_table.go @@ -2,6 +2,7 @@ package netmap import ( "bytes" + "slices" "sync" "github.com/nspcc-dev/neofs-sdk-go/netmap" @@ -13,6 +14,8 @@ type ( enabled bool threshold uint64 lastAccess map[string]epochStampWithNodeInfo + + prev netmap.NetMap } epochStamp struct { @@ -36,8 +39,9 @@ func newCleanupTable(enabled bool, threshold uint64) cleanupTable { } } -// Update cleanup table based on on-chain information about netmap. -func (c *cleanupTable) update(snapshot netmap.NetMap, now uint64) { +// Update cleanup table based on on-chain information about netmap. Returned +// value indicates if the composition of network map memebers has changed. +func (c *cleanupTable) update(snapshot netmap.NetMap, now uint64) bool { c.Lock() defer c.Unlock() @@ -64,6 +68,14 @@ func (c *cleanupTable) update(snapshot netmap.NetMap, now uint64) { } c.lastAccess = newMap + + // order is expected to be the same from epoch to epoch + mapChanged := !slices.EqualFunc(c.prev.Nodes(), nmNodes, func(i1 netmap.NodeInfo, i2 netmap.NodeInfo) bool { + return bytes.Equal(i1.PublicKey(), i2.PublicKey()) + }) + c.prev = snapshot + + return mapChanged } // updates last access time of the netmap node by string public key. diff --git a/pkg/innerring/processors/netmap/process_epoch.go b/pkg/innerring/processors/netmap/process_epoch.go index 01e63de45c..ab7d85551e 100644 --- a/pkg/innerring/processors/netmap/process_epoch.go +++ b/pkg/innerring/processors/netmap/process_epoch.go @@ -1,11 +1,15 @@ package netmap import ( + "fmt" + + "github.com/nspcc-dev/neofs-node/pkg/innerring/processors" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/audit" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/governance" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement" cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" + "github.com/nspcc-dev/neofs-sdk-go/netmap" "go.uber.org/zap" ) @@ -65,7 +69,15 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) { } } - np.netmapSnapshot.update(*networkMap, epoch) + if np.netmapSnapshot.update(*networkMap, epoch) { + l.Debug("updating placements in Container contract...") + err = np.updatePlacementInContract(*networkMap, l) + if err != nil { + l.Error("can't update placements in Container contract", zap.Error(err)) + } else { + l.Debug("updated placements in Container contract") + } + } np.handleCleanupTick(netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()}) np.handleNewAudit(audit.NewAuditStartEvent(epoch)) np.handleAuditSettlements(settlement.NewAuditEvent(epoch)) @@ -73,6 +85,48 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) { np.handleNotaryDeposit(ev) } +func (np *Processor) updatePlacementInContract(nm netmap.NetMap, l *zap.Logger) error { + // TODO: https://github.com/nspcc-dev/neofs-node/issues/3045 + cids, err := np.containerWrp.List(nil) + if err != nil { + return fmt.Errorf("can't get containers list: %w", err) + } + + for _, cID := range cids { + l := l.With(zap.Stringer("cid", cID)) + l.Debug("updating container placement in Container contract...") + + cnr, err := np.containerWrp.Get(cID[:]) + if err != nil { + l.Error("can't get container to update its placement in Container contract", zap.Error(err)) + continue + } + + policy := cnr.Value.PlacementPolicy() + + vectors, err := nm.ContainerNodes(policy, cID) + if err != nil { + l.Error("can't build placement vectors for update in Container contract", zap.Error(err)) + continue + } + + replicas := make([]uint32, 0, policy.NumberOfReplicas()) + for i := range vectors { + replicas = append(replicas, policy.ReplicaNumberByIndex(i)) + } + + err = processors.UpdatePlacementVectors(cID, np.containerWrp, vectors, replicas) + if err != nil { + l.Error("can't put placement vectors to Container contract", zap.Error(err)) + continue + } + + l.Debug("updated container placement in Container contract") + } + + return nil +} + // Process new epoch tick by invoking new epoch method in network map contract. func (np *Processor) processNewEpochTick() { if !np.alphabetState.IsAlphabet() {