core: adjust in-memory processed set dynamically
Instead of tick-tocking with sync/async and having an unpredictable data
set, we can just check the real number of keys that can be processed by
the underlying DB. It can't be perfect, but it still puts a hard limit on
the amount of in-memory data. It's also adaptive: slower machines will
keep less and faster machines will keep more.
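
To illustrate the idea, here is a minimal standalone sketch of the calculation
(not the diff below itself; the constants mirror it, the sample numbers are
made up): keys flushed per cycle are normalized to the persist interval and
folded into a ~10-sample moving average, with sudden spikes capped at 2× the
previous estimate.

package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		persistInterval = time.Second // persist cycle length
		persistSamples  = 10          // moving-average window
	)
	var (
		persisted = 40000           // keys flushed in this cycle (made-up sample)
		duration  = 4 * time.Second // how long the flush took (made-up sample)
		previous  = uint32(9900)    // running estimate from earlier cycles
	)
	// Normalize to keys per persistInterval, cap spikes at 2x, then average.
	current := uint32(int64(persisted) * int64(persistInterval) / int64(duration))
	current = min(current, 2*previous)
	current = (previous*(persistSamples-1) + current) / persistSamples
	fmt.Println("new keysPerPersist estimate:", current) // prints 9910
}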

This gives almost perfect 4s cycles for mainnet BoltDB with no tail cutting;
it makes zero sense to process more blocks since we're clearly DB-bound:

2025-01-15T11:35:00.567+0300    INFO    persisted to disk       {"blocks": 1469, "keys": 40579, "headerHeight": 5438141, "blockHeight": 5438140, "velocity": 9912, "took": "4.378939648s"}
2025-01-15T11:35:04.699+0300    INFO    persisted to disk       {"blocks": 1060, "keys": 39976, "headerHeight": 5439201, "blockHeight": 5439200, "velocity": 9888, "took": "4.131985438s"}
2025-01-15T11:35:08.752+0300    INFO    persisted to disk       {"blocks": 1508, "keys": 39658, "headerHeight": 5440709, "blockHeight": 5440708, "velocity": 9877, "took": "4.052347569s"}
2025-01-15T11:35:12.807+0300    INFO    persisted to disk       {"blocks": 1645, "keys": 39565, "headerHeight": 5442354, "blockHeight": 5442353, "velocity": 9864, "took": "4.05547743s"}
2025-01-15T11:35:17.011+0300    INFO    persisted to disk       {"blocks": 1472, "keys": 39519, "headerHeight": 5443826, "blockHeight": 5443825, "velocity": 9817, "took": "4.203258142s"}
2025-01-15T11:35:21.089+0300    INFO    persisted to disk       {"blocks": 1345, "keys": 39529, "headerHeight": 5445171, "blockHeight": 5445170, "velocity": 9804, "took": "4.078297579s"}
2025-01-15T11:35:25.090+0300    INFO    persisted to disk       {"blocks": 1054, "keys": 39326, "headerHeight": 5446225, "blockHeight": 5446224, "velocity": 9806, "took": "4.000524899s"}
2025-01-15T11:35:30.372+0300    INFO    persisted to disk       {"blocks": 1239, "keys": 39349, "headerHeight": 5447464, "blockHeight": 5447463, "velocity": 9744, "took": "4.281444939s"}

A 2× limit could be considered, but this calculation isn't perfect for a low
number of keys, so a somewhat bigger tolerance is preferable for now. Overall
it doesn't degrade performance; my mainnet/bolt run was even 8% better with
this.
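
For scale: with the ~9,900 keys per 1s interval reported above and the 4×
tolerance applied in storeBlock, block processing pauses once the in-memory
set exceeds roughly 39,600 keys, which lines up with the ~39-40K keys flushed
per ~4s cycle in the log.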

Fixes #3249; we don't need any option this way.

Signed-off-by: Roman Khimov <roman@nspcc.ru>
roman-khimov committed Jan 15, 2025
1 parent c07c74d commit c071932
Showing 5 changed files with 64 additions and 11 deletions.
51 changes: 41 additions & 10 deletions pkg/core/blockchain.go
@@ -70,6 +70,13 @@ const (
// either, the next cycle will still do the job (only transfers need this,
// MPT won't notice at all).
defaultBlockTimesCache = 8

// persistSamples is the number of persist velocity samples to use for
// storeBlock limit.
persistSamples = 10
// persistMinForSampling is the minimal number of keys to take persist
// time into account wrt persist velocity.
persistMinForSampling = 100
)

// stateChangeStage denotes the stage of state modification process.
@@ -165,6 +172,14 @@ type Blockchain struct {
// Current persisted block count.
persistedHeight uint32

// keysPerPersist is the average number of persisted keys per persist
// time limit.
keysPerPersist uint32

// persistCond is signaled each time persist cycle ends, this wakes
// storeBlock if needed (when it has too many queued blocks)
persistCond *sync.Cond

// Index->Timestamp cache for garbage collector. Headers can be gone
// by the time it runs, so we use a tiny little cache to sync block
// removal (performed in storeBlock()) with transfer/MPT GC (tryRunGC())
@@ -338,6 +353,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
contracts: *native.NewContracts(cfg.ProtocolConfiguration),
}

bc.persistCond = sync.NewCond(&bc.lock)
bc.gcBlockTimes, _ = lru.New[uint32, uint64](defaultBlockTimesCache) // Never errors for positive size
bc.stateRoot = stateroot.NewModule(cfg, bc.VerifyWitness, bc.log, bc.dao.Store)
bc.contracts.Designate.StateRootService = bc.stateRoot
@@ -1102,7 +1118,7 @@ func (bc *Blockchain) Run() {
persistTimer := time.NewTimer(persistInterval)
defer func() {
persistTimer.Stop()
if _, err := bc.persist(true); err != nil {
if _, err := bc.persist(); err != nil {
bc.log.Warn("failed to persist", zap.Error(err))
}
if err := bc.dao.Store.Close(); err != nil {
@@ -1112,7 +1128,6 @@
close(bc.runToExitCh)
}()
go bc.notificationDispatcher()
var nextSync bool
for {
select {
case <-bc.stopCh:
@@ -1123,14 +1138,13 @@
if bc.config.Ledger.RemoveUntraceableBlocks {
oldPersisted = atomic.LoadUint32(&bc.persistedHeight)
}
dur, err := bc.persist(nextSync)
dur, err := bc.persist()
if err != nil {
bc.log.Error("failed to persist blockchain", zap.Error(err))
}
if bc.config.Ledger.RemoveUntraceableBlocks {
dur += bc.tryRunGC(oldPersisted)
}
nextSync = dur > persistInterval*2
interval := persistInterval - dur
interval = max(interval, time.Microsecond) // Reset doesn't work with zero or negative value.
persistTimer.Reset(interval)
@@ -1797,6 +1811,14 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
}

bc.lock.Lock()
// Wait for a while if we're lagging behind the persistence routine,
// it's too easy to OOM otherwise. Keep in mind that this check can't
// be perfect, so some tolerance (accepting more) is OK to have.
var persistVelocity = atomic.LoadUint32(&bc.keysPerPersist)
for persistVelocity != 0 && uint32(bc.dao.Store.Len()) > persistVelocity*4 {
bc.persistCond.Wait()
}

_, err = aerCache.Persist()
if err != nil {
bc.lock.Unlock()
Expand Down Expand Up @@ -2148,19 +2170,15 @@ func (bc *Blockchain) LastBatch() *storage.MemBatch {
}

// persist flushes current in-memory Store contents to the persistent storage.
func (bc *Blockchain) persist(isSync bool) (time.Duration, error) {
func (bc *Blockchain) persist() (time.Duration, error) {
var (
start = time.Now()
duration time.Duration
persisted int
err error
)

if isSync {
persisted, err = bc.dao.PersistSync()
} else {
persisted, err = bc.dao.Persist()
}
persisted, err = bc.dao.Persist()
if err != nil {
return 0, err
}
Expand All @@ -2177,6 +2195,18 @@ func (bc *Blockchain) persist(isSync bool) (time.Duration, error) {
return 0, err
}
duration = time.Since(start)
if persisted > persistMinForSampling { // Low number of keys is not representative.
var (
currentVelocity = uint32(int64(persisted) * int64(persistInterval) / int64(duration))
persistVelocity = atomic.LoadUint32(&bc.keysPerPersist)
)
if persistVelocity != 0 {
currentVelocity = min(currentVelocity, 2*persistVelocity) // Normalize sudden spikes.
currentVelocity = (persistVelocity*(persistSamples-1) + currentVelocity) / persistSamples
} // Otherwise it's the first sample and we take it as is.
atomic.StoreUint32(&bc.keysPerPersist, currentVelocity)
updateEstimatedPersistVelocityMetric(currentVelocity)
}
bc.log.Info("persisted to disk",
zap.Uint32("blocks", diff),
zap.Int("keys", persisted),
Expand All @@ -2187,6 +2217,7 @@ func (bc *Blockchain) persist(isSync bool) (time.Duration, error) {
// update monitoring metrics.
updatePersistedHeightMetric(bHeight)
}
bc.persistCond.Signal()

return duration, nil
}
2 changes: 1 addition & 1 deletion pkg/core/blockchain_core_test.go
@@ -63,7 +63,7 @@ func TestAddBlock(t *testing.T) {
assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash())

// This one tests persisting blocks, so it does need to persist()
_, err = bc.persist(false)
_, err = bc.persist()
require.NoError(t, err)

key := make([]byte, 1+util.Uint256Size)
13 changes: 13 additions & 0 deletions pkg/core/prometheus.go
@@ -22,6 +22,14 @@ var (
Namespace: "neogo",
},
)
// estimatedPersistVelocity prometheus metric.
estimatedPersistVelocity = prometheus.NewGauge(
prometheus.GaugeOpts{
Help: "Estimation of persist velocity per cycle (1s by default)",
Name: "estimated_persist_velocity",
Namespace: "neogo",
},
)
// headerHeight prometheus metric.
headerHeight = prometheus.NewGauge(
prometheus.GaugeOpts{
@@ -44,6 +52,7 @@ func init() {
prometheus.MustRegister(
blockHeight,
persistedHeight,
estimatedPersistVelocity,
headerHeight,
mempoolUnsortedTx,
)
@@ -53,6 +62,10 @@ func updatePersistedHeightMetric(pHeight uint32) {
persistedHeight.Set(float64(pHeight))
}

func updateEstimatedPersistVelocityMetric(v uint32) {
estimatedPersistVelocity.Set(float64(v))
}

func updateHeaderHeightMetric(hHeight uint32) {
headerHeight.Set(float64(hHeight))
}
2 changes: 2 additions & 0 deletions pkg/core/storage/memcached_store_test.go
@@ -19,12 +19,14 @@ func TestMemCachedPutGetDelete(t *testing.T) {

s.Put(key, value)

require.Equal(t, 1, s.Len())
result, err := s.Get(key)
assert.Nil(t, err)
require.Equal(t, value, result)

s.Delete(key)

require.Equal(t, 1, s.Len()) // deletion marker
_, err = s.Get(key)
assert.NotNil(t, err)
assert.Equal(t, err, ErrKeyNotFound)
7 changes: 7 additions & 0 deletions pkg/core/storage/memory_store.go
@@ -66,6 +66,13 @@ func (s *MemoryStore) putChangeSet(puts map[string][]byte, stores map[string][]b
}
}

// Len returns the number of keys stored.
func (s *MemoryStore) Len() int {
s.mut.RLock()
defer s.mut.RUnlock()
return len(s.mem) + len(s.stor)
}

// Seek implements the Store interface.
func (s *MemoryStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
s.seek(rng, f, s.mut.RLock, s.mut.RUnlock)
