diff --git a/docs/node-configuration.md b/docs/node-configuration.md
index 26565f9e7f..7ba47982c9 100644
--- a/docs/node-configuration.md
+++ b/docs/node-configuration.md
@@ -18,7 +18,7 @@ node-related settings described in the table below.
 | --- | --- | --- | --- |
 | DBConfiguration | [DB Configuration](#DB-Configuration) | | Describes configuration for database. See the [DB Configuration](#DB-Configuration) section for details. |
 | LogLevel | `string` | "info" | Minimal logged messages level (can be "debug", "info", "warn", "error", "dpanic", "panic" or "fatal"). |
-| GarbageCollectionPeriod | `uint32` | 10000 | Controls MPT garbage collection interval (in blocks) for configurations with `RemoveUntraceableBlocks` enabled and `KeepOnlyLatestState` disabled. In this mode the node stores a number of MPT trees (corresponding to `MaxTraceableBlocks` and `StateSyncInterval`), but the DB needs to be clean from old entries from time to time. Doing it too often will cause too much processing overhead, doing it too rarely will leave more useless data in the DB. |
+| GarbageCollectionPeriod | `uint32` | 10000 | Controls MPT garbage collection interval (in blocks) for configurations with `RemoveUntraceableBlocks` enabled and `KeepOnlyLatestState` disabled. In this mode the node stores a number of MPT trees (corresponding to `MaxTraceableBlocks` and `StateSyncInterval`), but the DB needs to be cleaned of old entries from time to time. Doing it too often causes too much processing overhead (it requires going through the whole DB, which can take minutes); doing it too rarely leaves more useless data in the DB. Always compare this value to `MaxTraceableBlocks`: values lower than 10% of it are likely too low, while values higher than 50% are likely to leave more garbage than can be collected. The default value is aligned with NeoFS networks that have low MTB values; for N3 mainnet it's too low. |
 | KeepOnlyLatestState | `bool` | `false` | Specifies if MPT should only store the latest state (or a set of latest states, see `P2PStateExchangeExtensions` section in the ProtocolConfiguration for details). If true, DB size will be smaller, but older roots won't be accessible. This value should remain the same for the same database. |
 | LogPath | `string` | "", so only console logging | File path where to store node logs. |
 | NeoFSBlockFetcher | [NeoFS BlockFetcher Configuration](#NeoFS-BlockFetcher-Configuration) | | NeoFS BlockFetcher module configuration. See the [NeoFS BlockFetcher Configuration](#NeoFS-BlockFetcher-Configuration) section for details. |
@@ -100,7 +100,12 @@ DBConfiguration:
 ```
 where:
 - `Type` is the database type (string value). Supported types: `leveldb`, `boltdb` and
-  `inmemory` (not recommended for production usage).
+  `inmemory` (not recommended for production usage). LevelDB is better for archive nodes
+  that store all data; it deals with writes better in general, but any tail-cutting node
+  options seriously degrade its performance. BoltDB does much better in various
+  performance tests; however, its GC can degrade seriously once the DB size is bigger
+  than the amount of available memory. BoltDB is also more memory-demanding for some
+  operations, so GC can be problematic from that angle as well.
 - `LevelDBOptions` are settings for LevelDB. Includes the DB files path and ReadOnly mode
   toggle. If ReadOnly mode is on, then an error will be returned on attempt to connect
   to unexisting or empty database. Database doesn't allow changes in this mode, a warning
   will be logged on DB persist attempts.
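A minimal standalone sketch of the 10%-50% guidance added above; `checkGCPeriod` and its parameters are hypothetical names for illustration, not NeoGo API:

```go
package main

import "fmt"

// checkGCPeriod is a hypothetical sanity check for the documented rule:
// GarbageCollectionPeriod should stay roughly between 10% and 50% of
// MaxTraceableBlocks.
func checkGCPeriod(gcPeriod, maxTraceableBlocks uint32) string {
	switch {
	case gcPeriod < maxTraceableBlocks/10:
		return "likely too low: GC runs too often and rescans the whole DB needlessly"
	case gcPeriod > maxTraceableBlocks/2:
		return "likely too high: more garbage accumulates than can be collected"
	default:
		return "within the suggested 10%-50% band"
	}
}

func main() {
	// N3 mainnet uses MaxTraceableBlocks of 2102400, so the default
	// period of 10000 blocks falls well below the suggested 10% bound.
	fmt.Println(checkGCPeriod(10000, 2102400))
}
```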
diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go
index a223f01bd6..85291b9ee9 100644
--- a/pkg/core/blockchain.go
+++ b/pkg/core/blockchain.go
@@ -70,6 +70,13 @@ const (
 	// either, the next cycle will still do the job (only transfers need this,
 	// MPT won't notice at all).
 	defaultBlockTimesCache = 8
+
+	// persistSamples is the number of persist velocity samples to use for
+	// the storeBlock limit.
+	persistSamples = 10
+	// persistMinForSampling is the minimal number of persisted keys for a
+	// persist cycle to be taken into account in the velocity estimation.
+	persistMinForSampling = 100
 )
 
 // stateChangeStage denotes the stage of state modification process.
@@ -165,6 +172,14 @@ type Blockchain struct {
 	// Current persisted block count.
 	persistedHeight uint32
 
+	// keysPerPersist is the average number of keys persisted per persist
+	// interval.
+	keysPerPersist uint32
+
+	// persistCond is signaled each time a persist cycle ends; this wakes
+	// storeBlock if needed (when it has too many queued blocks).
+	persistCond *sync.Cond
+
 	// Index->Timestamp cache for garbage collector. Headers can be gone
 	// by the time it runs, so we use a tiny little cache to sync block
 	// removal (performed in storeBlock()) with transfer/MPT GC (tryRunGC())
@@ -338,6 +353,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
 		contracts:   *native.NewContracts(cfg.ProtocolConfiguration),
 	}
 
+	bc.persistCond = sync.NewCond(&bc.lock)
 	bc.gcBlockTimes, _ = lru.New[uint32, uint64](defaultBlockTimesCache) // Never errors for positive size
 	bc.stateRoot = stateroot.NewModule(cfg, bc.VerifyWitness, bc.log, bc.dao.Store)
 	bc.contracts.Designate.StateRootService = bc.stateRoot
@@ -1102,7 +1118,7 @@ func (bc *Blockchain) Run() {
 	persistTimer := time.NewTimer(persistInterval)
 	defer func() {
 		persistTimer.Stop()
-		if _, err := bc.persist(true); err != nil {
+		if _, err := bc.persist(); err != nil {
 			bc.log.Warn("failed to persist", zap.Error(err))
 		}
 		if err := bc.dao.Store.Close(); err != nil {
@@ -1112,27 +1128,24 @@ func (bc *Blockchain) Run() {
 		close(bc.runToExitCh)
 	}()
 	go bc.notificationDispatcher()
-	var nextSync bool
 	for {
 		select {
 		case <-bc.stopCh:
 			return
 		case <-persistTimer.C:
 			var oldPersisted uint32
-			var gcDur time.Duration
 			if bc.config.Ledger.RemoveUntraceableBlocks {
 				oldPersisted = atomic.LoadUint32(&bc.persistedHeight)
 			}
-			dur, err := bc.persist(nextSync)
+			dur, err := bc.persist()
 			if err != nil {
-				bc.log.Warn("failed to persist blockchain", zap.Error(err))
+				bc.log.Error("failed to persist blockchain", zap.Error(err))
 			}
 			if bc.config.Ledger.RemoveUntraceableBlocks {
-				gcDur = bc.tryRunGC(oldPersisted)
+				dur += bc.tryRunGC(oldPersisted)
 			}
-			nextSync = dur > persistInterval*2
-			interval := persistInterval - dur - gcDur
+			interval := persistInterval - dur
 			interval = max(interval, time.Microsecond) // Reset doesn't work with zero or negative value.
 			persistTimer.Reset(interval)
 		}
 	}
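The Run() hunk above now folds GC time into the same budget as persist and reschedules the timer with whatever is left of the interval. A minimal standalone sketch of that rescheduling pattern, assuming NeoGo's default 1s `persistInterval`; `doWork` is a hypothetical stand-in for `persist()` plus `tryRunGC()`:

```go
package main

import (
	"fmt"
	"time"
)

const persistInterval = time.Second // NeoGo's default persist cycle.

// doWork stands in for persist()+tryRunGC(); it reports how long it took.
func doWork(i int) time.Duration {
	d := time.Duration(i*200) * time.Millisecond
	time.Sleep(d)
	return d
}

func main() {
	timer := time.NewTimer(persistInterval)
	defer timer.Stop()
	for i := 0; i < 3; i++ {
		<-timer.C
		dur := doWork(i)
		// Subtract the work duration so cycles start persistInterval
		// apart; clamp to a positive value since Reset doesn't accept
		// zero or negative durations.
		interval := max(persistInterval-dur, time.Microsecond)
		fmt.Println("next cycle in", interval)
		timer.Reset(interval)
	}
}
```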
@@ -1159,8 +1172,8 @@ func (bc *Blockchain) tryRunGC(oldHeight uint32) time.Duration {
 	oldHeight /= bc.config.Ledger.GarbageCollectionPeriod
 	newHeight /= bc.config.Ledger.GarbageCollectionPeriod
 	if tgtBlock > int64(bc.config.Ledger.GarbageCollectionPeriod) && newHeight != oldHeight {
-		dur = bc.stateRoot.GC(uint32(tgtBlock), bc.store)
-		dur += bc.removeOldTransfers(uint32(tgtBlock))
+		dur = bc.removeOldTransfers(uint32(tgtBlock))
+		dur += bc.stateRoot.GC(uint32(tgtBlock), bc.store)
 	}
 	return dur
 }
@@ -1644,7 +1657,7 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
 		}
 		for index := start; index < stop; index++ {
 			ts, err := kvcache.DeleteBlock(bc.GetHeaderHash(index), bc.config.Ledger.RemoveUntraceableHeaders)
-			if bc.config.Ledger.RemoveUntraceableHeaders && index%bc.config.Ledger.GarbageCollectionPeriod == 0 {
+			if index%bc.config.Ledger.GarbageCollectionPeriod == 0 {
 				_ = bc.gcBlockTimes.Add(index, ts)
 			}
 			if err != nil {
@@ -1798,6 +1811,14 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
 	}
 
 	bc.lock.Lock()
+	// Wait for a while if we're lagging behind the persistence routine;
+	// it's too easy to OOM otherwise. Keep in mind that this check can't
+	// be perfect, so some tolerance (accepting more) is OK to have.
+	var persistVelocity = atomic.LoadUint32(&bc.keysPerPersist)
+	for persistVelocity != 0 && uint32(bc.dao.Store.Len()) > persistVelocity*4 {
+		bc.persistCond.Wait()
+	}
+
 	_, err = aerCache.Persist()
 	if err != nil {
 		bc.lock.Unlock()
@@ -2149,7 +2170,7 @@ func (bc *Blockchain) LastBatch() *storage.MemBatch {
 }
 
 // persist flushes current in-memory Store contents to the persistent storage.
-func (bc *Blockchain) persist(isSync bool) (time.Duration, error) {
+func (bc *Blockchain) persist() (time.Duration, error) {
 	var (
 		start     = time.Now()
 		duration  time.Duration
@@ -2157,11 +2178,7 @@
 		err       error
 	)
 
-	if isSync {
-		persisted, err = bc.dao.PersistSync()
-	} else {
-		persisted, err = bc.dao.Persist()
-	}
+	persisted, err = bc.dao.Persist()
 	if err != nil {
 		return 0, err
 	}
@@ -2178,6 +2195,20 @@
 		return 0, err
 	}
 	duration = time.Since(start)
+	// A low number of keys is not representative and duration _can_
+	// be zero in tests on strange platforms like Windows.
+	if duration > 0 && persisted > persistMinForSampling {
+		var (
+			currentVelocity = uint32(int64(persisted) * int64(persistInterval) / int64(duration))
+			persistVelocity = atomic.LoadUint32(&bc.keysPerPersist)
+		)
+		if persistVelocity != 0 {
+			currentVelocity = min(currentVelocity, 2*persistVelocity) // Normalize sudden spikes.
+			currentVelocity = (persistVelocity*(persistSamples-1) + currentVelocity) / persistSamples
+		} // Otherwise it's the first sample and we take it as is.
+		atomic.StoreUint32(&bc.keysPerPersist, currentVelocity)
+		updateEstimatedPersistVelocityMetric(currentVelocity)
+	}
 	bc.log.Info("persisted to disk",
 		zap.Uint32("blocks", diff),
 		zap.Int("keys", persisted),
@@ -2188,6 +2219,7 @@
 		// update monitoring metrics.
 		updatePersistedHeightMetric(bHeight)
 	}
+	bc.persistCond.Signal()
 
 	return duration, nil
 }
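The new code in persist() estimates velocity as keys scaled to a full `persistInterval`, clamps spikes at 2x the current estimate and smooths over `persistSamples` cycles. The same arithmetic as a standalone sketch; `updateVelocity` is an illustrative helper mirroring the diff's logic, not the node's code:

```go
package main

import (
	"fmt"
	"time"
)

const (
	persistInterval       = time.Second
	persistSamples        = 10  // Number of samples to average over.
	persistMinForSampling = 100 // Ignore unrepresentatively small batches.
)

// updateVelocity folds one persist cycle (persisted keys, elapsed time)
// into the running estimate, mirroring the logic added in persist().
func updateVelocity(estimate uint32, persisted int, dur time.Duration) uint32 {
	if dur <= 0 || persisted <= persistMinForSampling {
		return estimate // Not representative, keep the old estimate.
	}
	current := uint32(int64(persisted) * int64(persistInterval) / int64(dur))
	if estimate != 0 {
		current = min(current, 2*estimate) // Normalize sudden spikes.
		current = (estimate*(persistSamples-1) + current) / persistSamples
	} // Otherwise it's the first sample and is taken as is.
	return current
}

func main() {
	var est uint32
	est = updateVelocity(est, 5000, 250*time.Millisecond)  // First sample: 20000 keys/interval.
	est = updateVelocity(est, 90000, 300*time.Millisecond) // Raw 300000, clamped to 40000.
	fmt.Println("estimated keys per persist interval:", est) // (20000*9+40000)/10 = 22000.
}
```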
diff --git a/pkg/core/blockchain_core_test.go b/pkg/core/blockchain_core_test.go
index 7f71a57905..f99fa5ad29 100644
--- a/pkg/core/blockchain_core_test.go
+++ b/pkg/core/blockchain_core_test.go
@@ -63,7 +63,7 @@ func TestAddBlock(t *testing.T) {
 	assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash())
 
 	// This one tests persisting blocks, so it does need to persist()
-	_, err = bc.persist(false)
+	_, err = bc.persist()
 	require.NoError(t, err)
 
 	key := make([]byte, 1+util.Uint256Size)
diff --git a/pkg/core/prometheus.go b/pkg/core/prometheus.go
index 8e429d0446..b649cd26ae 100644
--- a/pkg/core/prometheus.go
+++ b/pkg/core/prometheus.go
@@ -22,6 +22,14 @@ var (
 			Namespace: "neogo",
 		},
 	)
+	// estimatedPersistVelocity prometheus metric.
+	estimatedPersistVelocity = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Help:      "Estimation of persist velocity per cycle (1s by default)",
+			Name:      "estimated_persist_velocity",
+			Namespace: "neogo",
+		},
+	)
 	// headerHeight prometheus metric.
 	headerHeight = prometheus.NewGauge(
 		prometheus.GaugeOpts{
@@ -44,6 +52,7 @@ func init() {
 	prometheus.MustRegister(
 		blockHeight,
 		persistedHeight,
+		estimatedPersistVelocity,
 		headerHeight,
 		mempoolUnsortedTx,
 	)
@@ -53,6 +62,10 @@ func updatePersistedHeightMetric(pHeight uint32) {
 	persistedHeight.Set(float64(pHeight))
 }
 
+func updateEstimatedPersistVelocityMetric(v uint32) {
+	estimatedPersistVelocity.Set(float64(v))
+}
+
 func updateHeaderHeightMetric(hHeight uint32) {
 	headerHeight.Set(float64(hHeight))
 }
diff --git a/pkg/core/storage/memcached_store_test.go b/pkg/core/storage/memcached_store_test.go
index 586e5e06ed..cc041b4214 100644
--- a/pkg/core/storage/memcached_store_test.go
+++ b/pkg/core/storage/memcached_store_test.go
@@ -19,12 +19,14 @@ func TestMemCachedPutGetDelete(t *testing.T) {
 
 	s.Put(key, value)
 
+	require.Equal(t, 1, s.Len())
 	result, err := s.Get(key)
 	assert.Nil(t, err)
 	require.Equal(t, value, result)
 
 	s.Delete(key)
 
+	require.Equal(t, 1, s.Len()) // deletion marker
 	_, err = s.Get(key)
 	assert.NotNil(t, err)
 	assert.Equal(t, err, ErrKeyNotFound)
diff --git a/pkg/core/storage/memory_store.go b/pkg/core/storage/memory_store.go
index e4e8c20d6e..dd962cb87d 100644
--- a/pkg/core/storage/memory_store.go
+++ b/pkg/core/storage/memory_store.go
@@ -66,6 +66,13 @@ func (s *MemoryStore) putChangeSet(puts map[string][]byte, stores map[string][]b
 	}
 }
 
+// Len returns the number of keys stored.
+func (s *MemoryStore) Len() int {
+	s.mut.RLock()
+	defer s.mut.RUnlock()
+	return len(s.mem) + len(s.stor)
+}
+
 // Seek implements the Store interface.
 func (s *MemoryStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
 	s.seek(rng, f, s.mut.RLock, s.mut.RUnlock)
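Tying the pieces together: storeBlock compares the new `MemoryStore.Len()` against 4x the estimated velocity and waits on `persistCond` until a persist cycle drains the backlog. A condensed sketch of that producer/consumer handshake, with a toy `queue` type standing in for the blockchain and its DAO (illustrative only, not the node's code):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// queue mimics the write-cache whose Len() the new check consults.
type queue struct {
	mu   sync.Mutex
	cond *sync.Cond
	keys uint32 // Queued-but-not-persisted keys.

	keysPerPersist atomic.Uint32 // Estimated persist velocity.
}

// add blocks while the backlog exceeds 4x the persist velocity, the
// same tolerance factor the diff uses in storeBlock.
func (q *queue) add(n uint32) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for v := q.keysPerPersist.Load(); v != 0 && q.keys > v*4; v = q.keysPerPersist.Load() {
		q.cond.Wait() // Woken by Signal() at the end of each persist.
	}
	q.keys += n
}

// persist drains a velocity's worth of keys and wakes one waiter.
func (q *queue) persist() {
	q.mu.Lock()
	q.keys -= min(q.keys, q.keysPerPersist.Load())
	q.mu.Unlock()
	q.cond.Signal()
}

func main() {
	q := &queue{}
	q.cond = sync.NewCond(&q.mu)
	q.keysPerPersist.Store(1000)

	go func() { // Persist routine, one cycle per "interval".
		for {
			time.Sleep(10 * time.Millisecond)
			q.persist()
		}
	}()
	for i := 0; i < 10; i++ { // Block producer.
		q.add(2000)
	}
	fmt.Println("all blocks queued without unbounded growth")
}
```

Note that the check is deliberately approximate: the producer may overshoot the threshold by one batch, which matches the "some tolerance (accepting more) is OK" comment in the diff.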