async checkpoint poc used in devmos 2024 benchmarks
kocubinski committed Oct 29, 2024
1 parent 604c54a commit 97f3b34
Showing 11 changed files with 444 additions and 259 deletions.
1 change: 1 addition & 0 deletions metrics/metrics.go
```diff
@@ -30,6 +30,7 @@ type TreeMetrics struct {
 	TreeUpdate  int64
 	TreeNewNode int64
 	TreeDelete  int64
+	TreeHash    int64
 }
 
 type DbMetrics struct {
```
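The only change here is a new TreeHash counter. A minimal sketch of the intended bookkeeping, assuming the counter is incremented once per node-hash computation (the call sites are outside this diff):

```go
package main

import "fmt"

// TreeMetrics mirrors metrics/metrics.go after this change; only the
// fields visible in the hunk are included.
type TreeMetrics struct {
	TreeUpdate  int64
	TreeNewNode int64
	TreeDelete  int64
	TreeHash    int64 // new in this commit
}

func main() {
	m := &TreeMetrics{}
	// Hypothetical hashing pass: one increment per node hashed.
	for i := 0; i < 3; i++ {
		m.TreeHash++
	}
	fmt.Printf("tree_hash=%d\n", m.TreeHash) // tree_hash=3
}
```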
9 changes: 4 additions & 5 deletions multitree.go
```diff
@@ -8,7 +8,6 @@ import (
 	"sync/atomic"
 
 	"github.com/cosmos/iavl/v2/metrics"
-	"github.com/dustin/go-humanize"
 	"golang.org/x/exp/slices"
 )
@@ -186,10 +185,10 @@ func (mt *MultiTree) SaveVersionConcurrently() ([]byte, int64, error) {
 	mt.shouldCheckpoint = false
 
 	if mt.treeOpts.MetricsProxy != nil {
-		bz := workingBytes.Load()
-		sz := workingSize.Load()
-		fmt.Printf("version=%d work-bytes=%s work-size=%s mem-ceiling=%s\n",
-			version, humanize.IBytes(bz), humanize.Comma(sz), humanize.IBytes(mt.treeOpts.CheckpointMemory))
+		//bz := workingBytes.Load()
+		//sz := workingSize.Load()
+		//fmt.Printf("version=%d work-bytes=%s work-size=%s mem-ceiling=%s\n",
+		//	version, humanize.IBytes(bz), humanize.Comma(sz), humanize.IBytes(mt.treeOpts.CheckpointMemory))
 		mt.treeOpts.MetricsProxy.SetGauge(float32(workingBytes.Load()), "iavl_v2", "working_bytes")
 		mt.treeOpts.MetricsProxy.SetGauge(float32(workingSize.Load()), "iavl_v2", "working_size")
 	}
```
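The humanize-formatted Printf is commented out in favor of two MetricsProxy gauges. The proxy's interface is not shown in this diff; a stub matching the call shape used above, SetGauge(value float32, keys ...string), for illustration only:

```go
package main

import "fmt"

// MetricsProxy is assumed from the call sites above; the real
// definition lives in the metrics package.
type MetricsProxy interface {
	SetGauge(val float32, keys ...string)
}

// printProxy is a toy implementation that just logs gauge updates.
type printProxy struct{}

func (printProxy) SetGauge(val float32, keys ...string) {
	fmt.Println(keys, "=", val)
}

func main() {
	var mp MetricsProxy = printProxy{}
	// Mirrors the two gauges set in SaveVersionConcurrently.
	mp.SetGauge(float32(1<<30), "iavl_v2", "working_bytes")
	mp.SetGauge(float32(250_000), "iavl_v2", "working_size")
}
```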
18 changes: 16 additions & 2 deletions node.go
```diff
@@ -65,8 +65,8 @@ type Node struct {
 }
 
 func (node *Node) String() string {
-	return fmt.Sprintf("Node{hash: %x, nodeKey: %s, leftNodeKey: %v, rightNodeKey: %v, size: %d, subtreeHeight: %d, poolId: %d}",
-		node.hash, node.nodeKey, node.leftNodeKey, node.rightNodeKey, node.size, node.subtreeHeight, node.poolId)
+	return fmt.Sprintf("Node{hash: %x, nodeKey: %s, leftNodeKey: %v, rightNodeKey: %v, size: %d, subtreeHeight: %d, evict: %t, dirty: %t, poolId: %d}",
+		node.hash, node.nodeKey, node.leftNodeKey, node.rightNodeKey, node.size, node.subtreeHeight, node.evict, node.dirty, node.poolId)
 }
 
 func (node *Node) isLeaf() bool {
@@ -107,6 +107,13 @@ func (node *Node) getLeftNode(t *Tree) (*Node, error) {
 	if node.leftNode != nil {
 		return node.leftNode, nil
 	}
+	// check writer cache
+	var ok bool
+	node.leftNode, ok = t.sqlWriter.cachePop(node.leftNodeKey)
+	if ok {
+		return node.leftNode, nil
+	}
+
 	var err error
 	node.leftNode, err = t.sql.getLeftNode(node)
 	if err != nil {
@@ -122,6 +129,13 @@ func (node *Node) getRightNode(t *Tree) (*Node, error) {
 	if node.rightNode != nil {
 		return node.rightNode, nil
 	}
+	// check writer cache
+	var ok bool
+	node.rightNode, ok = t.sqlWriter.cachePop(node.rightNodeKey)
+	if ok {
+		return node.rightNode, nil
+	}
+
 	var err error
 	node.rightNode, err = t.sql.getRightNode(node)
 	if err != nil {
```
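Both child lookups now consult the SQL writer's in-memory cache before falling back to the database: with checkpoints running asynchronously, a branch may still be sitting in the writer's queue when the tree needs to read it back. cachePop itself is not part of this diff; below is a plausible map-backed sketch with the pop (remove-on-hit) semantics the name suggests. The NodeKey and Node types are stand-ins.

```go
package main

import (
	"fmt"
	"sync"
)

// Stand-ins for the iavl types; only what the sketch needs.
type NodeKey [12]byte

type Node struct {
	nodeKey NodeKey
	key     []byte
}

// writerCache guesses at what backs sqlWriter.cachePop: nodes queued
// for an async checkpoint, indexed by node key, guarded by a mutex
// because the tree and the writer run on different goroutines.
type writerCache struct {
	mu    sync.Mutex
	nodes map[NodeKey]*Node
}

func (c *writerCache) cachePop(nk NodeKey) (*Node, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	n, ok := c.nodes[nk]
	if ok {
		delete(c.nodes, nk) // pop: the caller takes ownership
	}
	return n, ok
}

func main() {
	c := &writerCache{nodes: map[NodeKey]*Node{}}
	nk := NodeKey{1}
	c.nodes[nk] = &Node{nodeKey: nk, key: []byte("a")}

	if n, ok := c.cachePop(nk); ok {
		fmt.Printf("hit: %s\n", n.key)
	}
	_, ok := c.cachePop(nk)
	fmt.Println("second lookup hits:", ok) // false: entry was popped
}
```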
17 changes: 14 additions & 3 deletions pool.go
```diff
@@ -8,9 +8,7 @@ import (
 type NodePool struct {
 	syncPool *sync.Pool
 
-	free  chan int
-	nodes []Node
-
+	free   chan int
 	poolId uint64
 }
@@ -54,3 +52,16 @@ func (np *NodePool) Put(node *Node) {
 	node.poolId = 0
 	np.syncPool.Put(node)
 }
+
+func (np *NodePool) clone(n *Node) *Node {
+	node := np.Get()
+	node.leftNodeKey = n.leftNodeKey
+	node.rightNodeKey = n.rightNodeKey
+	node.nodeKey = n.nodeKey
+	node.hash = n.hash
+	node.key = n.key
+	node.value = n.value
+	node.subtreeHeight = n.subtreeHeight
+	node.size = n.size
+	return node
+}
```
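Note that clone copies only the persisted fields: the in-memory child pointers (leftNode, rightNode) and the dirty, evict, and poolId bookkeeping are not carried over from the source. That appears intentional, since the copy can be handed to the async writer as a stable snapshot while the live node keeps mutating, which is exactly what the rewritten test below exercises.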
9 changes: 5 additions & 4 deletions pool_test.go
```diff
@@ -6,11 +6,12 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-func TestNodePool_Get(t *testing.T) {
+func Test_TheLimitsOfMySanity(t *testing.T) {
 	pool := NewNodePool()
 	node := pool.Get()
 	node.key = []byte("hello")
-	require.Equal(t, node.key, pool.nodes[node.poolId].key)
-	pool.Put(node)
-	require.Equal(t, []byte(nil), pool.nodes[node.poolId].key)
+	n2 := pool.clone(node)
+	require.Equal(t, node.key, n2.key)
+	node.key = []byte("world")
+	require.NotEqual(t, node.key, n2.key)
 }
```
10 changes: 6 additions & 4 deletions sqlite.go
```diff
@@ -61,8 +61,9 @@ func defaultSqliteDbOptions(opts SqliteDbOptions) SqliteDbOptions {
 		opts.MmapSize = 8 * 1024 * 1024 * 1024
 	}
 	if opts.WalSize == 0 {
-		opts.WalSize = 1024 * 1024 * 100
+		opts.WalSize = 1024 * 1024 * 500
 	}
+	opts.ShardTrees = true
 	opts.walPages = opts.WalSize / os.Getpagesize()
 	return opts
 }
@@ -722,8 +723,8 @@ func (sql *SqliteDb) getRightNode(node *Node) (*Node, error) {
 
 	node.rightNode, err = sql.Get(node.rightNodeKey)
 	if err != nil {
-		return nil, fmt.Errorf("failed to get right node node_key=%s height=%d path=%s: %w",
-			node.rightNodeKey, node.subtreeHeight, sql.opts.Path, err)
+		return nil, fmt.Errorf("failed to get right node node_key=%s for %v at path=%s: %w",
+			node.rightNodeKey, node, sql.opts.Path, err)
 	}
 	return node.rightNode, nil
 }
@@ -743,7 +744,8 @@ func (sql *SqliteDb) getLeftNode(node *Node) (*Node, error) {
 
 	node.leftNode, err = sql.Get(node.leftNodeKey)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("failed to get left node node_key=%s for %v at path=%s: %w",
+			node.leftNodeKey, node, sql.opts.Path, err)
 	}
 	return node.leftNode, err
 }
```
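Two behavioral notes on defaultSqliteDbOptions: the default WAL threshold grows from 100 MiB to 500 MiB, and opts.ShardTrees = true is now set unconditionally, overriding any caller-supplied value. Checking the walPages arithmetic under the common 4 KiB page size (os.Getpagesize() varies by platform):

```go
package main

import "fmt"

func main() {
	const walSize = 1024 * 1024 * 500 // new default: 500 MiB
	const pageSize = 4096             // typical os.Getpagesize() result
	fmt.Println(walSize / pageSize)   // 128000 WAL pages
}
```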
31 changes: 19 additions & 12 deletions sqlite_batch.go
```diff
@@ -10,7 +10,14 @@ import (
 )
 
 type sqliteBatch struct {
-	tree *Tree
+	// used in leaves, TODO remove
+	tree *Tree
+
+	// used in branch checkpoint
+	version       int64
+	branchOrphans []NodeKey
+	branches      []*Node
+
 	sql    *SqliteDb
 	size   int64
 	logger zerolog.Logger
@@ -33,11 +40,11 @@ func (b *sqliteBatch) newChangeLogBatch() (err error) {
 	if err = b.sql.leafWrite.Begin(); err != nil {
 		return err
 	}
-	b.leafInsert, err = b.sql.leafWrite.Prepare("INSERT OR REPLACE INTO leaf (version, sequence, bytes) VALUES (?, ?, ?)")
+	b.leafInsert, err = b.sql.leafWrite.Prepare("INSERT INTO leaf (version, sequence, bytes) VALUES (?, ?, ?)")
 	if err != nil {
 		return err
 	}
-	b.deleteInsert, err = b.sql.leafWrite.Prepare("INSERT OR REPLACE INTO leaf_delete (version, sequence, key) VALUES (?, ?, ?)")
+	b.deleteInsert, err = b.sql.leafWrite.Prepare("INSERT INTO leaf_delete (version, sequence, key) VALUES (?, ?, ?)")
 	if err != nil {
 		return err
 	}
@@ -233,30 +240,29 @@ func (b *sqliteBatch) saveLeaves() (int64, error) {
 }
 
 func (b *sqliteBatch) isCheckpoint() bool {
-	return len(b.tree.branches) > 0
+	return len(b.branches) > 0
 }
 
 func (b *sqliteBatch) saveBranches() (n int64, err error) {
 	if b.isCheckpoint() {
-		tree := b.tree
 		b.treeCount = 0
 
-		shardID, err := tree.sql.nextShard(tree.version)
+		shardID, err := b.sql.nextShard(b.version)
 		if err != nil {
 			return 0, err
 		}
-		b.logger.Debug().Msgf("checkpoint db=tree version=%d shard=%d orphans=%s",
-			tree.version, shardID, humanize.Comma(int64(len(tree.branchOrphans))))
+		b.logger.Debug().Msgf("checkpoint version=%d shard=%d orphans=%s",
+			b.version, shardID, humanize.Comma(int64(len(b.branchOrphans))))
 
 		if err = b.newTreeBatch(shardID); err != nil {
 			return 0, err
 		}
 
-		for _, node := range tree.branches {
+		for _, node := range b.branches {
 			b.treeCount++
 			bz, err := node.Bytes()
 			if err != nil {
-				return 0, err
+				return 0, fmt.Errorf("failed to encode node: %v %w", node, err)
 			}
 			if err = b.treeInsert.Exec(node.nodeKey.Version(), int(node.nodeKey.Sequence()), bz); err != nil {
 				return 0, err
@@ -265,11 +271,12 @@ func (b *sqliteBatch) saveBranches() (n int64, err error) {
 				return 0, err
 			}
 			if node.evict {
-				tree.returnNode(node)
+				// TODO, remove tree reference
+				b.tree.returnNode(node)
 			}
 		}
 
-		for _, orphan := range tree.branchOrphans {
+		for _, orphan := range b.branchOrphans {
 			b.treeCount++
 			err = b.execBranchOrphan(orphan)
 			if err != nil {
```
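The thread running through this file: sqliteBatch now snapshots its own version, branches, and branchOrphans instead of reading them off the live tree (the remaining b.tree uses are marked TODO). That decoupling is what lets a checkpoint proceed on the writer while the tree advances to new versions. A sketch of the hand-off this enables; the copy step and goroutine are assumptions, not shown in this diff:

```go
package main

import (
	"fmt"
	"sync"
)

// Minimal stand-ins; the real types carry SQL statements and metrics.
type Node struct{ key []byte }
type NodeKey [12]byte

type sqliteBatch struct {
	version       int64
	branches      []*Node
	branchOrphans []NodeKey
}

// saveBranches stands in for the real writes: note it touches only
// the batch's own snapshot, never the live tree.
func (b *sqliteBatch) saveBranches() error {
	fmt.Printf("checkpoint version=%d branches=%d orphans=%d\n",
		b.version, len(b.branches), len(b.branchOrphans))
	return nil
}

func main() {
	// Hypothetical hand-off: copy the tree's pending checkpoint state
	// into the batch, then drain it on the writer's goroutine while
	// the tree moves on to the next version.
	batch := &sqliteBatch{
		version:  11,
		branches: []*Node{{key: []byte("a")}, {key: []byte("b")}},
	}
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := batch.saveBranches(); err != nil {
			fmt.Println("checkpoint failed:", err)
		}
	}()
	wg.Wait()
}
```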
(The remaining four changed files in this commit are not rendered here.)
