diff --git a/executor/extension/primer/primer.go b/executor/extension/primer/primer.go index 415f64c4c..5329465f4 100644 --- a/executor/extension/primer/primer.go +++ b/executor/extension/primer/primer.go @@ -168,7 +168,10 @@ func (p *stateDbPrimer[T]) prime(stateDb state.StateDB) error { return fmt.Errorf("cannot generate update-set; %w", err) } if hasPrimed { - p.ctx.SuicideAccounts(stateDb, deletedAccounts) + err = p.ctx.SuicideAccounts(stateDb, deletedAccounts) + if err != nil { + return err + } } if err = p.ctx.PrimeStateDB(substatecontext.NewWorldState(update), stateDb); err != nil { return fmt.Errorf("cannot prime state-db; %w", err) diff --git a/executor/transaction_processor.go b/executor/transaction_processor.go index 025a89a21..575ae602a 100644 --- a/executor/transaction_processor.go +++ b/executor/transaction_processor.go @@ -16,6 +16,8 @@ package executor +//go:generate mockgen -source transaction_processor.go -destination transaction_processor_mocks.go -package executor + import ( "errors" "fmt" @@ -36,11 +38,11 @@ import ( // MakeLiveDbTxProcessor creates a executor.Processor which processes transaction into LIVE StateDb. func MakeLiveDbTxProcessor(cfg *utils.Config) *LiveDbTxProcessor { - return &LiveDbTxProcessor{MakeTxProcessor(cfg)} + return &LiveDbTxProcessor{makeTxProcessor(cfg)} } type LiveDbTxProcessor struct { - *TxProcessor + *txProcessor } // Process transaction inside state into given LIVE StateDb @@ -62,11 +64,11 @@ func (p *LiveDbTxProcessor) Process(state State[txcontext.TxContext], ctx *Conte // MakeArchiveDbTxProcessor creates a executor.Processor which processes transaction into ARCHIVE StateDb. func MakeArchiveDbTxProcessor(cfg *utils.Config) *ArchiveDbTxProcessor { - return &ArchiveDbTxProcessor{MakeTxProcessor(cfg)} + return &ArchiveDbTxProcessor{makeTxProcessor(cfg)} } type ArchiveDbTxProcessor struct { - *TxProcessor + *txProcessor } // Process transaction inside state into given ARCHIVE StateDb @@ -86,7 +88,11 @@ func (p *ArchiveDbTxProcessor) Process(state State[txcontext.TxContext], ctx *Co return err } -type TxProcessor struct { +type TxProcessor interface { + ProcessTransaction(db state.VmStateDB, block int, tx int, st txcontext.TxContext) (txcontext.Result, error) +} + +type txProcessor struct { cfg *utils.Config numErrors *atomic.Int32 // transactions can be processed in parallel, so this needs to be thread safe vmCfg vm.Config @@ -94,7 +100,11 @@ type TxProcessor struct { log logger.Logger } -func MakeTxProcessor(cfg *utils.Config) *TxProcessor { +func MakeTxProcessor(cfg *utils.Config) TxProcessor { + return makeTxProcessor(cfg) +} + +func makeTxProcessor(cfg *utils.Config) *txProcessor { var vmCfg vm.Config switch cfg.ChainID { case utils.EthereumChainID: @@ -111,16 +121,16 @@ func MakeTxProcessor(cfg *utils.Config) *TxProcessor { vmCfg.Tracer = nil vmCfg.Debug = false - return &TxProcessor{ + return &txProcessor{ cfg: cfg, numErrors: new(atomic.Int32), vmCfg: vmCfg, chainCfg: utils.GetChainConfig(cfg.ChainID), - log: logger.NewLogger(cfg.LogLevel, "TxProcessor"), + log: logger.NewLogger(cfg.LogLevel, "txProcessor"), } } -func (s *TxProcessor) isErrFatal() bool { +func (s *txProcessor) isErrFatal() bool { if !s.cfg.ContinueOnFailure { return true } @@ -138,7 +148,7 @@ func (s *TxProcessor) isErrFatal() bool { return true } -func (s *TxProcessor) ProcessTransaction(db state.VmStateDB, block int, tx int, st txcontext.TxContext) (txcontext.Result, error) { +func (s *txProcessor) ProcessTransaction(db state.VmStateDB, block int, tx int, st 
txcontext.TxContext) (txcontext.Result, error) { if tx >= utils.PseudoTx { return s.processPseudoTx(st.GetOutputState(), db), nil } @@ -146,7 +156,7 @@ func (s *TxProcessor) ProcessTransaction(db state.VmStateDB, block int, tx int, } // processRegularTx executes VM on a chosen storage system. -func (s *TxProcessor) processRegularTx(db state.VmStateDB, block int, tx int, st txcontext.TxContext) (res transactionResult, finalError error) { +func (s *txProcessor) processRegularTx(db state.VmStateDB, block int, tx int, st txcontext.TxContext) (res transactionResult, finalError error) { var ( gasPool = new(evmcore.GasPool) txHash = common.HexToHash(fmt.Sprintf("0x%016d%016d", block, tx)) @@ -190,7 +200,7 @@ func (s *TxProcessor) processRegularTx(db state.VmStateDB, block int, tx int, st // processPseudoTx processes pseudo transactions in Lachesis by applying the change in db state. // The pseudo transactions includes Lachesis SFC, lachesis genesis and lachesis-opera transition. -func (s *TxProcessor) processPseudoTx(ws txcontext.WorldState, db state.VmStateDB) txcontext.Result { +func (s *txProcessor) processPseudoTx(ws txcontext.WorldState, db state.VmStateDB) txcontext.Result { ws.ForEachAccount(func(addr common.Address, acc txcontext.Account) { db.SubBalance(addr, db.GetBalance(addr)) db.AddBalance(addr, acc.GetBalance()) diff --git a/executor/transaction_processor_mocks.go b/executor/transaction_processor_mocks.go new file mode 100644 index 000000000..0c994bfad --- /dev/null +++ b/executor/transaction_processor_mocks.go @@ -0,0 +1,56 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: transaction_processor.go +// +// Generated by this command: +// +// mockgen -source transaction_processor.go -destination transaction_processor_mocks.go -package executor +// + +// Package executor is a generated GoMock package. +package executor + +import ( + reflect "reflect" + + state "github.com/Fantom-foundation/Aida/state" + txcontext "github.com/Fantom-foundation/Aida/txcontext" + gomock "go.uber.org/mock/gomock" +) + +// MockTxProcessor is a mock of TxProcessor interface. +type MockTxProcessor struct { + ctrl *gomock.Controller + recorder *MockTxProcessorMockRecorder +} + +// MockTxProcessorMockRecorder is the mock recorder for MockTxProcessor. +type MockTxProcessorMockRecorder struct { + mock *MockTxProcessor +} + +// NewMockTxProcessor creates a new mock instance. +func NewMockTxProcessor(ctrl *gomock.Controller) *MockTxProcessor { + mock := &MockTxProcessor{ctrl: ctrl} + mock.recorder = &MockTxProcessorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTxProcessor) EXPECT() *MockTxProcessorMockRecorder { + return m.recorder +} + +// ProcessTransaction mocks base method. +func (m *MockTxProcessor) ProcessTransaction(db state.VmStateDB, block, tx int, st txcontext.TxContext) (txcontext.Result, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ProcessTransaction", db, block, tx, st) + ret0, _ := ret[0].(txcontext.Result) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ProcessTransaction indicates an expected call of ProcessTransaction. 
+func (mr *MockTxProcessorMockRecorder) ProcessTransaction(db, block, tx, st any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessTransaction", reflect.TypeOf((*MockTxProcessor)(nil).ProcessTransaction), db, block, tx, st) +} diff --git a/state/proxy/deletion.go b/state/proxy/deletion.go index e6827ba99..2fd6fa9ae 100644 --- a/state/proxy/deletion.go +++ b/state/proxy/deletion.go @@ -40,11 +40,11 @@ type DeletionProxy struct { } // NewDeletionProxy creates a new StateDB proxy. -func NewDeletionProxy(db state.StateDB, ch chan ContractLiveliness, logLevel string) *DeletionProxy { +func NewDeletionProxy(db state.StateDB, ch chan ContractLiveliness, log logger.Logger) *DeletionProxy { r := new(DeletionProxy) r.db = db r.ch = ch - r.log = logger.NewLogger(logLevel, "Proxy Deletion") + r.log = log return r } diff --git a/utildb/gen_deleted_accounts.go b/utildb/gen_deleted_accounts.go index b40b2351e..9eb86997c 100644 --- a/utildb/gen_deleted_accounts.go +++ b/utildb/gen_deleted_accounts.go @@ -17,6 +17,8 @@ package utildb import ( + "errors" + "sync" "time" "github.com/Fantom-foundation/Aida/executor" @@ -32,11 +34,17 @@ import ( const channelSize = 100000 // size of deletion channel +type txLivelinessResult struct { + liveliness []proxy.ContractLiveliness + tx *substate.Transaction +} + // readAccounts reads contracts which were suicided or created and adds them to lists -func readAccounts(ch chan proxy.ContractLiveliness, deleteHistory *map[common.Address]bool) ([]common.Address, []common.Address) { +func readAccounts(cllArr []proxy.ContractLiveliness, deleteHistory *map[common.Address]bool) ([]common.Address, []common.Address) { des := make(map[common.Address]bool) res := make(map[common.Address]bool) - for contract := range ch { + + for _, contract := range cllArr { addr := contract.Addr if contract.IsDeleted { // if a contract was resurrected before suicided in the same tx, @@ -75,13 +83,7 @@ func readAccounts(ch chan proxy.ContractLiveliness, deleteHistory *map[common.Ad // genDeletedAccountsTask process a transaction substate then records self-destructed accounts // and resurrected accounts to a database. 
-func genDeletedAccountsTask(
-	tx *substate.Transaction,
-	processor *executor.TxProcessor,
-	ddb *substate.DestroyedAccountDB,
-	deleteHistory *map[common.Address]bool,
-	cfg *utils.Config,
-) error {
+func genDeletedAccountsTask(tx *substate.Transaction, processor executor.TxProcessor, cfg *utils.Config, log logger.Logger) ([]proxy.ContractLiveliness, error) {
 	ch := make(chan proxy.ContractLiveliness, channelSize)
 	var statedb state.StateDB
 	var err error
@@ -89,86 +91,268 @@ func genDeletedAccountsTask(
 	statedb, err = state.MakeOffTheChainStateDB(ss.GetInputState(), tx.Block, state.NewChainConduit(cfg.ChainID == utils.EthereumChainID, utils.GetChainConfig(cfg.ChainID)))
 	if err != nil {
-		return err
+		return nil, err
 	}
 	//wrapper
-	statedb = proxy.NewDeletionProxy(statedb, ch, cfg.LogLevel)
+	statedb = proxy.NewDeletionProxy(statedb, ch, log)
 	_, err = processor.ProcessTransaction(statedb, int(tx.Block), tx.Transaction, ss)
 	if err != nil {
-		return nil
+		return nil, err
 	}
 	close(ch)
-	des, res := readAccounts(ch, deleteHistory)
-	if len(des)+len(res) > 0 {
-		// if transaction completed successfully, put destroyed accounts
-		// and resurrected accounts to a database
-		if tx.Substate.Result.Status == types.ReceiptStatusSuccessful {
-			err = ddb.SetDestroyedAccounts(tx.Block, tx.Transaction, des, res)
-			if err != nil {
-				return err
-			}
-		}
+
+	livelinessArr := make([]proxy.ContractLiveliness, 0)
+	for liveliness := range ch {
+		livelinessArr = append(livelinessArr, liveliness)
 	}
-	return nil
+	return livelinessArr, nil
 }
 
 // GenDeletedAccountsAction replays transactions and record self-destructed accounts and resurrected accounts.
+// Uses a round-robin task-assignment scheme across workers to preserve ordering while utilizing parallelism.
 func GenDeletedAccountsAction(cfg *utils.Config, ddb *substate.DestroyedAccountDB, firstBlock uint64, lastBlock uint64) error {
-	var err error
-
-	err = utils.StartCPUProfile(cfg)
+	err := utils.StartCPUProfile(cfg)
 	if err != nil {
 		return err
 	}
 
 	log := logger.NewLogger(cfg.LogLevel, "Generate Deleted Accounts")
-	log.Noticef("Generate deleted accounts from block %v to block %v", firstBlock, lastBlock)
-
-	start := time.Now()
-	sec := time.Since(start).Seconds()
-	lastSec := time.Since(start).Seconds()
-	txCount := uint64(0)
-	lastTxCount := uint64(0)
-	var deleteHistory = make(map[common.Address]bool)
+	processor := executor.MakeTxProcessor(cfg)
+
+	wg := sync.WaitGroup{}
+	abort := utils.MakeEvent()
+	errChan := make(chan error)
 
 	iter := substate.NewSubstateIterator(firstBlock, cfg.Workers)
 	defer iter.Release()
-	processor := executor.MakeTxProcessor(cfg)
+
+	// error handling routine
+	encounteredErrors := errorHandler(abort, errChan)
+
+	// feeder to send tasks to workers
+	workerInputChannels := taskFeeder(&wg, cfg, iter, lastBlock, abort, log)
+
+	// prepare workers to process transactions
+	workerOutputChannels := txProcessor(&wg, cfg, workerInputChannels, processor, abort, errChan, log)
+
+	// collect results from workers and order them
+	orderedResults := resultCollector(&wg, cfg, workerOutputChannels, abort)
+
+	// process ordered txLivelinessResults
+	resolveDeletionsAndResurrections(ddb, orderedResults, abort, errChan)
 
-	for iter.Next() {
-		tx := iter.Value()
-		if tx.Block > lastBlock {
-			break
+	// wait until feeder, workers and collector are done
+	wg.Wait()
+
+	// notify error handler to stop listening
+	close(errChan)
+
+	utils.StopCPUProfile(cfg)
+
+	// retrieve encounteredErrors from error handler
+	err = <-encounteredErrors
+	return err
+}
+
+// 
resolveDeletionsAndResurrections reads txLivelinessResults and resolves deletions and resurrections. +func resolveDeletionsAndResurrections(ddb *substate.DestroyedAccountDB, orderedResults chan txLivelinessResult, abort utils.Event, errChan chan error) { + var deleteHistory = make(map[common.Address]bool) + + for { + select { + case <-abort.Wait(): + return + case contract, ok := <-orderedResults: + { + if !ok { + return + } + des, res := readAccounts(contract.liveliness, &deleteHistory) + if len(des)+len(res) > 0 { + err := ddb.SetDestroyedAccounts(contract.tx.Block, contract.tx.Transaction, des, res) + if err != nil { + errChan <- err + return + } + } + } } + } +} + +// resultCollector collects results from workers in round-robin fashion and sends them to a single channel. +func resultCollector(wg *sync.WaitGroup, cfg *utils.Config, workerOutputChannels []chan txLivelinessResult, abort utils.Event) chan txLivelinessResult { + orderedResults := make(chan txLivelinessResult, cfg.Workers) + wg.Add(1) + go func() { + defer close(orderedResults) + defer wg.Done() - if tx.Transaction < utils.PseudoTx { - err = genDeletedAccountsTask(tx, processor, ddb, &deleteHistory, cfg) - if err != nil { - return err + // round-robin to collect results from workers + for { + for i := 0; i < cfg.Workers; i++ { + select { + case <-abort.Wait(): + return + case res, ok := <-workerOutputChannels[i]: + if !ok { + return + } + + // filter out txs with no liveliness actions + if res.liveliness != nil && len(res.liveliness) > 0 { + select { + case <-abort.Wait(): + return + case orderedResults <- res: + } + } + } } + } + }() + return orderedResults +} + +// txProcessor launches workers to process transactions in parallel. +func txProcessor(wg *sync.WaitGroup, cfg *utils.Config, workerInputChannels []chan *substate.Transaction, processor executor.TxProcessor, abort utils.Event, errChan chan error, log logger.Logger) []chan txLivelinessResult { + // channel for each worker to send results + workerOutputChannels := make([]chan txLivelinessResult, cfg.Workers) + for i := 0; i < cfg.Workers; i++ { + workerOutputChannels[i] = make(chan txLivelinessResult) + } + + for i := 0; i < cfg.Workers; i++ { + wg.Add(1) + go func(workerId int) { + defer func() { + close(workerOutputChannels[workerId]) + wg.Done() + }() + + for { + select { + case <-abort.Wait(): + return + case tx, ok := <-workerInputChannels[workerId]: + if !ok { + return + } + // Process sorted transactions + livelinessArr, err := genDeletedAccountsTask(tx, processor, cfg, log) + if err != nil { + errChan <- err + return + } + + select { + case <-abort.Wait(): + return + case workerOutputChannels[workerId] <- txLivelinessResult{livelinessArr, tx}: + } + } + } + }(i) + } + + return workerOutputChannels +} + +// taskFeeder feeds tasks to workers in round-robin fashion. 
+func taskFeeder(wg *sync.WaitGroup, cfg *utils.Config, iter substate.SubstateIterator, lastBlock uint64, abort utils.Event, log logger.Logger) []chan *substate.Transaction {
+	wg.Add(1)
+
+	// channel for each worker to get tasks for processing
+	workerInputChannels := make([]chan *substate.Transaction, cfg.Workers)
+	for i := 0; i < cfg.Workers; i++ {
+		workerInputChannels[i] = make(chan *substate.Transaction)
+	}
+
+	go func() {
+		start := time.Now()
+		sec := time.Since(start).Seconds()
+		lastSec := time.Since(start).Seconds()
+		txCount := uint64(0)
+		lastTxCount := uint64(0)
+
+		defer func() {
+			wg.Done()
+			// close inputs for workers
+			for _, inputChan := range workerInputChannels {
+				close(inputChan)
+			}
+		}()
+
+		// Round-robin worker index
+		nextWorkerIndex := 0
+		for iter.Next() {
+			select {
+			case <-abort.Wait():
+				return
+			default:
+			}
+
+			tx := iter.Value()
-			txCount++
 			sec = time.Since(start).Seconds()
 			diff := sec - lastSec
+
+			if tx.Block > lastBlock {
+				log.Noticef("gen-del-acc: Total elapsed time: %.0f s (Total ~%.1f Tx/s)", sec, float64(txCount)/sec)
+				break
+			}
+
+			txCount++
 			if diff >= 30 {
 				numTx := txCount - lastTxCount
 				lastTxCount = txCount
-				log.Infof("aida-vm: gen-del-acc: Elapsed time: %.0f s, at block %v (~%.1f Tx/s)", sec, tx.Block, float64(numTx)/diff)
+				log.Infof("gen-del-acc: Elapsed time: %.0f s, at block %v (~%.1f Tx/s)", sec, tx.Block, float64(numTx)/diff)
 				lastSec = sec
 			}
+
+			if tx.Transaction < utils.PseudoTx && tx.Substate.Result.Status == types.ReceiptStatusSuccessful {
+				// if not pseudo tx and completed successfully, send task to next worker in round-robin
+				select {
+				case <-abort.Wait():
+					return
+				case workerInputChannels[nextWorkerIndex] <- tx:
+					nextWorkerIndex = (nextWorkerIndex + 1) % cfg.Workers
+				}
+			}
 		}
-	}
+	}()
 
-	utils.StopCPUProfile(cfg)
+	return workerInputChannels
+}
 
-	// explicitly set to nil to release memory as soon as possible
-	deleteHistory = nil
+// errorHandler collects errors from workers and returns them as a single error
+// while using abort to signal other routines to stop.
+func errorHandler(abort utils.Event, errChan chan error) chan error { + encounteredErrors := make(chan error) + go func() { + defer close(encounteredErrors) - return err + var result error + + defer abort.Signal() + + for { + err, ok := <-errChan + if !ok { + encounteredErrors <- result + return + } + + abort.Signal() + + result = errors.Join(result, err) + } + }() + return encounteredErrors } diff --git a/utildb/gen_deleted_accounts_test.go b/utildb/gen_deleted_accounts_test.go new file mode 100644 index 000000000..b57c51053 --- /dev/null +++ b/utildb/gen_deleted_accounts_test.go @@ -0,0 +1,345 @@ +package utildb + +import ( + "fmt" + "math/big" + "sync" + "testing" + "time" + + "github.com/Fantom-foundation/Aida/executor" + "github.com/Fantom-foundation/Aida/state/proxy" + substatecontext "github.com/Fantom-foundation/Aida/txcontext/substate" + "github.com/Fantom-foundation/Aida/utils" + substate "github.com/Fantom-foundation/Substate" + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" +) + +func Test_errorHandlerAllErrorsAreMerged(t *testing.T) { + abort := utils.MakeEvent() + errChan := make(chan error, 2) + encounteredErrors := errorHandler(abort, errChan) + errChan <- fmt.Errorf("error1") + errChan <- fmt.Errorf("error2") + close(errChan) + got := <-encounteredErrors + assert.Equal(t, "error1\nerror2", got.Error()) +} + +func Test_errorHandlerSendsAbortSignal(t *testing.T) { + abort := utils.MakeEvent() + errChan := make(chan error, 2) + encounteredErrors := errorHandler(abort, errChan) + errChan <- fmt.Errorf("error1") + close(errChan) + got := <-encounteredErrors + assert.Equal(t, "error1", got.Error()) + + select { + case <-abort.Wait(): + default: + t.Errorf("abort signal should be sent") + } +} + +func Test_errorHandlerResultGetsClosed(t *testing.T) { + abort := utils.MakeEvent() + errChan := make(chan error, 2) + encounteredErrors := errorHandler(abort, errChan) + close(errChan) + err, ok := <-encounteredErrors + if !ok { + t.Errorf("encounteredErrors channel should be open with result") + } + if err != nil { + t.Errorf("encounteredErrors shouldn't have any errors") + } + + _, ok = <-encounteredErrors + if ok { + t.Errorf("encounteredErrors channel should be closed") + } +} + +func Test_launchWorkersParallelAbortsOnSignal(t *testing.T) { + ctrl := gomock.NewController(t) + processor := executor.NewMockTxProcessor(ctrl) + + wg := &sync.WaitGroup{} + cfg := &utils.Config{Workers: 2, ChainID: 250} + + errChan := make(chan error) + abort := utils.MakeEvent() + + testTx := makeTestTx(4) + + // channel for each worker to get tasks for processing + workerInputChannels := make([]chan *substate.Transaction, cfg.Workers) + for i := 0; i < cfg.Workers; i++ { + workerInputChannels[i] = make(chan *substate.Transaction) + go func(workerId int, workerIn chan *substate.Transaction) { + for k := 0; ; k++ { + index := workerId + k*cfg.Workers + if index >= len(testTx) { + break + } + workerIn <- &substate.Transaction{Block: uint64(index), Substate: testTx[index]} + } + close(workerIn) + }(i, workerInputChannels[i]) + } + + processor.EXPECT().ProcessTransaction(gomock.Any(), 0, gomock.Any(), substatecontext.NewTxContext(testTx[0])).Return(nil, nil) + processor.EXPECT().ProcessTransaction(gomock.Any(), 1, gomock.Any(), substatecontext.NewTxContext(testTx[1])).Return(nil, nil) + processor.EXPECT().ProcessTransaction(gomock.Any(), 2, gomock.Any(), substatecontext.NewTxContext(testTx[2])).Return(nil, nil) + // block 3 is missing because of 
aborting
+	outPut := txProcessor(wg, cfg, workerInputChannels, processor, abort, errChan, nil)
+
+	_, ok := <-outPut[0]
+	if !ok {
+		t.Fatalf("output channel should be open")
+	}
+
+	// wait for second worker processing
+	time.Sleep(100 * time.Millisecond)
+
+	// abort before allowing processing of block 3
+	abort.Signal()
+
+	wg.Wait()
+	ctrl.Finish()
+}
+
+func Test_launchWorkersParallelCorrectOutputOrder(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	processor := executor.NewMockTxProcessor(ctrl)
+
+	wg := &sync.WaitGroup{}
+	cfg := &utils.Config{Workers: utils.GetRandom(2, 5), ChainID: 250}
+
+	errChan := make(chan error)
+
+	testSize := cfg.Workers*utils.GetRandom(100, 1000) + utils.GetRandom(0, cfg.Workers-1)
+	testTx := makeTestTx(testSize)
+	if len(testTx) != testSize {
+		t.Fatalf("internal test error: testTx size is incorrect")
+	}
+
+	// channel for each worker to get tasks for processing
+	workerInputChannels := make([]chan *substate.Transaction, cfg.Workers)
+	for i := 0; i < cfg.Workers; i++ {
+		workerInputChannels[i] = make(chan *substate.Transaction)
+		go func(workerId int, workerIn chan *substate.Transaction) {
+			for k := 0; ; k++ {
+				index := workerId + k*cfg.Workers
+				if index >= len(testTx) {
+					break
+				}
+
+				workerIn <- &substate.Transaction{Block: uint64(index), Substate: testTx[index]}
+			}
+
+			close(workerIn)
+		}(i, workerInputChannels[i])
+	}
+
+	for i := 0; i < len(testTx); i++ {
+		processor.EXPECT().ProcessTransaction(gomock.Any(), i, gomock.Any(), substatecontext.NewTxContext(testTx[i])).Return(nil, nil)
+	}
+
+	outPut := txProcessor(wg, cfg, workerInputChannels, processor, utils.MakeEvent(), errChan, nil)
+
+	var orderCheck uint64 = 0
+
+loop:
+	for {
+		for i := 0; i < cfg.Workers; i++ {
+			r, ok := <-outPut[i]
+			if !ok {
+				if int(orderCheck) != testSize {
+					t.Fatalf("results are missing got: %d, expected: %d", orderCheck, testSize)
+				}
+				break loop
+			}
+			if orderCheck != r.tx.Block {
+				t.Fatalf("results are in incorrect order")
+			}
+			orderCheck++
+		}
+	}
+
+	wg.Wait()
+	ctrl.Finish()
+}
+
+func Test_readAccounts(t *testing.T) {
+	t.Run("Empty", func(t *testing.T) {
+		arr := make([]proxy.ContractLiveliness, 0)
+		deleteHistory := make(map[common.Address]bool)
+		del, res := readAccounts(arr, &deleteHistory)
+		if len(del) != 0 || len(res) != 0 {
+			t.Fatalf("should return empty arrays")
+		}
+		if len(deleteHistory) != 0 {
+			t.Fatalf("deleteHistory should be empty")
+		}
+	})
+
+	t.Run("Deletion", func(t *testing.T) {
+		arr := make([]proxy.ContractLiveliness, 0)
+		deleteHistory := make(map[common.Address]bool)
+
+		arr = append(arr, proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: true})
+		del, res := readAccounts(arr, &deleteHistory)
+		if len(del) != 1 || len(res) != 0 {
+			t.Fatalf("should return 1 deleted and 0 resurrected accounts")
+		}
+		if !deleteHistory[common.HexToAddress("0x1")] {
+			t.Fatalf("deleteHistory should have 0x1 deleted")
+		}
+	})
+
+	t.Run("DeletionAndResurrection", func(t *testing.T) {
+		arr := make([]proxy.ContractLiveliness, 0)
+		deleteHistory := make(map[common.Address]bool)
+
+		arr = append(arr, proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: true})
+		arr = append(arr, proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: false})
+		del, res := readAccounts(arr, &deleteHistory)
+		if len(del) != 0 || len(res) != 1 {
+			t.Fatalf("should return empty deletion array and 1 resurrected")
+		}
+		if deleteHistory[common.HexToAddress("0x1")] {
+			t.Fatalf("deleteHistory should have 0x1 resurrected")
+		}
+	})
+
t.Run("DeletionResurrectionDeletion", func(t *testing.T) { + arr := make([]proxy.ContractLiveliness, 0) + deleteHistory := make(map[common.Address]bool) + + arr = append(arr, proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: true}) + arr = append(arr, proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: false}) + arr = append(arr, proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: true}) + del, res := readAccounts(arr, &deleteHistory) + if len(del) != 1 || len(res) != 0 { + t.Fatalf("should return empty deletion array and 1 resurrected") + } + if !deleteHistory[common.HexToAddress("0x1")] { + t.Fatalf("deleteHistory should have 0x1 deleted") + } + }) + + t.Run("DeletionResurrectionSplit", func(t *testing.T) { + arr := make([]proxy.ContractLiveliness, 0) + deleteHistory := make(map[common.Address]bool) + arr = append(arr, proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: true}) + _, _ = readAccounts(arr, &deleteHistory) + + // second run + arr2 := make([]proxy.ContractLiveliness, 0) + arr2 = append(arr, proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: false}) + del, res := readAccounts(arr2, &deleteHistory) + if len(del) != 0 || len(res) != 1 { + t.Fatalf("should return empty deletion array and 1 resurrected") + } + if deleteHistory[common.HexToAddress("0x1")] { + t.Fatalf("deleteHistory should have 0x1 deleted") + } + }) + + t.Run("ResurrectionDeletionResurrection", func(t *testing.T) { + arr := make([]proxy.ContractLiveliness, 0) + deleteHistory := make(map[common.Address]bool) + deleteHistory[common.HexToAddress("0x1")] = false + + arr = append(arr, proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: true}) + arr = append(arr, proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: false}) + del, res := readAccounts(arr, &deleteHistory) + if len(del) != 0 || len(res) != 1 { + t.Fatalf("should return empty deletion array and 1 resurrected") + } + if deleteHistory[common.HexToAddress("0x1")] { + t.Fatalf("deleteHistory should have 0x1 ressurected") + } + }) +} + +func Test_resultCollectorCorrectResultOrder(t *testing.T) { + wg := &sync.WaitGroup{} + cfg := &utils.Config{Workers: 100, ChainID: 250} + + // channel for each worker to get tasks for processing + workerOutputChannels := make([]chan txLivelinessResult, cfg.Workers) + for i := 0; i < cfg.Workers; i++ { + workerOutputChannels[i] = make(chan txLivelinessResult) + go func(workerId int, workerOut chan txLivelinessResult) { + workerOut <- txLivelinessResult{liveliness: []proxy.ContractLiveliness{{Addr: common.HexToAddress(fmt.Sprintf("0x%x", workerId)), IsDeleted: true}}} + close(workerOutputChannels[workerId]) + }(i, workerOutputChannels[i]) + } + + res := resultCollector(wg, cfg, workerOutputChannels, utils.MakeEvent()) + + currentBlk := 0 + for r := range res { + if !r.liveliness[0].IsDeleted { + t.Fatalf("results are in incorrect order") + } + + if r.liveliness[0].Addr != common.HexToAddress(fmt.Sprintf("0x%x", currentBlk)) { + t.Fatalf("results are in incorrect order liveliness") + } + currentBlk++ + } +} + +func Test_resultCollectorAbortsOnSignal(t *testing.T) { + wg := &sync.WaitGroup{} + cfg := &utils.Config{Workers: 100, ChainID: 250} + + abort := utils.MakeEvent() + + // channel for each worker to get tasks for processing + workerOutputChannels := make([]chan txLivelinessResult, cfg.Workers) + for i := 0; i < cfg.Workers; i++ { + workerOutputChannels[i] = make(chan txLivelinessResult) + go 
+			workerOut <- txLivelinessResult{liveliness: []proxy.ContractLiveliness{{Addr: common.HexToAddress(fmt.Sprintf("0x%x", workerId)), IsDeleted: true}}}
+			close(workerOutputChannels[workerId])
+		}(i, workerOutputChannels[i])
+	}
+
+	res := resultCollector(wg, cfg, workerOutputChannels, abort)
+
+	currentBlk := 0
+	for range res {
+		if currentBlk == 50 {
+			abort.Signal()
+			break
+		}
+		currentBlk++
+	}
+}
+
+// makeTestTx creates dummy substates that will be processed without crashing.
+func makeTestTx(count int) []*substate.Substate {
+	testTxArr := make([]*substate.Substate, count)
+	for i := 0; i < count; i++ {
+		var testTx = &substate.Substate{
+			Env: &substate.SubstateEnv{},
+			Message: &substate.SubstateMessage{
+				Gas:      10000,
+				GasPrice: big.NewInt(0),
+			},
+			Result: &substate.SubstateResult{
+				GasUsed: 1,
+			},
+		}
+		testTxArr[i] = testTx
+	}
+	return testTxArr
+}
diff --git a/utils/prime_ctx.go b/utils/prime_ctx.go
index c110a63b8..d9c485780 100644
--- a/utils/prime_ctx.go
+++ b/utils/prime_ctx.go
@@ -231,11 +231,17 @@ func (pc *PrimeContext) PrimeStateDBRandom(ws txcontext.WorldState, db state.Sta
 }
 
 // SuicideAccounts clears storage of all input accounts.
-func (pc *PrimeContext) SuicideAccounts(db state.StateDB, accounts []common.Address) {
+func (pc *PrimeContext) SuicideAccounts(db state.StateDB, accounts []common.Address) error {
 	count := 0
 	db.BeginSyncPeriod(0)
-	db.BeginBlock(pc.block)
-	db.BeginTransaction(0)
+	err := db.BeginBlock(pc.block)
+	if err != nil {
+		return fmt.Errorf("SuicideAccounts BeginBlock: %w", err)
+	}
+	err = db.BeginTransaction(0)
+	if err != nil {
+		return fmt.Errorf("SuicideAccounts BeginTransaction: %w", err)
+	}
 	for _, addr := range accounts {
 		if db.Exist(addr) {
 			db.Suicide(addr)
@@ -244,11 +250,18 @@ func (pc *PrimeContext) SuicideAccounts(db state.StateDB, accounts []common.Addr
 			pc.exist[addr] = false
 		}
 	}
-	db.EndTransaction()
-	db.EndBlock()
+	err = db.EndTransaction()
+	if err != nil {
+		return fmt.Errorf("SuicideAccounts EndTransaction: %w", err)
+	}
+	err = db.EndBlock()
+	if err != nil {
+		return fmt.Errorf("SuicideAccounts EndBlock: %w", err)
+	}
 	db.EndSyncPeriod()
 	pc.block++
 	pc.log.Infof("\t\t %v suicided accounts were removed from statedb (before priming).", count)
+	return nil
 }
 
 func (pc *PrimeContext) GetBlock() uint64 {
diff --git a/utils/statedb.go b/utils/statedb.go
index ecb48166b..cbbc50243 100644
--- a/utils/statedb.go
+++ b/utils/statedb.go
@@ -260,14 +260,26 @@ func DeleteDestroyedAccountsFromStateDB(db state.StateDB, cfg *Config, target ui
 		return nil
 	}
 	db.BeginSyncPeriod(0)
-	db.BeginBlock(target)
-	db.BeginTransaction(0)
+	err = db.BeginBlock(target)
+	if err != nil {
+		return fmt.Errorf("DeleteDestroyedAccounts BeginBlock: %w", err)
+	}
+	err = db.BeginTransaction(0)
+	if err != nil {
+		return fmt.Errorf("DeleteDestroyedAccounts BeginTransaction: %w", err)
+	}
 	for _, addr := range accounts {
 		db.Suicide(addr)
 		log.Debugf("Perform suicide on %v", addr)
 	}
-	db.EndTransaction()
-	db.EndBlock()
+	err = db.EndTransaction()
+	if err != nil {
+		return fmt.Errorf("DeleteDestroyedAccounts EndTransaction: %w", err)
+	}
+	err = db.EndBlock()
+	if err != nil {
+		return fmt.Errorf("DeleteDestroyedAccounts EndBlock: %w", err)
+	}
 	db.EndSyncPeriod()
 	return nil
 }
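Note on the new TxProcessor interface: together with the generated MockTxProcessor it is what allows the utildb pipeline to be unit-tested without a real VM, as gen_deleted_accounts_test.go does. A minimal sketch of that usage pattern follows; the test name and the stubbed block/tx arguments are illustrative only, while NewMockTxProcessor, EXPECT and ProcessTransaction are the identifiers introduced in this diff.

package executor

import (
	"testing"

	"go.uber.org/mock/gomock"
)

// Illustrative sketch: drive a TxProcessor dependency with the generated mock
// instead of a real transaction processor.
func TestTxProcessorMockSketch(t *testing.T) {
	ctrl := gomock.NewController(t)
	processor := NewMockTxProcessor(ctrl)

	// Expect exactly one call for block 1, tx 0; inputs and result are stubbed.
	processor.EXPECT().
		ProcessTransaction(gomock.Any(), 1, 0, gomock.Any()).
		Return(nil, nil)

	// Any code that accepts the TxProcessor interface can now run against the mock.
	if _, err := processor.ProcessTransaction(nil, 1, 0, nil); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
}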