Skip to content

Commit

Permalink
Add bound options to txn.NewIterator
Browse files Browse the repository at this point in the history
  • Loading branch information
weiihann committed Jan 2, 2025
1 parent 417bb0a commit b774f25
Show file tree
Hide file tree
Showing 21 changed files with 72 additions and 43 deletions.
23 changes: 10 additions & 13 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,20 +488,16 @@ func BlockByNumber(txn db.Transaction, number uint64) (*core.Block, error) {
}

func TransactionsByBlockNumber(txn db.Transaction, number uint64) ([]core.Transaction, error) {
iterator, err := txn.NewIterator()
numBytes := core.MarshalBlockNumber(number)
prefix := db.TransactionsByBlockNumberAndIndex.Key(numBytes)

iterator, err := txn.NewIterator(prefix, true)
if err != nil {
return nil, err
}

var txs []core.Transaction
numBytes := core.MarshalBlockNumber(number)

prefix := db.TransactionsByBlockNumberAndIndex.Key(numBytes)
for iterator.Seek(prefix); iterator.Valid(); iterator.Next() {
if !bytes.HasPrefix(iterator.Key(), prefix) {
break
}

for iterator.Next(); iterator.Valid(); iterator.Next() {
val, vErr := iterator.Value()
if vErr != nil {
return nil, utils.RunAndWrapOnError(iterator.Close, vErr)
Expand All @@ -523,16 +519,17 @@ func TransactionsByBlockNumber(txn db.Transaction, number uint64) ([]core.Transa
}

func receiptsByBlockNumber(txn db.Transaction, number uint64) ([]*core.TransactionReceipt, error) {
iterator, err := txn.NewIterator()
numBytes := core.MarshalBlockNumber(number)
prefix := db.ReceiptsByBlockNumberAndIndex.Key(numBytes)

iterator, err := txn.NewIterator(prefix, true)
if err != nil {
return nil, err
}

var receipts []*core.TransactionReceipt
numBytes := core.MarshalBlockNumber(number)

prefix := db.ReceiptsByBlockNumberAndIndex.Key(numBytes)
for iterator.Seek(prefix); iterator.Valid(); iterator.Next() {
for iterator.First(); iterator.Valid(); iterator.Next() {
if !bytes.HasPrefix(iterator.Key(), prefix) {
break
}
Expand Down
2 changes: 1 addition & 1 deletion blockchain/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ func TestRevert(t *testing.T) {

t.Run("empty blockchain should mean empty db", func(t *testing.T) {
require.NoError(t, testdb.View(func(txn db.Transaction) error {
it, err := txn.NewIterator()
it, err := txn.NewIterator(nil, false)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion core/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (h *history) deleteLog(key []byte, height uint64) error {
}

func (h *history) valueAt(key []byte, height uint64) ([]byte, error) {
it, err := h.txn.NewIterator()
it, err := h.txn.NewIterator(nil, false)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion core/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func TestRevert(t *testing.T) {

t.Run("empty state should mean empty db", func(t *testing.T) {
require.NoError(t, testDB.View(func(txn db.Transaction) error {
it, err := txn.NewIterator()
it, err := txn.NewIterator(nil, false)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion db/buffered_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,6 @@ func (t *BufferedTransaction) Impl() any {
}

// NewIterator : see db.Transaction.NewIterator
func (t *BufferedTransaction) NewIterator() (Iterator, error) {
func (t *BufferedTransaction) NewIterator(_ []byte, _ bool) (Iterator, error) {

Check warning on line 82 in db/buffered_transaction.go

View check run for this annotation

Codecov / codecov/patch

db/buffered_transaction.go#L82

Added line #L82 was not covered by tests
return nil, errors.New("buffered transactions dont support iterators")
}
5 changes: 4 additions & 1 deletion db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type Iterator interface {
// Valid returns true if the iterator is positioned at a valid key/value pair.
Valid() bool

// First moves the iterator to the first key/value pair.
First() bool

// Next moves the iterator to the next key/value pair. It returns whether the
// iterator is valid after the call. Once invalid, the iterator remains
// invalid.
Expand All @@ -63,7 +66,7 @@ type Iterator interface {
// the transaction is committed.
type Transaction interface {
// NewIterator returns an iterator over the database's key/value pairs.
NewIterator() (Iterator, error)
NewIterator(lowerBound []byte, withUpperBound bool) (Iterator, error)
// Discard discards all the changes done to the database with this transaction
Discard() error
// Commit flushes all the changes pending on this transaction to the database, making the changes visible to other
Expand Down
2 changes: 1 addition & 1 deletion db/memory_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func NewMemTransaction() Transaction {
return &memTransaction{storage: make(map[string][]byte)}
}

func (t *memTransaction) NewIterator() (Iterator, error) {
func (t *memTransaction) NewIterator(_ []byte, _ bool) (Iterator, error) {

Check warning on line 17 in db/memory_transaction.go

View check run for this annotation

Codecov / codecov/patch

db/memory_transaction.go#L17

Added line #L17 was not covered by tests
return nil, errors.New("not implemented")
}

Expand Down
9 changes: 7 additions & 2 deletions db/pebble/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,20 @@ func (b *batch) Get(key []byte, cb func([]byte) error) error {
}

// NewIterator : see db.Transaction.NewIterator
func (b *batch) NewIterator() (db.Iterator, error) {
func (b *batch) NewIterator(lowerBound []byte, withUpperBound bool) (db.Iterator, error) {
var iter *pebble.Iterator
var err error

if b.batch == nil {
return nil, ErrDiscardedTransaction
}

iter, err = b.batch.NewIter(nil)
iterOpt := &pebble.IterOptions{LowerBound: lowerBound}
if withUpperBound {
iterOpt.UpperBound = upperBound(lowerBound)
}

iter, err = b.batch.NewIter(iterOpt)
if err != nil {
return nil, err
}
Expand Down
8 changes: 8 additions & 0 deletions db/pebble/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,14 @@ func CalculatePrefixSize(ctx context.Context, pDB *DB, prefix []byte, withUpperB
return item, utils.RunAndWrapOnError(it.Close, err)
}

// Calculates the next possible prefix after the given prefix bytes.
// It's used to establish an upper boundary for prefix-based database scans.
// Examples:
//
// [1] -> [2]
// [1, 255, 255] -> [2]
// [1, 2, 255] -> [1, 3]
// [255, 255] -> nil
func upperBound(prefix []byte) []byte {
var ub []byte

Expand Down
10 changes: 5 additions & 5 deletions db/pebble/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func TestSeek(t *testing.T) {
require.NoError(t, txn.Set([]byte{3}, []byte{3}))

t.Run("seeks to the next key in lexicographical order", func(t *testing.T) {
iter, err := txn.NewIterator()
iter, err := txn.NewIterator(nil, false)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, iter.Close())
Expand All @@ -275,7 +275,7 @@ func TestSeek(t *testing.T) {
})

t.Run("key returns nil when seeking nonexistent data", func(t *testing.T) {
iter, err := txn.NewIterator()
iter, err := txn.NewIterator(nil, false)
require.NoError(t, err)

t.Cleanup(func() {
Expand Down Expand Up @@ -314,7 +314,7 @@ func TestPrefixSearch(t *testing.T) {
}))

require.NoError(t, testDB.View(func(txn db.Transaction) error {
iter, err := txn.NewIterator()
iter, err := txn.NewIterator(nil, false)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, iter.Close())
Expand Down Expand Up @@ -359,7 +359,7 @@ func TestNext(t *testing.T) {
require.NoError(t, txn.Set([]byte{2}, []byte{2}))

t.Run("Next() on new iterator", func(t *testing.T) {
it, err := txn.NewIterator()
it, err := txn.NewIterator(nil, false)
require.NoError(t, err)

t.Run("new iterator should be invalid", func(t *testing.T) {
Expand All @@ -374,7 +374,7 @@ func TestNext(t *testing.T) {
})

t.Run("Next() should work as expected after a Seek()", func(t *testing.T) {
it, err := txn.NewIterator()
it, err := txn.NewIterator(nil, false)
require.NoError(t, err)

require.True(t, it.Seek([]byte{0}))
Expand Down
5 changes: 5 additions & 0 deletions db/pebble/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ func (i *iterator) Value() ([]byte, error) {
return buf, nil
}

func (i *iterator) First() bool {
i.positioned = true
return i.iter.First()
}

// Next : see db.Transaction.Iterator.Next
func (i *iterator) Next() bool {
if !i.positioned {
Expand Down
9 changes: 7 additions & 2 deletions db/pebble/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,20 @@ func (s *snapshot) Get(key []byte, cb func([]byte) error) error {
}

// NewIterator : see db.Transaction.NewIterator
func (s *snapshot) NewIterator() (db.Iterator, error) {
func (s *snapshot) NewIterator(lowerBound []byte, withUpperBound bool) (db.Iterator, error) {
var iter *pebble.Iterator
var err error

if s.snapshot == nil {
return nil, ErrDiscardedTransaction
}

iter, err = s.snapshot.NewIter(nil)
iterOpt := &pebble.IterOptions{LowerBound: lowerBound}
if withUpperBound {
iterOpt.UpperBound = upperBound(lowerBound)
}

iter, err = s.snapshot.NewIter(iterOpt)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions db/remote/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestRemote(t *testing.T) {

t.Run("iterate", func(t *testing.T) {
err := remoteDB.View(func(txn db.Transaction) error {
it, err := txn.NewIterator()
it, err := txn.NewIterator(nil, false)
if err != nil {
return err
}
Expand All @@ -85,7 +85,7 @@ func TestRemote(t *testing.T) {

t.Run("seek", func(t *testing.T) {
err := remoteDB.View(func(txn db.Transaction) error {
it, err := txn.NewIterator()
it, err := txn.NewIterator(nil, false)
if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions db/remote/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ func (i *iterator) Value() ([]byte, error) {
return i.currentV, nil
}

func (i *iterator) First() bool {
if err := i.doOpAndUpdate(gen.Op_FIRST, nil); err != nil {
i.log.Debugw("Error", "op", gen.Op_FIRST, "err", err)
}
return len(i.currentK) > 0 || len(i.currentV) > 0

Check warning on line 59 in db/remote/iterator.go

View check run for this annotation

Codecov / codecov/patch

db/remote/iterator.go#L55-L59

Added lines #L55 - L59 were not covered by tests
}

func (i *iterator) Next() bool {
if err := i.doOpAndUpdate(gen.Op_NEXT, nil); err != nil {
i.log.Debugw("Error", "op", gen.Op_NEXT, "err", err)
Expand Down
2 changes: 1 addition & 1 deletion db/remote/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type transaction struct {
log utils.SimpleLogger
}

func (t *transaction) NewIterator() (db.Iterator, error) {
func (t *transaction) NewIterator(_ []byte, _ bool) (db.Iterator, error) {
err := t.client.Send(&gen.Cursor{
Op: gen.Op_OPEN,
})
Expand Down
2 changes: 1 addition & 1 deletion db/sync_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,6 @@ func (t *SyncTransaction) Impl() any {
}

// NewIterator : see db.Transaction.NewIterator
func (t *SyncTransaction) NewIterator() (Iterator, error) {
func (t *SyncTransaction) NewIterator(_ []byte, _ bool) (Iterator, error) {

Check warning on line 64 in db/sync_transaction.go

View check run for this annotation

Codecov / codecov/patch

db/sync_transaction.go#L64

Added line #L64 was not covered by tests
return nil, errors.New("sync transactions dont support iterators")
}
2 changes: 1 addition & 1 deletion grpc/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func newTx(dbTx db.Transaction) *tx {
}

func (t *tx) newCursor() (uint32, error) {
it, err := t.dbTx.NewIterator()
it, err := t.dbTx.NewIterator(nil, false)
if err != nil {
return 0, err
}
Expand Down
2 changes: 1 addition & 1 deletion migration/bucket_migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (m *BucketMigrator) Before(_ []byte) error {

func (m *BucketMigrator) Migrate(ctx context.Context, txn db.Transaction, network *utils.Network, log utils.SimpleLogger) ([]byte, error) {
remainingInBatch := m.batchSize
iterator, err := txn.NewIterator()
iterator, err := txn.NewIterator(nil, false)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func updateSchemaMetadata(txn db.Transaction, schema schemaMetadata) error {

// migration0000 makes sure the targetDB is empty
func migration0000(txn db.Transaction, _ *utils.Network) error {
it, err := txn.NewIterator()
it, err := txn.NewIterator(nil, false)
if err != nil {
return err
}
Expand All @@ -202,7 +202,7 @@ func migration0000(txn db.Transaction, _ *utils.Network) error {
//
// This enables us to remove the db.ContractRootKey prefix.
func relocateContractStorageRootKeys(txn db.Transaction, _ *utils.Network) error {
it, err := txn.NewIterator()
it, err := txn.NewIterator(nil, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -429,7 +429,7 @@ func (m *changeTrieNodeEncoding) Migrate(_ context.Context, txn db.Transaction,
return nil
}

iterator, err := txn.NewIterator()
iterator, err := txn.NewIterator(nil, false)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions migration/migration_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ func TestChangeStateDiffStructEmptyDB(t *testing.T) {
require.Nil(t, intermediateState)

// DB is still empty.
iter, err := txn.NewIterator()
iter, err := txn.NewIterator(nil, false)
defer func() {
require.NoError(t, iter.Close())
}()
Expand Down Expand Up @@ -654,7 +654,7 @@ func TestChangeStateDiffStruct(t *testing.T) {
// - Both state diffs have been updated.
// - There are no extraneous entries in the DB.
require.NoError(t, testdb.View(func(txn db.Transaction) error {
iter, err := txn.NewIterator()
iter, err := txn.NewIterator(nil, false)
require.NoError(t, err)
defer func() {
require.NoError(t, iter.Close())
Expand Down
7 changes: 3 additions & 4 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,15 +353,14 @@ func loadPeers(database db.DB) ([]peer.AddrInfo, error) {
var peers []peer.AddrInfo

err := database.View(func(txn db.Transaction) error {
it, err := txn.NewIterator()
it, err := txn.NewIterator(db.Peer.Key(), true)
if err != nil {
return fmt.Errorf("create iterator: %w", err)
}
defer it.Close()

prefix := db.Peer.Key()
for it.Seek(prefix); it.Valid(); it.Next() {
peerIDBytes := it.Key()[len(prefix):]
for it.First(); it.Valid(); it.Next() {
peerIDBytes := it.Key()
peerID, err := peer.IDFromBytes(peerIDBytes)
if err != nil {
return fmt.Errorf("decode peer ID: %w", err)
Expand Down

0 comments on commit b774f25

Please sign in to comment.