diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go
index d14fa9852f..1bdab7b5fe 100644
--- a/pkg/core/blockchain.go
+++ b/pkg/core/blockchain.go
@@ -70,6 +70,13 @@ const (
 	// either, the next cycle will still do the job (only transfers need this,
 	// MPT won't notice at all).
 	defaultBlockTimesCache = 8
+
+	// persistSamples is the number of persist velocity samples to use for
+	// storeBlock limit.
+	persistSamples = 10
+	// persistMinForSampling is the minimal number of keys to take persist
+	// time into account wrt persist velocity.
+	persistMinForSampling = 100
 )
 
 // stateChangeStage denotes the stage of state modification process.
@@ -165,6 +172,14 @@ type Blockchain struct {
 	// Current persisted block count.
 	persistedHeight uint32
 
+	// keysPerPersist is the average number of persisted keys per persist
+	// time limit.
+	keysPerPersist uint32
+
+	// persistCond is signaled each time a persist cycle ends successfully;
+	// this wakes storeBlock if it is waiting for the in-memory store to drain.
+	persistCond *sync.Cond
+
 	// Index->Timestamp cache for garbage collector. Headers can be gone
 	// by the time it runs, so we use a tiny little cache to sync block
 	// removal (performed in storeBlock()) with transfer/MPT GC (tryRunGC())
@@ -338,6 +353,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
 		contracts: *native.NewContracts(cfg.ProtocolConfiguration),
 	}
 
+	bc.persistCond = sync.NewCond(&bc.lock)
 	bc.gcBlockTimes, _ = lru.New[uint32, uint64](defaultBlockTimesCache) // Never errors for positive size
 	bc.stateRoot = stateroot.NewModule(cfg, bc.VerifyWitness, bc.log, bc.dao.Store)
 	bc.contracts.Designate.StateRootService = bc.stateRoot
