From d14293892132cdd4a512c4753cd1a75bd61b27d6 Mon Sep 17 00:00:00 2001 From: beer-1 Date: Fri, 29 Nov 2024 12:08:02 +0900 Subject: [PATCH 1/8] introduce mutex for state and lastCommitInfo to avoid race condition betwwen Commit and CreateQueryContext --- baseapp/abci.go | 72 +++++++++++++++++++++++----------------- baseapp/abci_test.go | 49 +++++++++++++++++++++++++++ baseapp/baseapp.go | 16 ++++++--- baseapp/test_helpers.go | 6 ++-- go.mod | 3 +- store/rootmulti/store.go | 33 +++++++++++++----- 6 files changed, 132 insertions(+), 47 deletions(-) diff --git a/baseapp/abci.go b/baseapp/abci.go index d9921a8e13d5..27e5eeabd82a 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -69,12 +69,13 @@ func (app *BaseApp) InitChain(req *abci.InitChainRequest) (*abci.InitChainRespon // initialize states with a correct header app.setState(execModeFinalize, initHeader) app.setState(execModeCheck, initHeader) + finalizeState := app.getState(execModeFinalize) // Store the consensus params in the BaseApp's param store. Note, this must be // done after the finalizeBlockState and context have been set as it's persisted // to state. if req.ConsensusParams != nil { - err := app.StoreConsensusParams(app.finalizeBlockState.Context(), *req.ConsensusParams) + err := app.StoreConsensusParams(finalizeState.Context(), *req.ConsensusParams) if err != nil { return nil, err } @@ -86,13 +87,14 @@ func (app *BaseApp) InitChain(req *abci.InitChainRequest) (*abci.InitChainRespon // handler, the block height is zero by default. However, after Commit is called // the height needs to reflect the true block height. initHeader.Height = req.InitialHeight - app.checkState.SetContext(app.checkState.Context().WithBlockHeader(initHeader). + checkState := app.getState(execModeCheck) + checkState.SetContext(checkState.Context().WithBlockHeader(initHeader). WithHeaderInfo(coreheader.Info{ ChainID: req.ChainId, Height: req.InitialHeight, Time: req.Time, })) - app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockHeader(initHeader). + finalizeState.SetContext(finalizeState.Context().WithBlockHeader(initHeader). WithHeaderInfo(coreheader.Info{ ChainID: req.ChainId, Height: req.InitialHeight, @@ -105,9 +107,9 @@ func (app *BaseApp) InitChain(req *abci.InitChainRequest) (*abci.InitChainRespon } // add block gas meter for any genesis transactions (allow infinite gas) - app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(storetypes.NewInfiniteGasMeter())) + finalizeState.SetContext(finalizeState.Context().WithBlockGasMeter(storetypes.NewInfiniteGasMeter())) - res, err := app.initChainer(app.finalizeBlockState.Context(), req) + res, err := app.initChainer(finalizeState.Context(), req) if err != nil { return nil, err } @@ -604,7 +606,7 @@ func (app *BaseApp) ExtendVote(_ context.Context, req *abci.ExtendVoteRequest) ( // finalizeBlockState context, otherwise we don't get the uncommitted data // from InitChain. if req.Height == app.initialHeight { - ctx, _ = app.finalizeBlockState.Context().CacheContext() + ctx, _ = app.getState(execModeFinalize).Context().CacheContext() } else { ms := app.cms.CacheMultiStore() ctx = sdk.NewContext(ms, false, app.logger).WithStreamingManager(app.streamingManager).WithChainID(app.chainID).WithBlockHeight(req.Height) @@ -684,7 +686,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.VerifyVoteExtensionRequest) (r // finalizeBlockState context, otherwise we don't get the uncommitted data // from InitChain. if req.Height == app.initialHeight { - ctx, _ = app.finalizeBlockState.Context().CacheContext() + ctx, _ = app.getState(execModeFinalize).Context().CacheContext() } else { ms := app.cms.CacheMultiStore() ctx = sdk.NewContext(ms, false, app.logger).WithStreamingManager(app.streamingManager).WithChainID(app.chainID).WithBlockHeight(req.Height) @@ -742,7 +744,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.VerifyVoteExtensionRequest) (r // internalFinalizeBlock executes the block, called by the Optimistic // Execution flow or by the FinalizeBlock ABCI method. The context received is -// only used to handle early cancellation, for anything related to state app.finalizeBlockState.Context() +// only used to handle early cancellation, for anything related to state app.getState(execModeFinalize).Context() // must be used. func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.FinalizeBlockRequest) (*abci.FinalizeBlockResponse, error) { var events []abci.Event @@ -773,12 +775,14 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz // finalizeBlockState should be set on InitChain or ProcessProposal. If it is // nil, it means we are replaying this block and we need to set the state here // given that during block replay ProcessProposal is not executed by CometBFT. - if app.finalizeBlockState == nil { + finalizeState := app.getState(execModeFinalize) + if finalizeState == nil { app.setState(execModeFinalize, header) + finalizeState = app.getState(execModeFinalize) } // Context is now updated with Header information. - app.finalizeBlockState.SetContext(app.finalizeBlockState.Context(). + finalizeState.SetContext(finalizeState.Context(). WithBlockHeader(header). WithHeaderHash(req.Hash). WithHeaderInfo(coreheader.Info{ @@ -788,7 +792,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz Hash: req.Hash, AppHash: app.LastCommitID().Hash, }). - WithConsensusParams(app.GetConsensusParams(app.finalizeBlockState.Context())). + WithConsensusParams(app.GetConsensusParams(finalizeState.Context())). WithVoteInfos(req.DecidedLastCommit.Votes). WithExecMode(sdk.ExecModeFinalize). WithCometInfo(corecomet.Info{ @@ -799,11 +803,11 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz })) // GasMeter must be set after we get a context with updated consensus params. - gasMeter := app.getBlockGasMeter(app.finalizeBlockState.Context()) - app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter)) + gasMeter := app.getBlockGasMeter(finalizeState.Context()) + finalizeState.SetContext(finalizeState.Context().WithBlockGasMeter(gasMeter)) - if app.checkState != nil { - app.checkState.SetContext(app.checkState.Context(). + if checkState := app.getState(execModeCheck); checkState != nil { + checkState.SetContext(checkState.Context(). WithBlockGasMeter(gasMeter). WithHeaderHash(req.Hash)) } @@ -831,8 +835,8 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz events = append(events, beginBlock.Events...) // Reset the gas meter so that the AnteHandlers aren't required to - gasMeter = app.getBlockGasMeter(app.finalizeBlockState.Context()) - app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter)) + gasMeter = app.getBlockGasMeter(finalizeState.Context()) + finalizeState.SetContext(finalizeState.Context().WithBlockGasMeter(gasMeter)) // Iterate over all raw transactions in the proposal and attempt to execute // them, gathering the execution results. @@ -861,11 +865,11 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz txResults = append(txResults, response) } - if app.finalizeBlockState.ms.TracingEnabled() { - app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore) + if finalizeState.ms.TracingEnabled() { + finalizeState.ms = finalizeState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore) } - endBlock, err := app.endBlock(app.finalizeBlockState.Context()) + endBlock, err := app.endBlock(finalizeState.Context()) if err != nil { return nil, err } @@ -879,7 +883,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz } events = append(events, endBlock.Events...) - cp := app.GetConsensusParams(app.finalizeBlockState.Context()) + cp := app.GetConsensusParams(finalizeState.Context()) return &abci.FinalizeBlockResponse{ Events: events, @@ -903,7 +907,7 @@ func (app *BaseApp) FinalizeBlock(req *abci.FinalizeBlockRequest) (res *abci.Fin defer func() { // call the streaming service hooks with the FinalizeBlock messages for _, streamingListener := range app.streamingManager.ABCIListeners { - if streamErr := streamingListener.ListenFinalizeBlock(app.finalizeBlockState.Context(), *req, *res); streamErr != nil { + if streamErr := streamingListener.ListenFinalizeBlock(app.getState(execModeFinalize).Context(), *req, *res); streamErr != nil { app.logger.Error("ListenFinalizeBlock listening hook failed", "height", req.Height, "err", err) if app.streamingManager.StopNodeOnErr { // if StopNodeOnErr is set, we should return the streamErr in order to stop the node @@ -929,7 +933,10 @@ func (app *BaseApp) FinalizeBlock(req *abci.FinalizeBlockRequest) (res *abci.Fin } // if it was aborted, we need to reset the state + app.stateMut.Lock() app.finalizeBlockState = nil + app.stateMut.Unlock() + app.optimisticExec.Reset() } @@ -968,11 +975,12 @@ func (app *BaseApp) checkHalt(height int64, time time.Time) error { // against that height and gracefully halt if it matches the latest committed // height. func (app *BaseApp) Commit() (*abci.CommitResponse, error) { - header := app.finalizeBlockState.Context().BlockHeader() + finalizeState := app.getState(execModeFinalize) + header := finalizeState.Context().BlockHeader() retainHeight := app.GetBlockRetentionHeight(header.Height) if app.precommiter != nil { - app.precommiter(app.finalizeBlockState.Context()) + app.precommiter(finalizeState.Context()) } rms, ok := app.cms.(*rootmulti.Store) @@ -988,7 +996,7 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) { abciListeners := app.streamingManager.ABCIListeners if len(abciListeners) > 0 { - ctx := app.finalizeBlockState.Context() + ctx := finalizeState.Context() blockHeight := ctx.BlockHeight() changeSet := app.cms.PopStateCache() @@ -1013,10 +1021,12 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) { // Commit. Use the header from this latest block. app.setState(execModeCheck, header) + app.stateMut.Lock() app.finalizeBlockState = nil + app.stateMut.Unlock() if app.prepareCheckStater != nil { - app.prepareCheckStater(app.checkState.Context()) + app.prepareCheckStater(app.getState(execModeCheck).Context()) } // The SnapshotIfApplicable method will create the snapshot by starting the goroutine @@ -1034,7 +1044,7 @@ func (app *BaseApp) workingHash() []byte { // Write the FinalizeBlock state into branched storage and commit the MultiStore. // The write to the FinalizeBlock state writes all state transitions to the root // MultiStore (app.cms) so when Commit() is called it persists those values. - app.finalizeBlockState.ms.Write() + app.getState(execModeFinalize).ms.Write() // Get the hash of all writes in order to return the apphash to the comet in finalizeBlock. commitHash := app.cms.WorkingHash() @@ -1181,7 +1191,7 @@ func (app *BaseApp) FilterPeerByID(info string) *abci.QueryResponse { // access any state changes made in InitChain. func (app *BaseApp) getContextForProposal(ctx sdk.Context, height int64) sdk.Context { if height == app.initialHeight { - ctx, _ = app.finalizeBlockState.Context().CacheContext() + ctx, _ = app.getState(execModeFinalize).Context().CacheContext() // clear all context data set during InitChain to avoid inconsistent behavior ctx = ctx.WithHeaderInfo(coreheader.Info{}).WithBlockHeader(cmtproto.Header{}) @@ -1282,8 +1292,8 @@ func (app *BaseApp) CreateQueryContextWithCheckHeader(height int64, prove, check var header *cmtproto.Header isLatest := height == 0 for _, state := range []*state{ - app.checkState, - app.finalizeBlockState, + app.getState(execModeCheck), + app.getState(execModeFinalize), } { if state != nil { // branch the commit multi-store for safety @@ -1396,7 +1406,7 @@ func (app *BaseApp) GetBlockRetentionHeight(commitHeight int64) int64 { // evidence parameters instead of computing an estimated number of blocks based // on the unbonding period and block commitment time as the two should be // equivalent. - cp := app.GetConsensusParams(app.finalizeBlockState.Context()) + cp := app.GetConsensusParams(app.getState(execModeFinalize).Context()) if cp.Evidence != nil && cp.Evidence.MaxAgeNumBlocks > 0 { retentionHeight = commitHeight - cp.Evidence.MaxAgeNumBlocks } diff --git a/baseapp/abci_test.go b/baseapp/abci_test.go index e404f7c47932..4c36c71cb2f2 100644 --- a/baseapp/abci_test.go +++ b/baseapp/abci_test.go @@ -11,6 +11,7 @@ import ( "math/rand" "strconv" "strings" + "sync/atomic" "testing" "time" @@ -2779,3 +2780,51 @@ func TestABCI_Proposal_FailReCheckTx(t *testing.T) { require.NotEmpty(t, res.TxResults[0].Events) require.True(t, res.TxResults[0].IsOK(), fmt.Sprintf("%v", res)) } + +func TestABCI_Race_Commit_Query(t *testing.T) { + suite := NewBaseAppSuite(t, baseapp.SetChainID("test-chain-id")) + app := suite.baseApp + + _, err := app.InitChain(&abci.InitChainRequest{ + ChainId: "test-chain-id", + ConsensusParams: &cmtproto.ConsensusParams{Block: &cmtproto.BlockParams{MaxGas: 5000000}}, + InitialHeight: 1, + }) + require.NoError(t, err) + _, err = app.Commit() + require.NoError(t, err) + + counter := atomic.Uint64{} + counter.Store(0) + + ctx, cancel := context.WithCancel(context.Background()) + queryCreator := func() { + for { + select { + case <-ctx.Done(): + return + default: + _, err := app.CreateQueryContextWithCheckHeader(0, false, false) + require.NoError(t, err) + + counter.Add(1) + } + } + } + + for i := 0; i < 100; i++ { + go queryCreator() + } + + for i := 0; i < 1000; i++ { + _, err = app.FinalizeBlock(&abci.FinalizeBlockRequest{Height: app.LastBlockHeight() + 1}) + require.NoError(t, err) + + _, err = app.Commit() + require.NoError(t, err) + } + + cancel() + + require.Equal(t, int64(1001), app.GetContextForCheckTx(nil).BlockHeight()) +} diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 98adf6e1a94e..730da6881dd8 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -124,6 +124,7 @@ type BaseApp struct { prepareProposalState *state processProposalState *state finalizeBlockState *state + stateMut sync.RWMutex // An inter-block write-through cache provided to the context during the ABCI // FinalizeBlock call. @@ -494,6 +495,9 @@ func (app *BaseApp) setState(mode execMode, h cmtproto.Header) { WithHeaderInfo(headerInfo), } + app.stateMut.Lock() + defer app.stateMut.Unlock() + switch mode { case execModeCheck: baseState.SetContext(baseState.Context().WithIsCheckTx(true).WithMinGasPrices(app.minGasPrices)) @@ -633,6 +637,9 @@ func validateBasicTxMsgs(router *MsgServiceRouter, msgs []sdk.Msg) error { } func (app *BaseApp) getState(mode execMode) *state { + app.stateMut.RLock() + defer app.stateMut.RUnlock() + switch mode { case execModeFinalize: return app.finalizeBlockState @@ -706,7 +713,8 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context func (app *BaseApp) preBlock(req *abci.FinalizeBlockRequest) ([]abci.Event, error) { var events []abci.Event if app.preBlocker != nil { - ctx := app.finalizeBlockState.Context().WithEventManager(sdk.NewEventManager()) + finalizeState := app.getState(execModeFinalize) + ctx := finalizeState.Context().WithEventManager(sdk.NewEventManager()) if err := app.preBlocker(ctx, req); err != nil { return nil, err } @@ -716,7 +724,7 @@ func (app *BaseApp) preBlock(req *abci.FinalizeBlockRequest) ([]abci.Event, erro // GasMeter must be set after we get a context with updated consensus params. gasMeter := app.getBlockGasMeter(ctx) ctx = ctx.WithBlockGasMeter(gasMeter) - app.finalizeBlockState.SetContext(ctx) + finalizeState.SetContext(ctx) events = ctx.EventManager().ABCIEvents() // append PreBlock attributes to all events @@ -738,7 +746,7 @@ func (app *BaseApp) beginBlock(_ *abci.FinalizeBlockRequest) (sdk.BeginBlock, er ) if app.beginBlocker != nil { - resp, err = app.beginBlocker(app.finalizeBlockState.Context()) + resp, err = app.beginBlocker(app.getState(execModeFinalize).Context()) if err != nil { return resp, err } @@ -801,7 +809,7 @@ func (app *BaseApp) endBlock(_ context.Context) (sdk.EndBlock, error) { var endblock sdk.EndBlock if app.endBlocker != nil { - eb, err := app.endBlocker(app.finalizeBlockState.Context()) + eb, err := app.endBlocker(app.getState(execModeFinalize).Context()) if err != nil { return endblock, err } diff --git a/baseapp/test_helpers.go b/baseapp/test_helpers.go index cffc2589f089..93c905f71809 100644 --- a/baseapp/test_helpers.go +++ b/baseapp/test_helpers.go @@ -44,18 +44,18 @@ func (app *BaseApp) SimDeliver(txEncoder sdk.TxEncoder, tx sdk.Tx) (sdk.GasInfo, // SimWriteState is an entrypoint for simulations only. They are not executed during the normal ABCI finalize // block step but later. Therefore, an extra call to the root multi-store (app.cms) is required to write the changes. func (app *BaseApp) SimWriteState() { - app.finalizeBlockState.ms.Write() + app.getState(execModeFinalize).ms.Write() } // NewContextLegacy returns a new sdk.Context with the provided header func (app *BaseApp) NewContextLegacy(isCheckTx bool, header cmtproto.Header) sdk.Context { if isCheckTx { - return sdk.NewContext(app.checkState.ms, true, app.logger). + return sdk.NewContext(app.getState(execModeCheck).ms, true, app.logger). WithMinGasPrices(app.minGasPrices). WithBlockHeader(header) } - return sdk.NewContext(app.finalizeBlockState.ms, false, app.logger).WithBlockHeader(header) + return sdk.NewContext(app.getState(execModeFinalize).ms, false, app.logger).WithBlockHeader(header) } // NewContext returns a new sdk.Context with a empty header diff --git a/go.mod b/go.mod index 173cda441476..b71cb63bf4cc 100644 --- a/go.mod +++ b/go.mod @@ -186,7 +186,6 @@ require ( // TODO remove after all modules have their own go.mods replace ( cosmossdk.io/api => ./api - cosmossdk.io/store => ./store cosmossdk.io/x/bank => ./x/bank cosmossdk.io/x/staking => ./x/staking cosmossdk.io/x/tx => ./x/tx @@ -216,3 +215,5 @@ retract ( // do not use v0.43.0 ) + +replace cosmossdk.io/store => ./store diff --git a/store/rootmulti/store.go b/store/rootmulti/store.go index e821e930de7f..a93adafa42e9 100644 --- a/store/rootmulti/store.go +++ b/store/rootmulti/store.go @@ -60,6 +60,7 @@ type Store struct { db corestore.KVStoreWithBatch logger iavltree.Logger lastCommitInfo *types.CommitInfo + lastCommitInfoMut sync.RWMutex pruningManager *pruning.Manager iavlCacheSize int iavlDisableFastNode bool @@ -288,7 +289,9 @@ func (rs *Store) loadVersion(ver int64, upgrades *types.StoreUpgrades) error { } } + rs.lastCommitInfoMut.Lock() rs.lastCommitInfo = cInfo + rs.lastCommitInfoMut.Unlock() rs.stores = newStores // load any snapshot heights we missed from disk to be pruned on the next run @@ -434,16 +437,24 @@ func (rs *Store) PopStateCache() []*types.StoreKVPair { // LatestVersion returns the latest version in the store func (rs *Store) LatestVersion() int64 { - if rs.lastCommitInfo == nil { + lastCommitInfo := rs.LastCommitInfo() + if lastCommitInfo == nil { return GetLatestVersion(rs.db) } - return rs.lastCommitInfo.Version + return lastCommitInfo.Version +} + +func (rs *Store) LastCommitInfo() *types.CommitInfo { + rs.lastCommitInfoMut.RLock() + defer rs.lastCommitInfoMut.RUnlock() + return rs.lastCommitInfo } // LastCommitID implements Committer/CommitStore. func (rs *Store) LastCommitID() types.CommitID { - if rs.lastCommitInfo == nil { + lastCommitInfo := rs.LastCommitInfo() + if lastCommitInfo == nil { emptyHash := sha256.Sum256([]byte{}) appHash := emptyHash[:] return types.CommitID{ @@ -451,16 +462,16 @@ func (rs *Store) LastCommitID() types.CommitID { Hash: appHash, // set empty apphash to sha256([]byte{}) if info is nil } } - if len(rs.lastCommitInfo.CommitID().Hash) == 0 { + if len(lastCommitInfo.CommitID().Hash) == 0 { emptyHash := sha256.Sum256([]byte{}) appHash := emptyHash[:] return types.CommitID{ - Version: rs.lastCommitInfo.Version, + Version: lastCommitInfo.Version, Hash: appHash, // set empty apphash to sha256([]byte{}) if hash is nil } } - return rs.lastCommitInfo.CommitID() + return lastCommitInfo.CommitID() } // PausePruning temporarily pauses the pruning of all individual stores which implement @@ -499,10 +510,15 @@ func (rs *Store) Commit() types.CommitID { rs.PausePruning(true) // unset the committing flag on all stores to continue the pruning defer rs.PausePruning(false) + rs.lastCommitInfoMut.Lock() rs.lastCommitInfo = commitStores(version, rs.stores, rs.removalMap) + rs.lastCommitInfoMut.Unlock() }() + rs.lastCommitInfoMut.Lock() rs.lastCommitInfo.Timestamp = rs.commitHeader.Time + rs.lastCommitInfoMut.Unlock() + defer rs.flushMetadata(rs.db, version, rs.lastCommitInfo) // remove remnants of removed stores @@ -781,8 +797,9 @@ func (rs *Store) Query(req *types.RequestQuery) (*types.ResponseQuery, error) { // Otherwise, we query for the commit info from disk. var commitInfo *types.CommitInfo - if res.Height == rs.lastCommitInfo.Version { - commitInfo = rs.lastCommitInfo + lastCommitInfo := rs.LastCommitInfo() + if res.Height == lastCommitInfo.Version { + commitInfo = lastCommitInfo } else { commitInfo, err = rs.GetCommitInfo(res.Height) if err != nil { From 298d02eee9a2fb96de952962a22f45e91c83e891 Mon Sep 17 00:00:00 2001 From: beer-1 Date: Fri, 29 Nov 2024 12:12:26 +0900 Subject: [PATCH 2/8] add changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a3515a5b7a9..79e0f491052a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,6 +56,7 @@ Every module contains its own CHANGELOG.md. Please refer to the module you are i * (sims) [#21906](https://github.com/cosmos/cosmos-sdk/pull/21906) Skip sims test when running dry on validators * (cli) [#21919](https://github.com/cosmos/cosmos-sdk/pull/21919) Query address-by-acc-num by account_id instead of id. +* (baseapp) [#22692](https://github.com/cosmos/cosmos-sdk/pull/22692) Add mutex locks for `state` and `lastCommitInfo` to prevent race conditions between `Commit` and `CreateQueryContext`. ### API Breaking Changes From aa5960a8d1da3f52b96b5d1f63a2d2ca4aba2d8b Mon Sep 17 00:00:00 2001 From: beer-1 Date: Fri, 29 Nov 2024 12:39:18 +0900 Subject: [PATCH 3/8] avoid hold locks for entire commitStores --- store/rootmulti/store.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/store/rootmulti/store.go b/store/rootmulti/store.go index a93adafa42e9..2d190975553b 100644 --- a/store/rootmulti/store.go +++ b/store/rootmulti/store.go @@ -510,8 +510,10 @@ func (rs *Store) Commit() types.CommitID { rs.PausePruning(true) // unset the committing flag on all stores to continue the pruning defer rs.PausePruning(false) + + cInfo := commitStores(version, rs.stores, rs.removalMap) rs.lastCommitInfoMut.Lock() - rs.lastCommitInfo = commitStores(version, rs.stores, rs.removalMap) + rs.lastCommitInfo = cInfo rs.lastCommitInfoMut.Unlock() }() From 847f2bc77130d26fc6ce33c0b13333a832246c57 Mon Sep 17 00:00:00 2001 From: beer-1 Date: Mon, 2 Dec 2024 15:58:30 +0900 Subject: [PATCH 4/8] use atomic pointer --- store/rootmulti/store.go | 37 +++++++++++++------------------------ 1 file changed, 13 insertions(+), 24 deletions(-) diff --git a/store/rootmulti/store.go b/store/rootmulti/store.go index 2d190975553b..47c3f6b2caee 100644 --- a/store/rootmulti/store.go +++ b/store/rootmulti/store.go @@ -9,6 +9,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" cmtproto "github.com/cometbft/cometbft/api/cometbft/types/v1" protoio "github.com/cosmos/gogoproto/io" @@ -59,8 +60,7 @@ func keysFromStoreKeyMap[V any](m map[types.StoreKey]V) []types.StoreKey { type Store struct { db corestore.KVStoreWithBatch logger iavltree.Logger - lastCommitInfo *types.CommitInfo - lastCommitInfoMut sync.RWMutex + lastCommitInfo atomic.Pointer[types.CommitInfo] pruningManager *pruning.Manager iavlCacheSize int iavlDisableFastNode bool @@ -289,9 +289,7 @@ func (rs *Store) loadVersion(ver int64, upgrades *types.StoreUpgrades) error { } } - rs.lastCommitInfoMut.Lock() - rs.lastCommitInfo = cInfo - rs.lastCommitInfoMut.Unlock() + rs.lastCommitInfo.Store(cInfo) rs.stores = newStores // load any snapshot heights we missed from disk to be pruned on the next run @@ -437,7 +435,7 @@ func (rs *Store) PopStateCache() []*types.StoreKVPair { // LatestVersion returns the latest version in the store func (rs *Store) LatestVersion() int64 { - lastCommitInfo := rs.LastCommitInfo() + lastCommitInfo := rs.lastCommitInfo.Load() if lastCommitInfo == nil { return GetLatestVersion(rs.db) } @@ -445,15 +443,9 @@ func (rs *Store) LatestVersion() int64 { return lastCommitInfo.Version } -func (rs *Store) LastCommitInfo() *types.CommitInfo { - rs.lastCommitInfoMut.RLock() - defer rs.lastCommitInfoMut.RUnlock() - return rs.lastCommitInfo -} - // LastCommitID implements Committer/CommitStore. func (rs *Store) LastCommitID() types.CommitID { - lastCommitInfo := rs.LastCommitInfo() + lastCommitInfo := rs.lastCommitInfo.Load() if lastCommitInfo == nil { emptyHash := sha256.Sum256([]byte{}) appHash := emptyHash[:] @@ -487,7 +479,7 @@ func (rs *Store) PausePruning(pause bool) { // Commit implements Committer/CommitStore. func (rs *Store) Commit() types.CommitID { var previousHeight, version int64 - if rs.lastCommitInfo.GetVersion() == 0 && rs.initialVersion > 1 { + if rs.lastCommitInfo.Load().GetVersion() == 0 && rs.initialVersion > 1 { // This case means that no commit has been made in the store, we // start from initialVersion. version = rs.initialVersion @@ -497,7 +489,7 @@ func (rs *Store) Commit() types.CommitID { // case we increment the version from there, // - or there was no previous commit, and initial version was not set, // in which case we start at version 1. - previousHeight = rs.lastCommitInfo.GetVersion() + previousHeight = rs.lastCommitInfo.Load().GetVersion() version = previousHeight + 1 } @@ -512,16 +504,13 @@ func (rs *Store) Commit() types.CommitID { defer rs.PausePruning(false) cInfo := commitStores(version, rs.stores, rs.removalMap) - rs.lastCommitInfoMut.Lock() - rs.lastCommitInfo = cInfo - rs.lastCommitInfoMut.Unlock() + rs.lastCommitInfo.Store(cInfo) }() - rs.lastCommitInfoMut.Lock() - rs.lastCommitInfo.Timestamp = rs.commitHeader.Time - rs.lastCommitInfoMut.Unlock() + cInfo := rs.lastCommitInfo.Load() + cInfo.Timestamp = rs.commitHeader.Time - defer rs.flushMetadata(rs.db, version, rs.lastCommitInfo) + defer rs.flushMetadata(rs.db, version, cInfo) // remove remnants of removed stores for sk := range rs.removalMap { @@ -544,7 +533,7 @@ func (rs *Store) Commit() types.CommitID { return types.CommitID{ Version: version, - Hash: rs.lastCommitInfo.Hash(), + Hash: rs.lastCommitInfo.Load().Hash(), } } @@ -799,7 +788,7 @@ func (rs *Store) Query(req *types.RequestQuery) (*types.ResponseQuery, error) { // Otherwise, we query for the commit info from disk. var commitInfo *types.CommitInfo - lastCommitInfo := rs.LastCommitInfo() + lastCommitInfo := rs.lastCommitInfo.Load() if res.Height == lastCommitInfo.Version { commitInfo = lastCommitInfo } else { From 29d1d8fe2a220de99d77e83ff3658f8a76a6edaf Mon Sep 17 00:00:00 2001 From: beer-1 Date: Fri, 27 Dec 2024 14:17:22 +0900 Subject: [PATCH 5/8] remove direct access to state --- baseapp/abci.go | 31 ++++++++++++++----------------- baseapp/baseapp.go | 22 ++++++++++++++++++++++ 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/baseapp/abci.go b/baseapp/abci.go index 2b469df0b147..ba1d58ce3fc7 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -439,7 +439,8 @@ func (app *BaseApp) PrepareProposal(req *abci.PrepareProposalRequest) (resp *abc return nil, errors.New("PrepareProposal called with invalid height") } - app.prepareProposalState.SetContext(app.getContextForProposal(app.prepareProposalState.Context(), req.Height). + prepareProposalState := app.getState(execModePrepareProposal) + prepareProposalState.SetContext(app.getContextForProposal(prepareProposalState.Context(), req.Height). WithVoteInfos(toVoteInfo(req.LocalLastCommit.Votes)). // this is a set of votes that are not finalized yet, wait for commit WithBlockHeight(req.Height). WithProposer(req.ProposerAddress). @@ -456,9 +457,9 @@ func (app *BaseApp) PrepareProposal(req *abci.PrepareProposalRequest) (resp *abc Time: req.Time, })) - app.prepareProposalState.SetContext(app.prepareProposalState.Context(). - WithConsensusParams(app.GetConsensusParams(app.prepareProposalState.Context())). - WithBlockGasMeter(app.getBlockGasMeter(app.prepareProposalState.Context()))) + prepareProposalState.SetContext(prepareProposalState.Context(). + WithConsensusParams(app.GetConsensusParams(prepareProposalState.Context())). + WithBlockGasMeter(app.getBlockGasMeter(prepareProposalState.Context()))) defer func() { if err := recover(); err != nil { @@ -473,7 +474,7 @@ func (app *BaseApp) PrepareProposal(req *abci.PrepareProposalRequest) (resp *abc } }() - resp, err = app.prepareProposal(app.prepareProposalState.Context(), req) + resp, err = app.prepareProposal(prepareProposalState.Context(), req) if err != nil { app.logger.Error("failed to prepare proposal", "height", req.Height, "time", req.Time, "err", err) return &abci.PrepareProposalResponse{Txs: req.Txs}, nil @@ -531,7 +532,8 @@ func (app *BaseApp) ProcessProposal(req *abci.ProcessProposalRequest) (resp *abc app.setState(execModeFinalize, header) } - app.processProposalState.SetContext(app.getContextForProposal(app.processProposalState.Context(), req.Height). + processProposalState := app.getState(execModeProcessProposal) + processProposalState.SetContext(app.getContextForProposal(processProposalState.Context(), req.Height). WithVoteInfos(req.ProposedLastCommit.Votes). // this is a set of votes that are not finalized yet, wait for commit WithBlockHeight(req.Height). WithHeaderHash(req.Hash). @@ -550,9 +552,9 @@ func (app *BaseApp) ProcessProposal(req *abci.ProcessProposalRequest) (resp *abc Time: req.Time, })) - app.processProposalState.SetContext(app.processProposalState.Context(). - WithConsensusParams(app.GetConsensusParams(app.processProposalState.Context())). - WithBlockGasMeter(app.getBlockGasMeter(app.processProposalState.Context()))) + processProposalState.SetContext(processProposalState.Context(). + WithConsensusParams(app.GetConsensusParams(processProposalState.Context())). + WithBlockGasMeter(app.getBlockGasMeter(processProposalState.Context()))) defer func() { if err := recover(); err != nil { @@ -567,7 +569,7 @@ func (app *BaseApp) ProcessProposal(req *abci.ProcessProposalRequest) (resp *abc } }() - resp, err = app.processProposal(app.processProposalState.Context(), req) + resp, err = app.processProposal(processProposalState.Context(), req) if err != nil { app.logger.Error("failed to process proposal", "height", req.Height, "time", req.Time, "hash", fmt.Sprintf("%X", req.Hash), "err", err) return &abci.ProcessProposalResponse{Status: abci.PROCESS_PROPOSAL_STATUS_REJECT}, nil @@ -935,9 +937,7 @@ func (app *BaseApp) FinalizeBlock(req *abci.FinalizeBlockRequest) (res *abci.Fin } // if it was aborted, we need to reset the state - app.stateMut.Lock() - app.finalizeBlockState = nil - app.stateMut.Unlock() + app.clearState(execModeFinalize) app.optimisticExec.Reset() } @@ -1026,10 +1026,7 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) { // NOTE: This is safe because CometBFT holds a lock on the mempool for // Commit. Use the header from this latest block. app.setState(execModeCheck, header) - - app.stateMut.Lock() - app.finalizeBlockState = nil - app.stateMut.Unlock() + app.clearState(execModeFinalize) if app.prepareCheckStater != nil { app.prepareCheckStater(app.getState(execModeCheck).Context()) diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 730da6881dd8..fdb3e1372c47 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -517,6 +517,28 @@ func (app *BaseApp) setState(mode execMode, h cmtproto.Header) { } } +func (app *BaseApp) clearState(mode execMode) { + app.stateMut.Lock() + defer app.stateMut.Unlock() + + switch mode { + case execModeCheck: + app.checkState = nil + + case execModePrepareProposal: + app.prepareProposalState = nil + + case execModeProcessProposal: + app.processProposalState = nil + + case execModeFinalize: + app.finalizeBlockState = nil + + default: + panic(fmt.Sprintf("invalid runTxMode for clearState: %d", mode)) + } +} + // SetCircuitBreaker sets the circuit breaker for the BaseApp. // The circuit breaker is checked on every message execution to verify if a transaction should be executed or not. func (app *BaseApp) SetCircuitBreaker(cb CircuitBreaker) { From 0d6e6a102bffbf0e80c6484eb84e98c89d779acb Mon Sep 17 00:00:00 2001 From: beer-1 Date: Fri, 27 Dec 2024 14:19:50 +0900 Subject: [PATCH 6/8] revert comment --- baseapp/abci.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/baseapp/abci.go b/baseapp/abci.go index ba1d58ce3fc7..86ed3dc5dd94 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -748,7 +748,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.VerifyVoteExtensionRequest) (r // internalFinalizeBlock executes the block, called by the Optimistic // Execution flow or by the FinalizeBlock ABCI method. The context received is -// only used to handle early cancellation, for anything related to state app.getState(execModeFinalize).Context() +// only used to handle early cancellation, for anything related to state app.finalizeBlockState.Context() // must be used. func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.FinalizeBlockRequest) (*abci.FinalizeBlockResponse, error) { var events []abci.Event From c6f902af35444dcf24cbe02cd3e3d03cb1315b7e Mon Sep 17 00:00:00 2001 From: beer-1 Date: Fri, 27 Dec 2024 14:26:34 +0900 Subject: [PATCH 7/8] add note comment --- baseapp/baseapp.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index fdb3e1372c47..7a335d8899fa 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -120,6 +120,10 @@ type BaseApp struct { // // - finalizeBlockState: Used for FinalizeBlock, which is set based on the // previous block's state. This state is committed. + // + // NOTE: The states should be accessed via getter and setter to avoid race conditions. + // - getter: getState + // - setter: setState and clearState checkState *state prepareProposalState *state processProposalState *state From b7854135746c523ca53e0eefe11557f47172c63f Mon Sep 17 00:00:00 2001 From: beer-1 Date: Fri, 27 Dec 2024 14:28:11 +0900 Subject: [PATCH 8/8] update change log --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 73d64a2f3890..af3b73ed2521 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,7 +49,7 @@ Every module contains its own CHANGELOG.md. Please refer to the module you are i ### Bug Fixes -* (baseapp) [#22692](https://github.com/cosmos/cosmos-sdk/pull/22692) Add mutex locks for `state` and `lastCommitInfo` to prevent race conditions between `Commit` and `CreateQueryContext`. +* (baseapp) [#22692](https://github.com/cosmos/cosmos-sdk/pull/22692) Add mutex locks for `state` and make `lastCommitInfo` atomic to prevent race conditions between `Commit` and `CreateQueryContext`. ### API Breaking Changes