GC and persistence improvements #3787

Merged (7 commits) on Jan 16, 2025
9 changes: 7 additions & 2 deletions docs/node-configuration.md
@@ -18,7 +18,7 @@ node-related settings described in the table below.
| --- | --- | --- | --- |
| DBConfiguration | [DB Configuration](#DB-Configuration) | | Describes configuration for database. See the [DB Configuration](#DB-Configuration) section for details. |
| LogLevel | `string` | "info" | Minimal logged messages level (can be "debug", "info", "warn", "error", "dpanic", "panic" or "fatal"). |
| GarbageCollectionPeriod | `uint32` | 10000 | Controls MPT garbage collection interval (in blocks) for configurations with `RemoveUntraceableBlocks` enabled and `KeepOnlyLatestState` disabled. In this mode the node stores a number of MPT trees (corresponding to `MaxTraceableBlocks` and `StateSyncInterval`), but the DB needs to be clean from old entries from time to time. Doing it too often will cause too much processing overhead, doing it too rarely will leave more useless data in the DB. |
| GarbageCollectionPeriod | `uint32` | 10000 | Controls MPT garbage collection interval (in blocks) for configurations with `RemoveUntraceableBlocks` enabled and `KeepOnlyLatestState` disabled. In this mode the node stores a number of MPT trees (corresponding to `MaxTraceableBlocks` and `StateSyncInterval`), but the DB needs to be cleaned of old entries from time to time. Doing it too often causes too much processing overhead (it requires going through the whole DB, which can take minutes); doing it too rarely leaves more useless data in the DB. Always compare this value to `MaxTraceableBlocks`: values lower than 10% of it are likely too low, while values higher than 50% are likely to leave more garbage than can be collected. The default value is aligned with NeoFS networks that have low MTB values; for N3 mainnet it is too low (see the sizing sketch after this table). |
| KeepOnlyLatestState | `bool` | `false` | Specifies if MPT should only store the latest state (or a set of latest states, see `P2PStateExchangeExtensions` section in the ProtocolConfiguration for details). If true, DB size will be smaller, but older roots won't be accessible. This value should remain the same for the same database. | |
| LogPath | `string` | "", so only console logging | File path where to store node logs. |
| NeoFSBlockFetcher | [NeoFS BlockFetcher Configuration](#NeoFS-BlockFetcher-Configuration) | | NeoFS BlockFetcher module configuration. See the [NeoFS BlockFetcher Configuration](#NeoFS-BlockFetcher-Configuration) section for details. |
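
To make the 10-50% guidance above concrete, here is a minimal sketch of how a GC period could be derived from `MaxTraceableBlocks`. The helper name, the exact bounds and the example MTB values are illustrative only; nothing below is part of this PR or of neo-go itself.

```go
package main

import "fmt"

// suggestGCPeriod clamps a desired GC period to the 10%-50% window of
// MaxTraceableBlocks described above. Hypothetical helper for illustration;
// uses the Go 1.21+ min/max builtins.
func suggestGCPeriod(desired, maxTraceableBlocks uint32) uint32 {
	lo := maxTraceableBlocks / 10 // below ~10% of MTB GC runs too often
	hi := maxTraceableBlocks / 2  // above ~50% of MTB too much garbage accumulates
	return min(max(desired, lo), hi)
}

func main() {
	// N3 mainnet uses the default MaxTraceableBlocks of 2102400, so the
	// default GC period of 10000 is clamped up.
	fmt.Println(suggestGCPeriod(10000, 2102400)) // 210240
	// A hypothetical small NeoFS network with MTB=17280 gets clamped down.
	fmt.Println(suggestGCPeriod(10000, 17280)) // 8640
}
```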
@@ -100,7 +100,12 @@ DBConfiguration:
```
where:
- `Type` is the database type (string value). Supported types: `leveldb`, `boltdb` and
`inmemory` (not recommended for production usage).
`inmemory` (not recommended for production usage). LevelDB is better for archive nodes
that store all data; it handles writes better in general, but any tail-cutting node
options seriously degrade its performance. BoltDB performs much better in various
performance tests; however, GC can seriously degrade when the DB size is bigger
than the amount of available memory. BoltDB is also more memory-demanding for some
operations, so GC can be problematic from that angle as well.
- `LevelDBOptions` are settings for LevelDB. Includes the DB files path and ReadOnly mode toggle.
If ReadOnly mode is on, an error will be returned on an attempt to connect to a non-existent or empty
database. The database doesn't allow changes in this mode; a warning will be logged on DB persist attempts.
66 changes: 49 additions & 17 deletions pkg/core/blockchain.go
@@ -70,6 +70,13 @@
// either, the next cycle will still do the job (only transfers need this,
// MPT won't notice at all).
defaultBlockTimesCache = 8

// persistSamples is the number of persist velocity samples to use for
// storeBlock limit.
persistSamples = 10
// persistMinForSampling is the minimal number of keys to take persist
// time into account wrt persist velocity.
persistMinForSampling = 100
)

// stateChangeStage denotes the stage of state modification process.
@@ -165,6 +172,14 @@
// Current persisted block count.
persistedHeight uint32

// keysPerPersist is the average number of persisted keys per persist
// time limit.
keysPerPersist uint32

// persistCond is signaled each time the persist cycle ends; this wakes
// storeBlock if needed (when it has too many queued blocks).
persistCond *sync.Cond

// Index->Timestamp cache for garbage collector. Headers can be gone
// by the time it runs, so we use a tiny little cache to sync block
// removal (performed in storeBlock()) with transfer/MPT GC (tryRunGC())
@@ -338,6 +353,7 @@
contracts: *native.NewContracts(cfg.ProtocolConfiguration),
}

bc.persistCond = sync.NewCond(&bc.lock)
bc.gcBlockTimes, _ = lru.New[uint32, uint64](defaultBlockTimesCache) // Never errors for positive size
bc.stateRoot = stateroot.NewModule(cfg, bc.VerifyWitness, bc.log, bc.dao.Store)
bc.contracts.Designate.StateRootService = bc.stateRoot
@@ -1102,7 +1118,7 @@
persistTimer := time.NewTimer(persistInterval)
defer func() {
persistTimer.Stop()
if _, err := bc.persist(true); err != nil {
if _, err := bc.persist(); err != nil {
bc.log.Warn("failed to persist", zap.Error(err))
}
if err := bc.dao.Store.Close(); err != nil {
@@ -1112,27 +1128,24 @@
close(bc.runToExitCh)
}()
go bc.notificationDispatcher()
var nextSync bool
for {
select {
case <-bc.stopCh:
return
case <-persistTimer.C:
var oldPersisted uint32
var gcDur time.Duration

if bc.config.Ledger.RemoveUntraceableBlocks {
oldPersisted = atomic.LoadUint32(&bc.persistedHeight)
}
dur, err := bc.persist(nextSync)
dur, err := bc.persist()
if err != nil {
bc.log.Warn("failed to persist blockchain", zap.Error(err))
bc.log.Error("failed to persist blockchain", zap.Error(err))

}
if bc.config.Ledger.RemoveUntraceableBlocks {
gcDur = bc.tryRunGC(oldPersisted)
dur += bc.tryRunGC(oldPersisted)
}
nextSync = dur > persistInterval*2
interval := persistInterval - dur - gcDur
interval := persistInterval - dur
interval = max(interval, time.Microsecond) // Reset doesn't work with zero or negative value.
persistTimer.Reset(interval)
}
@@ -1159,8 +1172,8 @@
oldHeight /= bc.config.Ledger.GarbageCollectionPeriod
newHeight /= bc.config.Ledger.GarbageCollectionPeriod
if tgtBlock > int64(bc.config.Ledger.GarbageCollectionPeriod) && newHeight != oldHeight {
dur = bc.stateRoot.GC(uint32(tgtBlock), bc.store)
dur += bc.removeOldTransfers(uint32(tgtBlock))
dur = bc.removeOldTransfers(uint32(tgtBlock))
dur += bc.stateRoot.GC(uint32(tgtBlock), bc.store)
}
return dur
}
@@ -1644,7 +1657,7 @@
}
for index := start; index < stop; index++ {
ts, err := kvcache.DeleteBlock(bc.GetHeaderHash(index), bc.config.Ledger.RemoveUntraceableHeaders)
if bc.config.Ledger.RemoveUntraceableHeaders && index%bc.config.Ledger.GarbageCollectionPeriod == 0 {
if index%bc.config.Ledger.GarbageCollectionPeriod == 0 {
_ = bc.gcBlockTimes.Add(index, ts)
}
if err != nil {
@@ -1798,6 +1811,14 @@
}

bc.lock.Lock()
// Wait for a while if we're lagging behind the persistence routine;
// it's too easy to OOM otherwise. Keep in mind that this check can't
// be perfect, so some tolerance (accepting more) is OK to have.
var persistVelocity = atomic.LoadUint32(&bc.keysPerPersist)
for persistVelocity != 0 && uint32(bc.dao.Store.Len()) > persistVelocity*4 {
bc.persistCond.Wait()
}

_, err = aerCache.Persist()
if err != nil {
bc.lock.Unlock()
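
The wait loop added to storeBlock above implements backpressure with a condition variable: block processing pauses once the in-memory store holds more than roughly four persist cycles' worth of keys, and the persist routine wakes it again via persistCond.Signal(). Below is a minimal, self-contained sketch of that pattern; the type and its fields are hypothetical stand-ins that only mirror the idea, not neo-go's actual structures.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// backlog is a toy stand-in for the in-memory DAO store: producers queue keys,
// a background persister drains them and reports its velocity.
type backlog struct {
	lock           sync.Mutex
	cond           *sync.Cond
	queued         uint32
	keysPerPersist atomic.Uint32 // estimated keys persisted per cycle (0 = no estimate yet)
}

// produce mirrors the check added to storeBlock: wait while the backlog is
// more than 4 persist cycles ahead of the persister.
func (b *backlog) produce(keys uint32) {
	b.lock.Lock()
	defer b.lock.Unlock()
	for {
		v := b.keysPerPersist.Load()
		if v == 0 || b.queued <= v*4 {
			break
		}
		b.cond.Wait() // lock is released here and reacquired after Signal()
	}
	b.queued += keys
}

// persistLoop mirrors the persist routine: drain the backlog, update the
// velocity estimate and wake a waiting producer.
func (b *backlog) persistLoop() {
	for range time.Tick(50 * time.Millisecond) {
		b.lock.Lock()
		if b.queued != 0 {
			b.keysPerPersist.Store(b.queued) // no smoothing in this sketch
		}
		b.queued = 0
		b.cond.Signal() // like bc.persistCond.Signal() at the end of persist()
		b.lock.Unlock()
	}
}

func main() {
	b := &backlog{}
	b.cond = sync.NewCond(&b.lock)
	go b.persistLoop()
	for i := 0; i < 100; i++ {
		b.produce(1000)
	}
	fmt.Println("finished without unbounded backlog growth")
}
```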
@@ -2149,19 +2170,15 @@
}

// persist flushes current in-memory Store contents to the persistent storage.
func (bc *Blockchain) persist(isSync bool) (time.Duration, error) {
func (bc *Blockchain) persist() (time.Duration, error) {
var (
start = time.Now()
duration time.Duration
persisted int
err error
)

if isSync {
persisted, err = bc.dao.PersistSync()
} else {
persisted, err = bc.dao.Persist()
}
persisted, err = bc.dao.Persist()
if err != nil {
return 0, err
}
@@ -2178,6 +2195,20 @@
return 0, err
}
duration = time.Since(start)
// Low number of keys is not representative and duration _can_
// be zero in tests on strange platforms like Windows.
if duration > 0 && persisted > persistMinForSampling {
var (
currentVelocity = uint32(int64(persisted) * int64(persistInterval) / int64(duration))
persistVelocity = atomic.LoadUint32(&bc.keysPerPersist)
)
if persistVelocity != 0 {
currentVelocity = min(currentVelocity, 2*persistVelocity) // Normalize sudden spikes.
currentVelocity = (persistVelocity*(persistSamples-1) + currentVelocity) / persistSamples
} // Otherwise it's the first sample and we take it as is.
atomic.StoreUint32(&bc.keysPerPersist, currentVelocity)
updateEstimatedPersistVelocityMetric(currentVelocity)
}
bc.log.Info("persisted to disk",
zap.Uint32("blocks", diff),
zap.Int("keys", persisted),
@@ -2188,6 +2219,7 @@
// update monitoring metrics.
updatePersistedHeightMetric(bHeight)
}
bc.persistCond.Signal()

return duration, nil
}
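
The keysPerPersist update above is a moving average over persistSamples cycles with sudden spikes capped at twice the previous estimate. Here is a small standalone sketch of the same arithmetic; the smoothing constants are copied from the PR, while the per-interval normalization (persisted * persistInterval / duration) is omitted for brevity.

```go
package main

import "fmt"

// persistSamples matches the constant added in this PR.
const persistSamples = 10

// updateVelocity reproduces the smoothing from (*Blockchain).persist:
// prev is the running estimate (0 means no estimate yet), current is the
// number of keys persisted this cycle, already normalized to persistInterval.
func updateVelocity(prev, current uint32) uint32 {
	if prev == 0 {
		return current // the first sample is taken as is
	}
	current = min(current, 2*prev) // normalize sudden spikes
	return (prev*(persistSamples-1) + current) / persistSamples
}

func main() {
	v := uint32(0)
	for _, sample := range []uint32{1000, 1200, 800, 5000, 900} {
		v = updateVelocity(v, sample)
		fmt.Print(v, " ")
	}
	fmt.Println()
	// Prints: 1000 1020 998 1097 1077 - the 5000 spike is capped and barely moves the estimate.
}
```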
2 changes: 1 addition & 1 deletion pkg/core/blockchain_core_test.go
@@ -63,7 +63,7 @@ func TestAddBlock(t *testing.T) {
assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash())

// This one tests persisting blocks, so it does need to persist()
_, err = bc.persist(false)
_, err = bc.persist()
require.NoError(t, err)

key := make([]byte, 1+util.Uint256Size)
13 changes: 13 additions & 0 deletions pkg/core/prometheus.go
@@ -22,6 +22,14 @@ var (
Namespace: "neogo",
},
)
// estimatedPersistVelocity prometheus metric.
estimatedPersistVelocity = prometheus.NewGauge(
prometheus.GaugeOpts{
Help: "Estimation of persist velocity per cycle (1s by default)",
Name: "estimated_persist_velocity",
Namespace: "neogo",
},
)
// headerHeight prometheus metric.
headerHeight = prometheus.NewGauge(
prometheus.GaugeOpts{
@@ -44,6 +52,7 @@ func init() {
prometheus.MustRegister(
blockHeight,
persistedHeight,
estimatedPersistVelocity,
headerHeight,
mempoolUnsortedTx,
)
@@ -53,6 +62,10 @@ func updatePersistedHeightMetric(pHeight uint32) {
persistedHeight.Set(float64(pHeight))
}

func updateEstimatedPersistVelocityMetric(v uint32) {
estimatedPersistVelocity.Set(float64(v))
}

func updateHeaderHeightMetric(hHeight uint32) {
headerHeight.Set(float64(hHeight))
}
2 changes: 2 additions & 0 deletions pkg/core/storage/memcached_store_test.go
@@ -19,12 +19,14 @@ func TestMemCachedPutGetDelete(t *testing.T) {

s.Put(key, value)

require.Equal(t, 1, s.Len())
result, err := s.Get(key)
assert.Nil(t, err)
require.Equal(t, value, result)

s.Delete(key)

require.Equal(t, 1, s.Len()) // deletion marker
_, err = s.Get(key)
assert.NotNil(t, err)
assert.Equal(t, err, ErrKeyNotFound)
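
The `1` expected after `Delete` follows from how the cached store records deletions: the key is kept with a marker instead of being removed, and `Len` (added below in `memory_store.go`) counts markers too until they are persisted. A rough standalone model of that behaviour, assuming the marker is represented as a nil value in the cache map:

```go
package main

import "fmt"

// cache is a rough stand-in for the in-memory cache layer: deletions are
// recorded as nil-valued entries (markers) rather than removed outright.
type cache map[string][]byte

func (c cache) Put(k string, v []byte) { c[k] = v }
func (c cache) Delete(k string)        { c[k] = nil } // keep a deletion marker
func (c cache) Len() int               { return len(c) }

func main() {
	c := cache{}
	c.Put("key", []byte("value"))
	fmt.Println(c.Len()) // 1

	c.Delete("key")
	fmt.Println(c.Len()) // still 1: the marker is counted until persisted
}
```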
7 changes: 7 additions & 0 deletions pkg/core/storage/memory_store.go
@@ -66,6 +66,13 @@ func (s *MemoryStore) putChangeSet(puts map[string][]byte, stores map[string][]b
}
}

// Len returns the number of keys stored.
func (s *MemoryStore) Len() int {
s.mut.RLock()
defer s.mut.RUnlock()
return len(s.mem) + len(s.stor)
}

// Seek implements the Store interface.
func (s *MemoryStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
s.seek(rng, f, s.mut.RLock, s.mut.RUnlock)