@@ -1102,7 +1118,7 @@ func (bc *Blockchain) Run() {
 	persistTimer := time.NewTimer(persistInterval)
 	defer func() {
 		persistTimer.Stop()
-		if _, err := bc.persist(true); err != nil {
+		if _, err := bc.persist(); err != nil {
 			bc.log.Warn("failed to persist", zap.Error(err))
 		}
 		if err := bc.dao.Store.Close(); err != nil {
@@ -1112,7 +1128,6 @@ func (bc *Blockchain) Run() {
 		close(bc.runToExitCh)
 	}()
 	go bc.notificationDispatcher()
-	var nextSync bool
 	for {
 		select {
 		case <-bc.stopCh:
@@ -1123,14 +1138,13 @@ func (bc *Blockchain) Run() {
 			if bc.config.Ledger.RemoveUntraceableBlocks {
 				oldPersisted = atomic.LoadUint32(&bc.persistedHeight)
 			}
-			dur, err := bc.persist(nextSync)
+			dur, err := bc.persist()
 			if err != nil {
 				bc.log.Error("failed to persist blockchain", zap.Error(err))
 			}
 			if bc.config.Ledger.RemoveUntraceableBlocks {
 				dur += bc.tryRunGC(oldPersisted)
 			}
-			nextSync = dur > persistInterval*2
 			interval := persistInterval - dur
 			interval = max(interval, time.Microsecond) // Reset doesn't work with zero or negative value.
 			persistTimer.Reset(interval)
@@ -1797,6 +1811,14 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
 	}
 
 	bc.lock.Lock()
+	// Wait for a while if we're lagging behind the persistence routine,
+	// it's too easy to OOM otherwise. Keep in mind that this check can't
+	// be perfect, so some tolerance (accepting more) is OK to have.
+	var persistVelocity = atomic.LoadUint32(&bc.keysPerPersist)
+	for persistVelocity != 0 && uint64(bc.dao.Store.Len()) > uint64(persistVelocity)*4 { // 64-bit compare, velocity*4 can wrap uint32.
+		bc.persistCond.Wait()
+	}
+
 	_, err = aerCache.Persist()
 	if err != nil {
 		bc.lock.Unlock()
@@ -2148,7 +2170,7 @@ func (bc *Blockchain) LastBatch() *storage.MemBatch {
 }
 
 // persist flushes current in-memory Store contents to the persistent storage.
-func (bc *Blockchain) persist(isSync bool) (time.Duration, error) {
+func (bc *Blockchain) persist() (time.Duration, error) {
 	var (
 		start     = time.Now()
 		duration  time.Duration
@@ -2156,11 +2178,7 @@ func (bc *Blockchain) persist(isSync bool) (time.Duration, error) {
 		err       error
 	)
 
-	if isSync {
-		persisted, err = bc.dao.PersistSync()
-	} else {
-		persisted, err = bc.dao.Persist()
-	}
+	persisted, err = bc.dao.Persist()
 	if err != nil {
 		return 0, err
 	}
@@ -2177,6 +2195,22 @@ func (bc *Blockchain) persist(isSync bool) (time.Duration, error) {
 			return 0, err
 		}
 		duration = time.Since(start)
+		if persisted > persistMinForSampling { // Low number of keys is not representative.
+			// Do the math in 64 bits and saturate the result: the uint32
+			// form wraps for fast persists, and a zero duration (coarse
+			// clock) would panic with a division by zero.
+			var (
+				currentVelocity = uint64(persisted) * uint64(persistInterval) / uint64(max(duration, 1))
+				persistVelocity = uint64(atomic.LoadUint32(&bc.keysPerPersist))
+			)
+			if persistVelocity != 0 {
+				currentVelocity = min(currentVelocity, 2*persistVelocity) // Normalize sudden spikes.
+				currentVelocity = (persistVelocity*(persistSamples-1) + currentVelocity) / persistSamples
+			} // Otherwise it's the first sample and we take it as is.
+			newVelocity := uint32(min(currentVelocity, 1<<32-1)) // Saturate to the uint32 field range.
+			atomic.StoreUint32(&bc.keysPerPersist, newVelocity)
+			updateEstimatedPersistVelocityMetric(newVelocity)
+		}
 		bc.log.Info("persisted to disk",
 			zap.Uint32("blocks", diff),
 			zap.Int("keys", persisted),
@@ -2187,6 +2221,7 @@ func (bc *Blockchain) persist(isSync bool) (time.Duration, error) {
 		// update monitoring metrics.
 		updatePersistedHeightMetric(bHeight)
 	}
+	bc.persistCond.Signal() // Wake storeBlock if it waits for the store to drain.
 	return duration, nil
 }
 
diff --git a/pkg/core/blockchain_core_test.go b/pkg/core/blockchain_core_test.go
index 7f71a57905..f99fa5ad29 100644
--- a/pkg/core/blockchain_core_test.go
+++ b/pkg/core/blockchain_core_test.go
@@ -63,7 +63,7 @@ func TestAddBlock(t *testing.T) {
 	assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash())
 
 	// This one tests persisting blocks, so it does need to persist()
-	_, err = bc.persist(false)
+	_, err = bc.persist()
 	require.NoError(t, err)
 
 	key := make([]byte, 1+util.Uint256Size)
diff --git a/pkg/core/prometheus.go b/pkg/core/prometheus.go
index 8e429d0446..b649cd26ae 100644
--- a/pkg/core/prometheus.go
+++ b/pkg/core/prometheus.go
@@ -22,6 +22,14 @@ var (
 			Namespace: "neogo",
 		},
 	)
+	// estimatedPersistVelocity prometheus metric (estimated keys persisted per cycle).
+	estimatedPersistVelocity = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Help:      "Estimation of persist velocity per cycle (1s by default)",
+			Name:      "estimated_persist_velocity",
+			Namespace: "neogo",
+		},
+	)
 	// headerHeight prometheus metric.
 	headerHeight = prometheus.NewGauge(
 		prometheus.GaugeOpts{
@@ -44,6 +52,7 @@ func init() {
 	prometheus.MustRegister(
 		blockHeight,
 		persistedHeight,
+		estimatedPersistVelocity,
 		headerHeight,
 		mempoolUnsortedTx,
 	)
@@ -53,6 +62,10 @@ func updatePersistedHeightMetric(pHeight uint32) {
 	persistedHeight.Set(float64(pHeight))
 }
 
+func updateEstimatedPersistVelocityMetric(v uint32) {
+	estimatedPersistVelocity.Set(float64(v))
+}
+
 func updateHeaderHeightMetric(hHeight uint32) {
 	headerHeight.Set(float64(hHeight))
 }
diff --git a/pkg/core/storage/memcached_store_test.go b/pkg/core/storage/memcached_store_test.go
index 586e5e06ed..cc041b4214 100644
--- a/pkg/core/storage/memcached_store_test.go
+++ b/pkg/core/storage/memcached_store_test.go
@@ -19,12 +19,14 @@ func TestMemCachedPutGetDelete(t *testing.T) {
 
 	s.Put(key, value)
 
+	require.Equal(t, 1, s.Len())
 	result, err := s.Get(key)
 	assert.Nil(t, err)
 	require.Equal(t, value, result)
 
 	s.Delete(key)
 
+	require.Equal(t, 1, s.Len()) // deletion marker
 	_, err = s.Get(key)
 	assert.NotNil(t, err)
 	assert.Equal(t, err, ErrKeyNotFound)
diff --git a/pkg/core/storage/memory_store.go b/pkg/core/storage/memory_store.go
index e4e8c20d6e..dd962cb87d 100644
--- a/pkg/core/storage/memory_store.go
+++ b/pkg/core/storage/memory_store.go
@@ -66,6 +66,13 @@ func (s *MemoryStore) putChangeSet(puts map[string][]byte, stores map[string][]b
 	}
 }
 
+// Len returns the number of keys stored (mem and stor combined), read under the read lock.
+func (s *MemoryStore) Len() int {
+	s.mut.RLock()
+	defer s.mut.RUnlock()
+	return len(s.mem) + len(s.stor)
+}
+
 // Seek implements the Store interface.
 func (s *MemoryStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
 	s.seek(rng, f, s.mut.RLock, s.mut.RUnlock)