From 3cda315f73f1dc3ca37eae8b0e99388d7c80d5e3 Mon Sep 17 00:00:00 2001 From: matej Date: Mon, 22 Apr 2024 14:42:46 +0200 Subject: [PATCH 1/7] Optimize gen-deleted-accounts --- executor/transaction_processor.go | 34 ++- executor/transaction_processor_mocks.go | 56 +++++ utildb/gen_deleted_accounts.go | 288 ++++++++++++++++++++---- utildb/gen_deleted_accounts_test.go | 246 ++++++++++++++++++++ 4 files changed, 564 insertions(+), 60 deletions(-) create mode 100644 executor/transaction_processor_mocks.go create mode 100644 utildb/gen_deleted_accounts_test.go diff --git a/executor/transaction_processor.go b/executor/transaction_processor.go index 025a89a21..575ae602a 100644 --- a/executor/transaction_processor.go +++ b/executor/transaction_processor.go @@ -16,6 +16,8 @@ package executor +//go:generate mockgen -source transaction_processor.go -destination transaction_processor_mocks.go -package executor + import ( "errors" "fmt" @@ -36,11 +38,11 @@ import ( // MakeLiveDbTxProcessor creates a executor.Processor which processes transaction into LIVE StateDb. func MakeLiveDbTxProcessor(cfg *utils.Config) *LiveDbTxProcessor { - return &LiveDbTxProcessor{MakeTxProcessor(cfg)} + return &LiveDbTxProcessor{makeTxProcessor(cfg)} } type LiveDbTxProcessor struct { - *TxProcessor + *txProcessor } // Process transaction inside state into given LIVE StateDb @@ -62,11 +64,11 @@ func (p *LiveDbTxProcessor) Process(state State[txcontext.TxContext], ctx *Conte // MakeArchiveDbTxProcessor creates a executor.Processor which processes transaction into ARCHIVE StateDb. func MakeArchiveDbTxProcessor(cfg *utils.Config) *ArchiveDbTxProcessor { - return &ArchiveDbTxProcessor{MakeTxProcessor(cfg)} + return &ArchiveDbTxProcessor{makeTxProcessor(cfg)} } type ArchiveDbTxProcessor struct { - *TxProcessor + *txProcessor } // Process transaction inside state into given ARCHIVE StateDb @@ -86,7 +88,11 @@ func (p *ArchiveDbTxProcessor) Process(state State[txcontext.TxContext], ctx *Co return err } -type TxProcessor struct { +type TxProcessor interface { + ProcessTransaction(db state.VmStateDB, block int, tx int, st txcontext.TxContext) (txcontext.Result, error) +} + +type txProcessor struct { cfg *utils.Config numErrors *atomic.Int32 // transactions can be processed in parallel, so this needs to be thread safe vmCfg vm.Config @@ -94,7 +100,11 @@ type TxProcessor struct { log logger.Logger } -func MakeTxProcessor(cfg *utils.Config) *TxProcessor { +func MakeTxProcessor(cfg *utils.Config) TxProcessor { + return makeTxProcessor(cfg) +} + +func makeTxProcessor(cfg *utils.Config) *txProcessor { var vmCfg vm.Config switch cfg.ChainID { case utils.EthereumChainID: @@ -111,16 +121,16 @@ func MakeTxProcessor(cfg *utils.Config) *TxProcessor { vmCfg.Tracer = nil vmCfg.Debug = false - return &TxProcessor{ + return &txProcessor{ cfg: cfg, numErrors: new(atomic.Int32), vmCfg: vmCfg, chainCfg: utils.GetChainConfig(cfg.ChainID), - log: logger.NewLogger(cfg.LogLevel, "TxProcessor"), + log: logger.NewLogger(cfg.LogLevel, "txProcessor"), } } -func (s *TxProcessor) isErrFatal() bool { +func (s *txProcessor) isErrFatal() bool { if !s.cfg.ContinueOnFailure { return true } @@ -138,7 +148,7 @@ func (s *TxProcessor) isErrFatal() bool { return true } -func (s *TxProcessor) ProcessTransaction(db state.VmStateDB, block int, tx int, st txcontext.TxContext) (txcontext.Result, error) { +func (s *txProcessor) ProcessTransaction(db state.VmStateDB, block int, tx int, st txcontext.TxContext) (txcontext.Result, error) { if tx >= utils.PseudoTx { return 
s.processPseudoTx(st.GetOutputState(), db), nil } @@ -146,7 +156,7 @@ func (s *TxProcessor) ProcessTransaction(db state.VmStateDB, block int, tx int, } // processRegularTx executes VM on a chosen storage system. -func (s *TxProcessor) processRegularTx(db state.VmStateDB, block int, tx int, st txcontext.TxContext) (res transactionResult, finalError error) { +func (s *txProcessor) processRegularTx(db state.VmStateDB, block int, tx int, st txcontext.TxContext) (res transactionResult, finalError error) { var ( gasPool = new(evmcore.GasPool) txHash = common.HexToHash(fmt.Sprintf("0x%016d%016d", block, tx)) @@ -190,7 +200,7 @@ func (s *TxProcessor) processRegularTx(db state.VmStateDB, block int, tx int, st // processPseudoTx processes pseudo transactions in Lachesis by applying the change in db state. // The pseudo transactions includes Lachesis SFC, lachesis genesis and lachesis-opera transition. -func (s *TxProcessor) processPseudoTx(ws txcontext.WorldState, db state.VmStateDB) txcontext.Result { +func (s *txProcessor) processPseudoTx(ws txcontext.WorldState, db state.VmStateDB) txcontext.Result { ws.ForEachAccount(func(addr common.Address, acc txcontext.Account) { db.SubBalance(addr, db.GetBalance(addr)) db.AddBalance(addr, acc.GetBalance()) diff --git a/executor/transaction_processor_mocks.go b/executor/transaction_processor_mocks.go new file mode 100644 index 000000000..0c994bfad --- /dev/null +++ b/executor/transaction_processor_mocks.go @@ -0,0 +1,56 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: transaction_processor.go +// +// Generated by this command: +// +// mockgen -source transaction_processor.go -destination transaction_processor_mocks.go -package executor +// + +// Package executor is a generated GoMock package. +package executor + +import ( + reflect "reflect" + + state "github.com/Fantom-foundation/Aida/state" + txcontext "github.com/Fantom-foundation/Aida/txcontext" + gomock "go.uber.org/mock/gomock" +) + +// MockTxProcessor is a mock of TxProcessor interface. +type MockTxProcessor struct { + ctrl *gomock.Controller + recorder *MockTxProcessorMockRecorder +} + +// MockTxProcessorMockRecorder is the mock recorder for MockTxProcessor. +type MockTxProcessorMockRecorder struct { + mock *MockTxProcessor +} + +// NewMockTxProcessor creates a new mock instance. +func NewMockTxProcessor(ctrl *gomock.Controller) *MockTxProcessor { + mock := &MockTxProcessor{ctrl: ctrl} + mock.recorder = &MockTxProcessorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTxProcessor) EXPECT() *MockTxProcessorMockRecorder { + return m.recorder +} + +// ProcessTransaction mocks base method. +func (m *MockTxProcessor) ProcessTransaction(db state.VmStateDB, block, tx int, st txcontext.TxContext) (txcontext.Result, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ProcessTransaction", db, block, tx, st) + ret0, _ := ret[0].(txcontext.Result) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ProcessTransaction indicates an expected call of ProcessTransaction. 
+func (mr *MockTxProcessorMockRecorder) ProcessTransaction(db, block, tx, st any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessTransaction", reflect.TypeOf((*MockTxProcessor)(nil).ProcessTransaction), db, block, tx, st) +} diff --git a/utildb/gen_deleted_accounts.go b/utildb/gen_deleted_accounts.go index b40b2351e..921c071da 100644 --- a/utildb/gen_deleted_accounts.go +++ b/utildb/gen_deleted_accounts.go @@ -17,6 +17,8 @@ package utildb import ( + "errors" + "sync" "time" "github.com/Fantom-foundation/Aida/executor" @@ -32,11 +34,17 @@ import ( const channelSize = 100000 // size of deletion channel +type txLivelinessResult struct { + liveliness []proxy.ContractLiveliness + tx *substate.Transaction +} + // readAccounts reads contracts which were suicided or created and adds them to lists -func readAccounts(ch chan proxy.ContractLiveliness, deleteHistory *map[common.Address]bool) ([]common.Address, []common.Address) { +func readAccounts(cllArr []proxy.ContractLiveliness, deleteHistory *map[common.Address]bool) ([]common.Address, []common.Address) { des := make(map[common.Address]bool) res := make(map[common.Address]bool) - for contract := range ch { + + for _, contract := range cllArr { addr := contract.Addr if contract.IsDeleted { // if a contract was resurrected before suicided in the same tx, @@ -75,13 +83,7 @@ func readAccounts(ch chan proxy.ContractLiveliness, deleteHistory *map[common.Ad // genDeletedAccountsTask process a transaction substate then records self-destructed accounts // and resurrected accounts to a database. -func genDeletedAccountsTask( - tx *substate.Transaction, - processor *executor.TxProcessor, - ddb *substate.DestroyedAccountDB, - deleteHistory *map[common.Address]bool, - cfg *utils.Config, -) error { +func genDeletedAccountsTask(tx *substate.Transaction, processor executor.TxProcessor, cfg *utils.Config) ([]proxy.ContractLiveliness, error) { ch := make(chan proxy.ContractLiveliness, channelSize) var statedb state.StateDB var err error @@ -89,7 +91,7 @@ func genDeletedAccountsTask( statedb, err = state.MakeOffTheChainStateDB(ss.GetInputState(), tx.Block, state.NewChainConduit(cfg.ChainID == utils.EthereumChainID, utils.GetChainConfig(cfg.ChainID))) if err != nil { - return err + return nil, err } //wrapper @@ -97,78 +99,268 @@ func genDeletedAccountsTask( _, err = processor.ProcessTransaction(statedb, int(tx.Block), tx.Transaction, ss) if err != nil { - return nil + return nil, nil } close(ch) - des, res := readAccounts(ch, deleteHistory) - if len(des)+len(res) > 0 { - // if transaction completed successfully, put destroyed accounts - // and resurrected accounts to a database - if tx.Substate.Result.Status == types.ReceiptStatusSuccessful { - err = ddb.SetDestroyedAccounts(tx.Block, tx.Transaction, des, res) - if err != nil { - return err - } - } + + livelinessArr := make([]proxy.ContractLiveliness, 0) + for liveliness := range ch { + livelinessArr = append(livelinessArr, liveliness) } - return nil + return livelinessArr, nil } // GenDeletedAccountsAction replays transactions and record self-destructed accounts and resurrected accounts. +// Uses round-robin task assignment system to workers to keep order while utilizing parallelism. 
func GenDeletedAccountsAction(cfg *utils.Config, ddb *substate.DestroyedAccountDB, firstBlock uint64, lastBlock uint64) error { - var err error - - err = utils.StartCPUProfile(cfg) + err := utils.StartCPUProfile(cfg) if err != nil { return err } log := logger.NewLogger(cfg.LogLevel, "Generate Deleted Accounts") - log.Noticef("Generate deleted accounts from block %v to block %v", firstBlock, lastBlock) - start := time.Now() - sec := time.Since(start).Seconds() - lastSec := time.Since(start).Seconds() - txCount := uint64(0) - lastTxCount := uint64(0) - var deleteHistory = make(map[common.Address]bool) + processor := executor.MakeTxProcessor(cfg) + + wg := sync.WaitGroup{} + stopChan := make(chan struct{}) + errChan := make(chan error) iter := substate.NewSubstateIterator(firstBlock, cfg.Workers) defer iter.Release() - processor := executor.MakeTxProcessor(cfg) + //error handling routine + encounteredErrors := errorHandler(stopChan, errChan) + + // feeder to send tasks to workers + workerInputChannels := taskFeeder(&wg, cfg, iter, lastBlock, stopChan, log) + + // prepare workers to process transactions + workerOutputChannels := launchWorkers(&wg, cfg, workerInputChannels, processor, stopChan, errChan) + + // collect results from workers and orders them + orderedResults := resultCollector(&wg, cfg, workerOutputChannels, stopChan) + + // process ordered txLivelinessResults + resolveDeletionsAndResurrections(ddb, orderedResults, stopChan, errChan) - for iter.Next() { - tx := iter.Value() - if tx.Block > lastBlock { - break + // wait until feeder, workers and collector are done + wg.Wait() + + // notify error handler to stop + close(errChan) + + utils.StopCPUProfile(cfg) + + // retrieve encounteredErrors from error handler + err = <-encounteredErrors + return err +} + +// resolveDeletionsAndResurrections reads txLivelinessResults and resolves deletions and resurrections. +func resolveDeletionsAndResurrections(ddb *substate.DestroyedAccountDB, orderedResults chan txLivelinessResult, stopChan chan struct{}, errChan chan error) { + var deleteHistory = make(map[common.Address]bool) + defer func() { + // explicitly set to nil to release memory as soon as possible + deleteHistory = nil + }() + + for { + select { + case <-stopChan: + return + case contract, ok := <-orderedResults: + { + if !ok { + return + } + des, res := readAccounts(contract.liveliness, &deleteHistory) + if len(des)+len(res) > 0 { + err := ddb.SetDestroyedAccounts(contract.tx.Block, contract.tx.Transaction, des, res) + if err != nil { + errChan <- err + return + } + } + } } + } +} + +// resultCollector collects results from workers in round-robin fashion and sends them to a single channel. 
+func resultCollector(wg *sync.WaitGroup, cfg *utils.Config, workerOutputChannels map[int]chan txLivelinessResult, stopChan chan struct{}) chan txLivelinessResult { + orderedResults := make(chan txLivelinessResult, cfg.Workers) + wg.Add(1) + go func() { + defer close(orderedResults) + defer wg.Done() - if tx.Transaction < utils.PseudoTx { - err = genDeletedAccountsTask(tx, processor, ddb, &deleteHistory, cfg) - if err != nil { - return err + // round-robin to collect results from workers + for { + for i := 0; i < cfg.Workers; i++ { + select { + case <-stopChan: + return + case res, ok := <-workerOutputChannels[i]: + if !ok { + return + } + + // filter out txs with no liveliness actions + if res.liveliness != nil && len(res.liveliness) > 0 { + orderedResults <- res + } + } } + } + }() + return orderedResults +} + +// launchWorkers lauches workers to process transactions in parallel. +func launchWorkers(wg *sync.WaitGroup, cfg *utils.Config, workerInputChannels map[int]chan *substate.Transaction, processor executor.TxProcessor, stopChan chan struct{}, errChan chan error) map[int]chan txLivelinessResult { + // channel for each worker to send results + workerOutputChannels := make(map[int]chan txLivelinessResult) + for i := 0; i < cfg.Workers; i++ { + workerOutputChannels[i] = make(chan txLivelinessResult) + } + + for i := 0; i < cfg.Workers; i++ { + wg.Add(1) + go func(workerId int) { + defer func() { + close(workerOutputChannels[workerId]) + wg.Done() + }() + + for { + select { + case <-stopChan: + return + case tx, ok := <-workerInputChannels[workerId]: + if !ok { + return + } + // Process sorted transactions + livelinessArr, err := genDeletedAccountsTask(tx, processor, cfg) + if err != nil { + errChan <- err + return + } + + select { + case <-stopChan: + return + case workerOutputChannels[workerId] <- txLivelinessResult{livelinessArr, tx}: + } + } + } + }(i) + } + + return workerOutputChannels +} + +// taskFeeder feeds tasks to workers in round-robin fashion. 
+func taskFeeder(wg *sync.WaitGroup, cfg *utils.Config, iter substate.SubstateIterator, lastBlock uint64, stopChan chan struct{}, log logger.Logger) map[int]chan *substate.Transaction { + wg.Add(1) + + // channel for each worker to get tasks for processing + workerInputChannels := make(map[int]chan *substate.Transaction) + for i := 0; i < cfg.Workers; i++ { + workerInputChannels[i] = make(chan *substate.Transaction) + } + + go func() { + start := time.Now() + sec := time.Since(start).Seconds() + lastSec := time.Since(start).Seconds() + txCount := uint64(0) + lastTxCount := uint64(0) + + defer func() { + wg.Done() + // close inputs for workers + for _, inputChan := range workerInputChannels { + close(inputChan) + } + }() + + // Round-robin worker index + nextWorkerIndex := 0 + for iter.Next() { + select { + case <-stopChan: + return + default: + } + + tx := iter.Value() - txCount++ sec = time.Since(start).Seconds() diff := sec - lastSec + + if tx.Block > lastBlock { + log.Infof("gen-del-acc: Total elapsed time: %.0f s, (Total ~%.1f Tx/s)", sec, float64(txCount)/sec) + break + } + + txCount++ if diff >= 30 { numTx := txCount - lastTxCount lastTxCount = txCount - log.Infof("aida-vm: gen-del-acc: Elapsed time: %.0f s, at block %v (~%.1f Tx/s)", sec, tx.Block, float64(numTx)/diff) + log.Infof("gen-del-acc: Elapsed time: %.0f s, at block %v (~%.1f Tx/s)", sec, tx.Block, float64(numTx)/diff) lastSec = sec } + + if tx.Transaction < utils.PseudoTx && tx.Substate.Result.Status == types.ReceiptStatusSuccessful { + // if not pseodo tx and completed successfully, send task to next worker in round-robin + select { + case <-stopChan: + return + case workerInputChannels[nextWorkerIndex] <- tx: + nextWorkerIndex = (nextWorkerIndex + 1) % cfg.Workers + } + } } - } + }() - utils.StopCPUProfile(cfg) + return workerInputChannels +} - // explicitly set to nil to release memory as soon as possible - deleteHistory = nil +// errorHandler collects errors from workers and returns them as a single error +// while closing the stopChan to signal other routines to stop. 
+func errorHandler(stopChan chan struct{}, errChan chan error) chan error { + encounteredErrors := make(chan error) + go func() { + defer close(encounteredErrors) - return err + var result error + firstErr := true + + defer func() { + if firstErr { + close(stopChan) + firstErr = false + } + }() + + for { + err, ok := <-errChan + if !ok { + encounteredErrors <- result + return + } + if firstErr { + close(stopChan) + firstErr = false + } + + result = errors.Join(result, err) + } + }() + return encounteredErrors } diff --git a/utildb/gen_deleted_accounts_test.go b/utildb/gen_deleted_accounts_test.go new file mode 100644 index 000000000..7e5df33ff --- /dev/null +++ b/utildb/gen_deleted_accounts_test.go @@ -0,0 +1,246 @@ +package utildb + +import ( + "fmt" + "math/big" + "sync" + "testing" + + "github.com/Fantom-foundation/Aida/executor" + "github.com/Fantom-foundation/Aida/state/proxy" + substatecontext "github.com/Fantom-foundation/Aida/txcontext/substate" + "github.com/Fantom-foundation/Aida/utils" + substate "github.com/Fantom-foundation/Substate" + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" +) + +func Test_errorHandlerAllErrorsAreMerged(t *testing.T) { + stopChan := make(chan struct{}) + errChan := make(chan error, 2) + encounteredErrors := errorHandler(stopChan, errChan) + errChan <- fmt.Errorf("error1") + errChan <- fmt.Errorf("error2") + close(errChan) + got := <-encounteredErrors + assert.Equal(t, "error1\nerror2", got.Error()) +} + +func Test_errorHandlerResultGetsClosed(t *testing.T) { + stopChan := make(chan struct{}) + errChan := make(chan error, 2) + encounteredErrors := errorHandler(stopChan, errChan) + close(errChan) + err, ok := <-encounteredErrors + if !ok { + t.Errorf("encounteredErrors channel should be open with result") + } + if err != nil { + t.Errorf("encounteredErrors shouldn't have any errors") + } + + _, ok = <-encounteredErrors + if ok { + t.Errorf("encounteredErrors channel should be closed") + } +} + +func Test_launchWorkersParallel(t *testing.T) { + ctrl := gomock.NewController(t) + processor := executor.NewMockTxProcessor(ctrl) + + wg := &sync.WaitGroup{} + cfg := &utils.Config{Workers: 3, ChainID: 250} + + stopChan := make(chan struct{}) + errChan := make(chan error) + + testTx := makeTestTx(6) + + // channel for each worker to get tasks for processing + workerInputChannels := make(map[int]chan *substate.Transaction) + for i := 0; i < cfg.Workers; i++ { + workerInputChannels[i] = make(chan *substate.Transaction) + go func(workerId int) { + // 2 transactions for each worker + workerInputChannels[workerId] <- &substate.Transaction{Block: uint64(workerId), Substate: testTx[workerId]} + workerInputChannels[workerId] <- &substate.Transaction{Block: uint64(workerId + cfg.Workers), Substate: testTx[workerId+cfg.Workers]} + close(workerInputChannels[workerId]) + }(i) + } + + processor.EXPECT().ProcessTransaction(gomock.Any(), 0, gomock.Any(), substatecontext.NewTxContext(testTx[0])).Return(nil, nil) + processor.EXPECT().ProcessTransaction(gomock.Any(), 1, gomock.Any(), substatecontext.NewTxContext(testTx[1])).Return(nil, nil) + processor.EXPECT().ProcessTransaction(gomock.Any(), 2, gomock.Any(), substatecontext.NewTxContext(testTx[2])).Return(nil, nil) + processor.EXPECT().ProcessTransaction(gomock.Any(), 3, gomock.Any(), substatecontext.NewTxContext(testTx[3])).Return(nil, nil) + processor.EXPECT().ProcessTransaction(gomock.Any(), 4, gomock.Any(), substatecontext.NewTxContext(testTx[4])).Return(nil, nil) 
+ processor.EXPECT().ProcessTransaction(gomock.Any(), 5, gomock.Any(), substatecontext.NewTxContext(testTx[5])).Return(nil, nil) + + outPut := launchWorkers(wg, cfg, workerInputChannels, processor, stopChan, errChan) + + var orderCheck uint64 = 0 + // 2 transactions for each worker + for k := 0; k < 2; k++ { + for i := 0; i < cfg.Workers; i++ { + r, ok := <-outPut[i] + if !ok { + t.Fatalf("results channel should be open") + } + if orderCheck != r.tx.Block { + t.Fatalf("results are in incorrect order") + } + orderCheck++ + } + } + wg.Wait() + + ctrl.Finish() +} + +// makeTestTx creates dummy substates that will be processed without crashing. +func makeTestTx(count int) []*substate.Substate { + testTxArr := make([]*substate.Substate, count) + for i := 0; i < count; i++ { + var testTx = &substate.Substate{ + Env: &substate.SubstateEnv{}, + Message: &substate.SubstateMessage{ + Gas: 10000, + GasPrice: big.NewInt(0), + }, + Result: &substate.SubstateResult{ + GasUsed: 1, + }, + } + testTxArr[i] = testTx + } + return testTxArr +} + +func Test_readAccounts(t *testing.T) { + t.Run("Empty", func(t *testing.T) { + arr := make([]proxy.ContractLiveliness, 0) + deleteHistory := make(map[common.Address]bool) + del, res := readAccounts(arr, &deleteHistory) + if len(del) != 0 || len(res) != 0 { + t.Fatalf("should return empty arrays") + } + if len(deleteHistory) != 0 { + t.Fatalf("deleteHistory should be empty") + } + }) + + t.Run("Deletion", func(t *testing.T) { + arr := make([]proxy.ContractLiveliness, 0) + deleteHistory := make(map[common.Address]bool) + + arr = append(arr, proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: true}) + del, res := readAccounts(arr, &deleteHistory) + if len(del) != 1 || len(res) != 0 { + t.Fatalf("should return empty arrays") + } + if !deleteHistory[common.HexToAddress("0x1")] { + t.Fatalf("deleteHistory should have 0x1 deleted") + } + }) + + t.Run("DeletionAndResurrection", func(t *testing.T) { + arr := make([]proxy.ContractLiveliness, 0) + deleteHistory := make(map[common.Address]bool) + + arr = append(arr, proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: true}) + arr = append(arr, proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: false}) + del, res := readAccounts(arr, &deleteHistory) + if len(del) != 0 || len(res) != 1 { + t.Fatalf("should return empty deletion array and 1 resurrected") + } + if deleteHistory[common.HexToAddress("0x1")] { + t.Fatalf("deleteHistory should have 0x1 resurrected") + } + }) + + t.Run("DeletionResurrectionDeletion", func(t *testing.T) { + arr := make([]proxy.ContractLiveliness, 0) + deleteHistory := make(map[common.Address]bool) + + arr = append(arr, proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: true}) + arr = append(arr, proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: false}) + arr = append(arr, proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: true}) + del, res := readAccounts(arr, &deleteHistory) + if len(del) != 1 || len(res) != 0 { + t.Fatalf("should return empty deletion array and 1 resurrected") + } + if !deleteHistory[common.HexToAddress("0x1")] { + t.Fatalf("deleteHistory should have 0x1 deleted") + } + }) + + t.Run("DeletionResurrectionSplit", func(t *testing.T) { + arr := make([]proxy.ContractLiveliness, 0) + deleteHistory := make(map[common.Address]bool) + arr = append(arr, proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: true}) + _, _ = readAccounts(arr, &deleteHistory) + + // second 
run + arr2 := make([]proxy.ContractLiveliness, 0) + arr2 = append(arr, proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: false}) + del, res := readAccounts(arr2, &deleteHistory) + if len(del) != 0 || len(res) != 1 { + t.Fatalf("should return empty deletion array and 1 resurrected") + } + if deleteHistory[common.HexToAddress("0x1")] { + t.Fatalf("deleteHistory should have 0x1 deleted") + } + }) + + t.Run("ResurrectionDeletionResurrection", func(t *testing.T) { + arr := make([]proxy.ContractLiveliness, 0) + deleteHistory := make(map[common.Address]bool) + deleteHistory[common.HexToAddress("0x1")] = true + + arr = append(arr, proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: false}) + arr = append(arr, proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: true}) + del, res := readAccounts(arr, &deleteHistory) + if len(del) != 1 || len(res) != 0 { + t.Fatalf("should return empty deletion array and 1 resurrected") + } + if !deleteHistory[common.HexToAddress("0x1")] { + t.Fatalf("deleteHistory should have 0x1 deleted") + } + }) +} + +// TODO trace why this test fails +//func Test_resultCollector(t *testing.T) { +// +// wg := &sync.WaitGroup{} +// cfg := &utils.Config{Workers: 100, ChainID: 250} +// stopChan := make(chan struct{}) +// +// // channel for each worker to get tasks for processing +// workerOutputChannels := make(map[int]chan txLivelinessResult) +// for i := 0; i < cfg.Workers; i++ { +// workerOutputChannels[i] = make(chan txLivelinessResult) +// go func(workerId int) { +// // make proxy.ContractLiveliness map +// cll := make([]proxy.ContractLiveliness, 0) +// cll = append(cll, proxy.ContractLiveliness{Addr: common.HexToAddress(fmt.Sprintf("0x%x", workerId)), IsDeleted: true}) +// workerOutputChannels[workerId] <- txLivelinessResult{liveliness: cll} +// close(workerOutputChannels[workerId]) +// }(i) +// } +// +// res := resultCollector(wg, cfg, workerOutputChannels, stopChan) +// +// currentBlk := 0 +// for r := range res { +// if !r.liveliness[0].IsDeleted { +// t.Fatalf("results are in incorrect order") +// } +// +// if r.liveliness[0].Addr != common.HexToAddress(fmt.Sprintf("0x%x", currentBlk)) { +// t.Fatalf("results are in incorrect order liveliness") +// } +// currentBlk++ +// } +//} From 856d2f297d89e4f5b6e299199257795b7dbf30bb Mon Sep 17 00:00:00 2001 From: matej Date: Mon, 22 Apr 2024 15:19:30 +0200 Subject: [PATCH 2/7] Fix race conditions --- state/proxy/deletion.go | 4 +- utildb/gen_deleted_accounts.go | 10 ++-- utildb/gen_deleted_accounts_test.go | 76 ++++++++++++++--------------- 3 files changed, 43 insertions(+), 47 deletions(-) diff --git a/state/proxy/deletion.go b/state/proxy/deletion.go index e6827ba99..2fd6fa9ae 100644 --- a/state/proxy/deletion.go +++ b/state/proxy/deletion.go @@ -40,11 +40,11 @@ type DeletionProxy struct { } // NewDeletionProxy creates a new StateDB proxy. 
-func NewDeletionProxy(db state.StateDB, ch chan ContractLiveliness, logLevel string) *DeletionProxy { +func NewDeletionProxy(db state.StateDB, ch chan ContractLiveliness, log logger.Logger) *DeletionProxy { r := new(DeletionProxy) r.db = db r.ch = ch - r.log = logger.NewLogger(logLevel, "Proxy Deletion") + r.log = log return r } diff --git a/utildb/gen_deleted_accounts.go b/utildb/gen_deleted_accounts.go index 921c071da..51d53872b 100644 --- a/utildb/gen_deleted_accounts.go +++ b/utildb/gen_deleted_accounts.go @@ -83,7 +83,7 @@ func readAccounts(cllArr []proxy.ContractLiveliness, deleteHistory *map[common.A // genDeletedAccountsTask process a transaction substate then records self-destructed accounts // and resurrected accounts to a database. -func genDeletedAccountsTask(tx *substate.Transaction, processor executor.TxProcessor, cfg *utils.Config) ([]proxy.ContractLiveliness, error) { +func genDeletedAccountsTask(tx *substate.Transaction, processor executor.TxProcessor, cfg *utils.Config, log logger.Logger) ([]proxy.ContractLiveliness, error) { ch := make(chan proxy.ContractLiveliness, channelSize) var statedb state.StateDB var err error @@ -95,7 +95,7 @@ func genDeletedAccountsTask(tx *substate.Transaction, processor executor.TxProce } //wrapper - statedb = proxy.NewDeletionProxy(statedb, ch, cfg.LogLevel) + statedb = proxy.NewDeletionProxy(statedb, ch, log) _, err = processor.ProcessTransaction(statedb, int(tx.Block), tx.Transaction, ss) if err != nil { @@ -139,7 +139,7 @@ func GenDeletedAccountsAction(cfg *utils.Config, ddb *substate.DestroyedAccountD workerInputChannels := taskFeeder(&wg, cfg, iter, lastBlock, stopChan, log) // prepare workers to process transactions - workerOutputChannels := launchWorkers(&wg, cfg, workerInputChannels, processor, stopChan, errChan) + workerOutputChannels := launchWorkers(&wg, cfg, workerInputChannels, processor, stopChan, errChan, log) // collect results from workers and orders them orderedResults := resultCollector(&wg, cfg, workerOutputChannels, stopChan) @@ -221,7 +221,7 @@ func resultCollector(wg *sync.WaitGroup, cfg *utils.Config, workerOutputChannels } // launchWorkers lauches workers to process transactions in parallel. 
-func launchWorkers(wg *sync.WaitGroup, cfg *utils.Config, workerInputChannels map[int]chan *substate.Transaction, processor executor.TxProcessor, stopChan chan struct{}, errChan chan error) map[int]chan txLivelinessResult { +func launchWorkers(wg *sync.WaitGroup, cfg *utils.Config, workerInputChannels map[int]chan *substate.Transaction, processor executor.TxProcessor, stopChan chan struct{}, errChan chan error, log logger.Logger) map[int]chan txLivelinessResult { // channel for each worker to send results workerOutputChannels := make(map[int]chan txLivelinessResult) for i := 0; i < cfg.Workers; i++ { @@ -245,7 +245,7 @@ func launchWorkers(wg *sync.WaitGroup, cfg *utils.Config, workerInputChannels ma return } // Process sorted transactions - livelinessArr, err := genDeletedAccountsTask(tx, processor, cfg) + livelinessArr, err := genDeletedAccountsTask(tx, processor, cfg, log) if err != nil { errChan <- err return diff --git a/utildb/gen_deleted_accounts_test.go b/utildb/gen_deleted_accounts_test.go index 7e5df33ff..eb37f5daf 100644 --- a/utildb/gen_deleted_accounts_test.go +++ b/utildb/gen_deleted_accounts_test.go @@ -62,12 +62,12 @@ func Test_launchWorkersParallel(t *testing.T) { workerInputChannels := make(map[int]chan *substate.Transaction) for i := 0; i < cfg.Workers; i++ { workerInputChannels[i] = make(chan *substate.Transaction) - go func(workerId int) { + go func(workerId int, workerIn chan *substate.Transaction) { // 2 transactions for each worker - workerInputChannels[workerId] <- &substate.Transaction{Block: uint64(workerId), Substate: testTx[workerId]} - workerInputChannels[workerId] <- &substate.Transaction{Block: uint64(workerId + cfg.Workers), Substate: testTx[workerId+cfg.Workers]} - close(workerInputChannels[workerId]) - }(i) + workerIn <- &substate.Transaction{Block: uint64(workerId), Substate: testTx[workerId]} + workerIn <- &substate.Transaction{Block: uint64(workerId + cfg.Workers), Substate: testTx[workerId+cfg.Workers]} + close(workerIn) + }(i, workerInputChannels[i]) } processor.EXPECT().ProcessTransaction(gomock.Any(), 0, gomock.Any(), substatecontext.NewTxContext(testTx[0])).Return(nil, nil) @@ -77,7 +77,7 @@ func Test_launchWorkersParallel(t *testing.T) { processor.EXPECT().ProcessTransaction(gomock.Any(), 4, gomock.Any(), substatecontext.NewTxContext(testTx[4])).Return(nil, nil) processor.EXPECT().ProcessTransaction(gomock.Any(), 5, gomock.Any(), substatecontext.NewTxContext(testTx[5])).Return(nil, nil) - outPut := launchWorkers(wg, cfg, workerInputChannels, processor, stopChan, errChan) + outPut := launchWorkers(wg, cfg, workerInputChannels, processor, stopChan, errChan, nil) var orderCheck uint64 = 0 // 2 transactions for each worker @@ -210,37 +210,33 @@ func Test_readAccounts(t *testing.T) { }) } -// TODO trace why this test fails -//func Test_resultCollector(t *testing.T) { -// -// wg := &sync.WaitGroup{} -// cfg := &utils.Config{Workers: 100, ChainID: 250} -// stopChan := make(chan struct{}) -// -// // channel for each worker to get tasks for processing -// workerOutputChannels := make(map[int]chan txLivelinessResult) -// for i := 0; i < cfg.Workers; i++ { -// workerOutputChannels[i] = make(chan txLivelinessResult) -// go func(workerId int) { -// // make proxy.ContractLiveliness map -// cll := make([]proxy.ContractLiveliness, 0) -// cll = append(cll, proxy.ContractLiveliness{Addr: common.HexToAddress(fmt.Sprintf("0x%x", workerId)), IsDeleted: true}) -// workerOutputChannels[workerId] <- txLivelinessResult{liveliness: cll} -// 
close(workerOutputChannels[workerId]) -// }(i) -// } -// -// res := resultCollector(wg, cfg, workerOutputChannels, stopChan) -// -// currentBlk := 0 -// for r := range res { -// if !r.liveliness[0].IsDeleted { -// t.Fatalf("results are in incorrect order") -// } -// -// if r.liveliness[0].Addr != common.HexToAddress(fmt.Sprintf("0x%x", currentBlk)) { -// t.Fatalf("results are in incorrect order liveliness") -// } -// currentBlk++ -// } -//} +func Test_resultCollector(t *testing.T) { + + wg := &sync.WaitGroup{} + cfg := &utils.Config{Workers: 100, ChainID: 250} + stopChan := make(chan struct{}) + + // channel for each worker to get tasks for processing + workerOutputChannels := make(map[int]chan txLivelinessResult) + for i := 0; i < cfg.Workers; i++ { + workerOutputChannels[i] = make(chan txLivelinessResult) + go func(workerId int, workerOut chan txLivelinessResult) { + workerOut <- txLivelinessResult{liveliness: []proxy.ContractLiveliness{{Addr: common.HexToAddress(fmt.Sprintf("0x%x", workerId)), IsDeleted: true}}} + close(workerOutputChannels[workerId]) + }(i, workerOutputChannels[i]) + } + + res := resultCollector(wg, cfg, workerOutputChannels, stopChan) + + currentBlk := 0 + for r := range res { + if !r.liveliness[0].IsDeleted { + t.Fatalf("results are in incorrect order") + } + + if r.liveliness[0].Addr != common.HexToAddress(fmt.Sprintf("0x%x", currentBlk)) { + t.Fatalf("results are in incorrect order liveliness") + } + currentBlk++ + } +} From ba0fa4859663a677070314cbf25786824ddbbc8b Mon Sep 17 00:00:00 2001 From: matej Date: Tue, 23 Apr 2024 08:50:22 +0200 Subject: [PATCH 3/7] Rewrite stopChan to abort event --- utildb/gen_deleted_accounts.go | 64 +++++----- utildb/gen_deleted_accounts_test.go | 187 +++++++++++++++++++++------- 2 files changed, 175 insertions(+), 76 deletions(-) diff --git a/utildb/gen_deleted_accounts.go b/utildb/gen_deleted_accounts.go index 51d53872b..4e94ae1b1 100644 --- a/utildb/gen_deleted_accounts.go +++ b/utildb/gen_deleted_accounts.go @@ -99,7 +99,7 @@ func genDeletedAccountsTask(tx *substate.Transaction, processor executor.TxProce _, err = processor.ProcessTransaction(statedb, int(tx.Block), tx.Transaction, ss) if err != nil { - return nil, nil + return nil, err } close(ch) @@ -126,31 +126,31 @@ func GenDeletedAccountsAction(cfg *utils.Config, ddb *substate.DestroyedAccountD processor := executor.MakeTxProcessor(cfg) wg := sync.WaitGroup{} - stopChan := make(chan struct{}) + abort := utils.MakeEvent() errChan := make(chan error) iter := substate.NewSubstateIterator(firstBlock, cfg.Workers) defer iter.Release() //error handling routine - encounteredErrors := errorHandler(stopChan, errChan) + encounteredErrors := errorHandler(abort, errChan) // feeder to send tasks to workers - workerInputChannels := taskFeeder(&wg, cfg, iter, lastBlock, stopChan, log) + workerInputChannels := taskFeeder(&wg, cfg, iter, lastBlock, abort, log) // prepare workers to process transactions - workerOutputChannels := launchWorkers(&wg, cfg, workerInputChannels, processor, stopChan, errChan, log) + workerOutputChannels := txProcessor(&wg, cfg, workerInputChannels, processor, abort, errChan, log) // collect results from workers and orders them - orderedResults := resultCollector(&wg, cfg, workerOutputChannels, stopChan) + orderedResults := resultCollector(&wg, cfg, workerOutputChannels, abort) // process ordered txLivelinessResults - resolveDeletionsAndResurrections(ddb, orderedResults, stopChan, errChan) + resolveDeletionsAndResurrections(ddb, orderedResults, abort, errChan) // wait 
until feeder, workers and collector are done wg.Wait() - // notify error handler to stop + // notify error handler to stop listening close(errChan) utils.StopCPUProfile(cfg) @@ -161,7 +161,7 @@ func GenDeletedAccountsAction(cfg *utils.Config, ddb *substate.DestroyedAccountD } // resolveDeletionsAndResurrections reads txLivelinessResults and resolves deletions and resurrections. -func resolveDeletionsAndResurrections(ddb *substate.DestroyedAccountDB, orderedResults chan txLivelinessResult, stopChan chan struct{}, errChan chan error) { +func resolveDeletionsAndResurrections(ddb *substate.DestroyedAccountDB, orderedResults chan txLivelinessResult, abort utils.Event, errChan chan error) { var deleteHistory = make(map[common.Address]bool) defer func() { // explicitly set to nil to release memory as soon as possible @@ -170,7 +170,7 @@ func resolveDeletionsAndResurrections(ddb *substate.DestroyedAccountDB, orderedR for { select { - case <-stopChan: + case <-abort.Wait(): return case contract, ok := <-orderedResults: { @@ -191,7 +191,7 @@ func resolveDeletionsAndResurrections(ddb *substate.DestroyedAccountDB, orderedR } // resultCollector collects results from workers in round-robin fashion and sends them to a single channel. -func resultCollector(wg *sync.WaitGroup, cfg *utils.Config, workerOutputChannels map[int]chan txLivelinessResult, stopChan chan struct{}) chan txLivelinessResult { +func resultCollector(wg *sync.WaitGroup, cfg *utils.Config, workerOutputChannels map[int]chan txLivelinessResult, abort utils.Event) chan txLivelinessResult { orderedResults := make(chan txLivelinessResult, cfg.Workers) wg.Add(1) go func() { @@ -202,7 +202,7 @@ func resultCollector(wg *sync.WaitGroup, cfg *utils.Config, workerOutputChannels for { for i := 0; i < cfg.Workers; i++ { select { - case <-stopChan: + case <-abort.Wait(): return case res, ok := <-workerOutputChannels[i]: if !ok { @@ -211,7 +211,11 @@ func resultCollector(wg *sync.WaitGroup, cfg *utils.Config, workerOutputChannels // filter out txs with no liveliness actions if res.liveliness != nil && len(res.liveliness) > 0 { - orderedResults <- res + select { + case <-abort.Wait(): + return + case orderedResults <- res: + } } } } @@ -220,8 +224,8 @@ func resultCollector(wg *sync.WaitGroup, cfg *utils.Config, workerOutputChannels return orderedResults } -// launchWorkers lauches workers to process transactions in parallel. -func launchWorkers(wg *sync.WaitGroup, cfg *utils.Config, workerInputChannels map[int]chan *substate.Transaction, processor executor.TxProcessor, stopChan chan struct{}, errChan chan error, log logger.Logger) map[int]chan txLivelinessResult { +// txProcessor launches workers to process transactions in parallel. 
+func txProcessor(wg *sync.WaitGroup, cfg *utils.Config, workerInputChannels map[int]chan *substate.Transaction, processor executor.TxProcessor, abort utils.Event, errChan chan error, log logger.Logger) map[int]chan txLivelinessResult { // channel for each worker to send results workerOutputChannels := make(map[int]chan txLivelinessResult) for i := 0; i < cfg.Workers; i++ { @@ -238,7 +242,7 @@ func launchWorkers(wg *sync.WaitGroup, cfg *utils.Config, workerInputChannels ma for { select { - case <-stopChan: + case <-abort.Wait(): return case tx, ok := <-workerInputChannels[workerId]: if !ok { @@ -252,7 +256,7 @@ func launchWorkers(wg *sync.WaitGroup, cfg *utils.Config, workerInputChannels ma } select { - case <-stopChan: + case <-abort.Wait(): return case workerOutputChannels[workerId] <- txLivelinessResult{livelinessArr, tx}: } @@ -265,7 +269,7 @@ func launchWorkers(wg *sync.WaitGroup, cfg *utils.Config, workerInputChannels ma } // taskFeeder feeds tasks to workers in round-robin fashion. -func taskFeeder(wg *sync.WaitGroup, cfg *utils.Config, iter substate.SubstateIterator, lastBlock uint64, stopChan chan struct{}, log logger.Logger) map[int]chan *substate.Transaction { +func taskFeeder(wg *sync.WaitGroup, cfg *utils.Config, iter substate.SubstateIterator, lastBlock uint64, abort utils.Event, log logger.Logger) map[int]chan *substate.Transaction { wg.Add(1) // channel for each worker to get tasks for processing @@ -293,7 +297,7 @@ func taskFeeder(wg *sync.WaitGroup, cfg *utils.Config, iter substate.SubstateIte nextWorkerIndex := 0 for iter.Next() { select { - case <-stopChan: + case <-abort.Wait(): return default: } @@ -304,7 +308,7 @@ func taskFeeder(wg *sync.WaitGroup, cfg *utils.Config, iter substate.SubstateIte diff := sec - lastSec if tx.Block > lastBlock { - log.Infof("gen-del-acc: Total elapsed time: %.0f s, (Total ~%.1f Tx/s)", sec, float64(txCount)/sec) + log.Noticef("gen-del-acc: Total elapsed time: %.0f s, (Total ~%.1f Tx/s)", sec, float64(txCount)/sec) break } @@ -319,7 +323,7 @@ func taskFeeder(wg *sync.WaitGroup, cfg *utils.Config, iter substate.SubstateIte if tx.Transaction < utils.PseudoTx && tx.Substate.Result.Status == types.ReceiptStatusSuccessful { // if not pseodo tx and completed successfully, send task to next worker in round-robin select { - case <-stopChan: + case <-abort.Wait(): return case workerInputChannels[nextWorkerIndex] <- tx: nextWorkerIndex = (nextWorkerIndex + 1) % cfg.Workers @@ -332,21 +336,15 @@ func taskFeeder(wg *sync.WaitGroup, cfg *utils.Config, iter substate.SubstateIte } // errorHandler collects errors from workers and returns them as a single error -// while closing the stopChan to signal other routines to stop. -func errorHandler(stopChan chan struct{}, errChan chan error) chan error { +// while using abort to signal other routines to stop. 
+func errorHandler(abort utils.Event, errChan chan error) chan error { encounteredErrors := make(chan error) go func() { defer close(encounteredErrors) var result error - firstErr := true - defer func() { - if firstErr { - close(stopChan) - firstErr = false - } - }() + defer abort.Signal() for { err, ok := <-errChan @@ -354,10 +352,8 @@ func errorHandler(stopChan chan struct{}, errChan chan error) chan error { encounteredErrors <- result return } - if firstErr { - close(stopChan) - firstErr = false - } + + abort.Signal() result = errors.Join(result, err) } diff --git a/utildb/gen_deleted_accounts_test.go b/utildb/gen_deleted_accounts_test.go index eb37f5daf..030795b2e 100644 --- a/utildb/gen_deleted_accounts_test.go +++ b/utildb/gen_deleted_accounts_test.go @@ -5,6 +5,7 @@ import ( "math/big" "sync" "testing" + "time" "github.com/Fantom-foundation/Aida/executor" "github.com/Fantom-foundation/Aida/state/proxy" @@ -17,9 +18,9 @@ import ( ) func Test_errorHandlerAllErrorsAreMerged(t *testing.T) { - stopChan := make(chan struct{}) + abort := utils.MakeEvent() errChan := make(chan error, 2) - encounteredErrors := errorHandler(stopChan, errChan) + encounteredErrors := errorHandler(abort, errChan) errChan <- fmt.Errorf("error1") errChan <- fmt.Errorf("error2") close(errChan) @@ -27,10 +28,26 @@ func Test_errorHandlerAllErrorsAreMerged(t *testing.T) { assert.Equal(t, "error1\nerror2", got.Error()) } +func Test_errorHandlerSendsAbortSignal(t *testing.T) { + abort := utils.MakeEvent() + errChan := make(chan error, 2) + encounteredErrors := errorHandler(abort, errChan) + errChan <- fmt.Errorf("error1") + close(errChan) + got := <-encounteredErrors + assert.Equal(t, "error1", got.Error()) + + select { + case <-abort.Wait(): + default: + t.Errorf("abort signal should be sent") + } +} + func Test_errorHandlerResultGetsClosed(t *testing.T) { - stopChan := make(chan struct{}) + abort := utils.MakeEvent() errChan := make(chan error, 2) - encounteredErrors := errorHandler(stopChan, errChan) + encounteredErrors := errorHandler(abort, errChan) close(errChan) err, ok := <-encounteredErrors if !ok { @@ -46,26 +63,30 @@ func Test_errorHandlerResultGetsClosed(t *testing.T) { } } -func Test_launchWorkersParallel(t *testing.T) { +func Test_launchWorkersParallelAbortsOnSignal(t *testing.T) { ctrl := gomock.NewController(t) processor := executor.NewMockTxProcessor(ctrl) wg := &sync.WaitGroup{} - cfg := &utils.Config{Workers: 3, ChainID: 250} + cfg := &utils.Config{Workers: 2, ChainID: 250} - stopChan := make(chan struct{}) errChan := make(chan error) + abort := utils.MakeEvent() - testTx := makeTestTx(6) + testTx := makeTestTx(4) // channel for each worker to get tasks for processing workerInputChannels := make(map[int]chan *substate.Transaction) for i := 0; i < cfg.Workers; i++ { workerInputChannels[i] = make(chan *substate.Transaction) go func(workerId int, workerIn chan *substate.Transaction) { - // 2 transactions for each worker - workerIn <- &substate.Transaction{Block: uint64(workerId), Substate: testTx[workerId]} - workerIn <- &substate.Transaction{Block: uint64(workerId + cfg.Workers), Substate: testTx[workerId+cfg.Workers]} + for k := 0; ; k++ { + index := workerId + k*cfg.Workers + if index >= len(testTx) { + break + } + workerIn <- &substate.Transaction{Block: uint64(index), Substate: testTx[index]} + } close(workerIn) }(i, workerInputChannels[i]) } @@ -73,19 +94,75 @@ func Test_launchWorkersParallel(t *testing.T) { processor.EXPECT().ProcessTransaction(gomock.Any(), 0, gomock.Any(), 
substatecontext.NewTxContext(testTx[0])).Return(nil, nil) processor.EXPECT().ProcessTransaction(gomock.Any(), 1, gomock.Any(), substatecontext.NewTxContext(testTx[1])).Return(nil, nil) processor.EXPECT().ProcessTransaction(gomock.Any(), 2, gomock.Any(), substatecontext.NewTxContext(testTx[2])).Return(nil, nil) - processor.EXPECT().ProcessTransaction(gomock.Any(), 3, gomock.Any(), substatecontext.NewTxContext(testTx[3])).Return(nil, nil) - processor.EXPECT().ProcessTransaction(gomock.Any(), 4, gomock.Any(), substatecontext.NewTxContext(testTx[4])).Return(nil, nil) - processor.EXPECT().ProcessTransaction(gomock.Any(), 5, gomock.Any(), substatecontext.NewTxContext(testTx[5])).Return(nil, nil) + // block 3 is missing because of aborting + + outPut := txProcessor(wg, cfg, workerInputChannels, processor, abort, errChan, nil) + + _, ok := <-outPut[0] + if !ok { + t.Fatalf("output channel should be open") + } + + // wait for second worker processing + time.Sleep(100 * time.Millisecond) - outPut := launchWorkers(wg, cfg, workerInputChannels, processor, stopChan, errChan, nil) + // abort before allowing processing of block 3 + abort.Signal() + + wg.Wait() + ctrl.Finish() +} + +func Test_launchWorkersParallelCorrectOutputOrder(t *testing.T) { + ctrl := gomock.NewController(t) + processor := executor.NewMockTxProcessor(ctrl) + + wg := &sync.WaitGroup{} + cfg := &utils.Config{Workers: utils.GetRandom(2, 5), ChainID: 250} + + errChan := make(chan error) + + testSize := cfg.Workers*utils.GetRandom(100, 1000) + utils.GetRandom(0, cfg.Workers-1) + testTx := makeTestTx(testSize) + if len(testTx) != testSize { + t.Fatalf("internal test error: testTx size is incorrect") + } + + // channel for each worker to get tasks for processing + workerInputChannels := make(map[int]chan *substate.Transaction) + for i := 0; i < cfg.Workers; i++ { + workerInputChannels[i] = make(chan *substate.Transaction) + go func(workerId int, workerIn chan *substate.Transaction) { + for k := 0; ; k++ { + index := workerId + k*cfg.Workers + if index >= len(testTx) { + break + } + + workerIn <- &substate.Transaction{Block: uint64(index), Substate: testTx[index]} + } + + close(workerIn) + }(i, workerInputChannels[i]) + } + + for i := 0; i < len(testTx); i++ { + processor.EXPECT().ProcessTransaction(gomock.Any(), i, gomock.Any(), substatecontext.NewTxContext(testTx[i])).Return(nil, nil) + } + + outPut := txProcessor(wg, cfg, workerInputChannels, processor, utils.MakeEvent(), errChan, nil) var orderCheck uint64 = 0 - // 2 transactions for each worker - for k := 0; k < 2; k++ { + +loop: + for { for i := 0; i < cfg.Workers; i++ { r, ok := <-outPut[i] if !ok { - t.Fatalf("results channel should be open") + if int(orderCheck) != testSize { + t.Fatalf("results are missing got: %d, expected: %d", orderCheck, testSize) + } + break loop } if orderCheck != r.tx.Block { t.Fatalf("results are in incorrect order") @@ -93,30 +170,11 @@ func Test_launchWorkersParallel(t *testing.T) { orderCheck++ } } - wg.Wait() + wg.Wait() ctrl.Finish() } -// makeTestTx creates dummy substates that will be processed without crashing. 
-func makeTestTx(count int) []*substate.Substate { - testTxArr := make([]*substate.Substate, count) - for i := 0; i < count; i++ { - var testTx = &substate.Substate{ - Env: &substate.SubstateEnv{}, - Message: &substate.SubstateMessage{ - Gas: 10000, - GasPrice: big.NewInt(0), - }, - Result: &substate.SubstateResult{ - GasUsed: 1, - }, - } - testTxArr[i] = testTx - } - return testTxArr -} - func Test_readAccounts(t *testing.T) { t.Run("Empty", func(t *testing.T) { arr := make([]proxy.ContractLiveliness, 0) @@ -210,11 +268,9 @@ func Test_readAccounts(t *testing.T) { }) } -func Test_resultCollector(t *testing.T) { - +func Test_resultCollectorCorrectResultOrder(t *testing.T) { wg := &sync.WaitGroup{} cfg := &utils.Config{Workers: 100, ChainID: 250} - stopChan := make(chan struct{}) // channel for each worker to get tasks for processing workerOutputChannels := make(map[int]chan txLivelinessResult) @@ -226,7 +282,7 @@ func Test_resultCollector(t *testing.T) { }(i, workerOutputChannels[i]) } - res := resultCollector(wg, cfg, workerOutputChannels, stopChan) + res := resultCollector(wg, cfg, workerOutputChannels, utils.MakeEvent()) currentBlk := 0 for r := range res { @@ -240,3 +296,50 @@ func Test_resultCollector(t *testing.T) { currentBlk++ } } + +func Test_resultCollectorAbortsOnSignal(t *testing.T) { + wg := &sync.WaitGroup{} + cfg := &utils.Config{Workers: 100, ChainID: 250} + + abort := utils.MakeEvent() + + // channel for each worker to get tasks for processing + workerOutputChannels := make(map[int]chan txLivelinessResult) + for i := 0; i < cfg.Workers; i++ { + workerOutputChannels[i] = make(chan txLivelinessResult) + go func(workerId int, workerOut chan txLivelinessResult) { + workerOut <- txLivelinessResult{liveliness: []proxy.ContractLiveliness{{Addr: common.HexToAddress(fmt.Sprintf("0x%x", workerId)), IsDeleted: true}}} + close(workerOutputChannels[workerId]) + }(i, workerOutputChannels[i]) + } + + res := resultCollector(wg, cfg, workerOutputChannels, abort) + + currentBlk := 0 + for range res { + if currentBlk == 50 { + abort.Signal() + break + } + currentBlk++ + } +} + +// makeTestTx creates dummy substates that will be processed without crashing. +func makeTestTx(count int) []*substate.Substate { + testTxArr := make([]*substate.Substate, count) + for i := 0; i < count; i++ { + var testTx = &substate.Substate{ + Env: &substate.SubstateEnv{}, + Message: &substate.SubstateMessage{ + Gas: 10000, + GasPrice: big.NewInt(0), + }, + Result: &substate.SubstateResult{ + GasUsed: 1, + }, + } + testTxArr[i] = testTx + } + return testTxArr +} From 32c08107863cf88aff949494d029e19943a5608c Mon Sep 17 00:00:00 2001 From: matej Date: Tue, 23 Apr 2024 20:12:44 +0200 Subject: [PATCH 4/7] Change maps of chans to slice --- utildb/gen_deleted_accounts.go | 10 +++++----- utildb/gen_deleted_accounts_test.go | 18 +++++++++--------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/utildb/gen_deleted_accounts.go b/utildb/gen_deleted_accounts.go index 4e94ae1b1..d8fab06f6 100644 --- a/utildb/gen_deleted_accounts.go +++ b/utildb/gen_deleted_accounts.go @@ -191,7 +191,7 @@ func resolveDeletionsAndResurrections(ddb *substate.DestroyedAccountDB, orderedR } // resultCollector collects results from workers in round-robin fashion and sends them to a single channel. 
-func resultCollector(wg *sync.WaitGroup, cfg *utils.Config, workerOutputChannels map[int]chan txLivelinessResult, abort utils.Event) chan txLivelinessResult { +func resultCollector(wg *sync.WaitGroup, cfg *utils.Config, workerOutputChannels []chan txLivelinessResult, abort utils.Event) chan txLivelinessResult { orderedResults := make(chan txLivelinessResult, cfg.Workers) wg.Add(1) go func() { @@ -225,9 +225,9 @@ func resultCollector(wg *sync.WaitGroup, cfg *utils.Config, workerOutputChannels } // txProcessor launches workers to process transactions in parallel. -func txProcessor(wg *sync.WaitGroup, cfg *utils.Config, workerInputChannels map[int]chan *substate.Transaction, processor executor.TxProcessor, abort utils.Event, errChan chan error, log logger.Logger) map[int]chan txLivelinessResult { +func txProcessor(wg *sync.WaitGroup, cfg *utils.Config, workerInputChannels []chan *substate.Transaction, processor executor.TxProcessor, abort utils.Event, errChan chan error, log logger.Logger) []chan txLivelinessResult { // channel for each worker to send results - workerOutputChannels := make(map[int]chan txLivelinessResult) + workerOutputChannels := make([]chan txLivelinessResult, cfg.Workers) for i := 0; i < cfg.Workers; i++ { workerOutputChannels[i] = make(chan txLivelinessResult) } @@ -269,11 +269,11 @@ func txProcessor(wg *sync.WaitGroup, cfg *utils.Config, workerInputChannels map[ } // taskFeeder feeds tasks to workers in round-robin fashion. -func taskFeeder(wg *sync.WaitGroup, cfg *utils.Config, iter substate.SubstateIterator, lastBlock uint64, abort utils.Event, log logger.Logger) map[int]chan *substate.Transaction { +func taskFeeder(wg *sync.WaitGroup, cfg *utils.Config, iter substate.SubstateIterator, lastBlock uint64, abort utils.Event, log logger.Logger) []chan *substate.Transaction { wg.Add(1) // channel for each worker to get tasks for processing - workerInputChannels := make(map[int]chan *substate.Transaction) + workerInputChannels := make([]chan *substate.Transaction, cfg.Workers) for i := 0; i < cfg.Workers; i++ { workerInputChannels[i] = make(chan *substate.Transaction) } diff --git a/utildb/gen_deleted_accounts_test.go b/utildb/gen_deleted_accounts_test.go index 030795b2e..b57c51053 100644 --- a/utildb/gen_deleted_accounts_test.go +++ b/utildb/gen_deleted_accounts_test.go @@ -76,7 +76,7 @@ func Test_launchWorkersParallelAbortsOnSignal(t *testing.T) { testTx := makeTestTx(4) // channel for each worker to get tasks for processing - workerInputChannels := make(map[int]chan *substate.Transaction) + workerInputChannels := make([]chan *substate.Transaction, cfg.Workers) for i := 0; i < cfg.Workers; i++ { workerInputChannels[i] = make(chan *substate.Transaction) go func(workerId int, workerIn chan *substate.Transaction) { @@ -129,7 +129,7 @@ func Test_launchWorkersParallelCorrectOutputOrder(t *testing.T) { } // channel for each worker to get tasks for processing - workerInputChannels := make(map[int]chan *substate.Transaction) + workerInputChannels := make([]chan *substate.Transaction, cfg.Workers) for i := 0; i < cfg.Workers; i++ { workerInputChannels[i] = make(chan *substate.Transaction) go func(workerId int, workerIn chan *substate.Transaction) { @@ -254,16 +254,16 @@ func Test_readAccounts(t *testing.T) { t.Run("ResurrectionDeletionResurrection", func(t *testing.T) { arr := make([]proxy.ContractLiveliness, 0) deleteHistory := make(map[common.Address]bool) - deleteHistory[common.HexToAddress("0x1")] = true + deleteHistory[common.HexToAddress("0x1")] = false - arr = append(arr, 
proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: false}) arr = append(arr, proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: true}) + arr = append(arr, proxy.ContractLiveliness{Addr: common.HexToAddress("0x1"), IsDeleted: false}) del, res := readAccounts(arr, &deleteHistory) - if len(del) != 1 || len(res) != 0 { + if len(del) != 0 || len(res) != 1 { t.Fatalf("should return empty deletion array and 1 resurrected") } - if !deleteHistory[common.HexToAddress("0x1")] { - t.Fatalf("deleteHistory should have 0x1 deleted") + if deleteHistory[common.HexToAddress("0x1")] { + t.Fatalf("deleteHistory should have 0x1 ressurected") } }) } @@ -273,7 +273,7 @@ func Test_resultCollectorCorrectResultOrder(t *testing.T) { cfg := &utils.Config{Workers: 100, ChainID: 250} // channel for each worker to get tasks for processing - workerOutputChannels := make(map[int]chan txLivelinessResult) + workerOutputChannels := make([]chan txLivelinessResult, cfg.Workers) for i := 0; i < cfg.Workers; i++ { workerOutputChannels[i] = make(chan txLivelinessResult) go func(workerId int, workerOut chan txLivelinessResult) { @@ -304,7 +304,7 @@ func Test_resultCollectorAbortsOnSignal(t *testing.T) { abort := utils.MakeEvent() // channel for each worker to get tasks for processing - workerOutputChannels := make(map[int]chan txLivelinessResult) + workerOutputChannels := make([]chan txLivelinessResult, cfg.Workers) for i := 0; i < cfg.Workers; i++ { workerOutputChannels[i] = make(chan txLivelinessResult) go func(workerId int, workerOut chan txLivelinessResult) { From 971448c19e184dd7a875460046d9a42637347aa4 Mon Sep 17 00:00:00 2001 From: matej Date: Tue, 23 Apr 2024 20:57:24 +0200 Subject: [PATCH 5/7] Remove negligible optimization --- utildb/gen_deleted_accounts.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/utildb/gen_deleted_accounts.go b/utildb/gen_deleted_accounts.go index d8fab06f6..9eb86997c 100644 --- a/utildb/gen_deleted_accounts.go +++ b/utildb/gen_deleted_accounts.go @@ -163,10 +163,6 @@ func GenDeletedAccountsAction(cfg *utils.Config, ddb *substate.DestroyedAccountD // resolveDeletionsAndResurrections reads txLivelinessResults and resolves deletions and resurrections. 
func resolveDeletionsAndResurrections(ddb *substate.DestroyedAccountDB, orderedResults chan txLivelinessResult, abort utils.Event, errChan chan error) { var deleteHistory = make(map[common.Address]bool) - defer func() { - // explicitly set to nil to release memory as soon as possible - deleteHistory = nil - }() for { select { From 1ec9365b2af7dc00dd4e614c36c4855f1af12eb8 Mon Sep 17 00:00:00 2001 From: matej Date: Wed, 15 May 2024 11:40:29 +0200 Subject: [PATCH 6/7] Check block transaction context errors --- executor/extension/primer/primer.go | 5 ++++- utils/prime_ctx.go | 22 +++++++++++++++++----- utils/statedb.go | 20 ++++++++++++++++---- 3 files changed, 37 insertions(+), 10 deletions(-) diff --git a/executor/extension/primer/primer.go b/executor/extension/primer/primer.go index 415f64c4c..5329465f4 100644 --- a/executor/extension/primer/primer.go +++ b/executor/extension/primer/primer.go @@ -168,7 +168,10 @@ func (p *stateDbPrimer[T]) prime(stateDb state.StateDB) error { return fmt.Errorf("cannot generate update-set; %w", err) } if hasPrimed { - p.ctx.SuicideAccounts(stateDb, deletedAccounts) + err = p.ctx.SuicideAccounts(stateDb, deletedAccounts) + if err != nil { + return err + } } if err = p.ctx.PrimeStateDB(substatecontext.NewWorldState(update), stateDb); err != nil { return fmt.Errorf("cannot prime state-db; %w", err) diff --git a/utils/prime_ctx.go b/utils/prime_ctx.go index c110a63b8..77f57fbb5 100644 --- a/utils/prime_ctx.go +++ b/utils/prime_ctx.go @@ -231,11 +231,17 @@ func (pc *PrimeContext) PrimeStateDBRandom(ws txcontext.WorldState, db state.Sta } // SuicideAccounts clears storage of all input accounts. -func (pc *PrimeContext) SuicideAccounts(db state.StateDB, accounts []common.Address) { +func (pc *PrimeContext) SuicideAccounts(db state.StateDB, accounts []common.Address) error { count := 0 db.BeginSyncPeriod(0) - db.BeginBlock(pc.block) - db.BeginTransaction(0) + err := db.BeginBlock(pc.block) + if err != nil { + return fmt.Errorf("DeleteDestroyedAccounts BeginBlock: %w", err) + } + err = db.BeginTransaction(0) + if err != nil { + return fmt.Errorf("DeleteDestroyedAccounts BeginTransaction: %w", err) + } for _, addr := range accounts { if db.Exist(addr) { db.Suicide(addr) @@ -244,8 +250,14 @@ func (pc *PrimeContext) SuicideAccounts(db state.StateDB, accounts []common.Addr pc.exist[addr] = false } } - db.EndTransaction() - db.EndBlock() + err = db.EndTransaction() + if err != nil { + return fmt.Errorf("DeleteDestroyedAccounts EndTransaction: %w", err) + } + err = db.EndBlock() + if err != nil { + return fmt.Errorf("DeleteDestroyedAccounts EndBlock: %w", err) + } db.EndSyncPeriod() pc.block++ pc.log.Infof("\t\t %v suicided accounts were removed from statedb (before priming).", count) diff --git a/utils/statedb.go b/utils/statedb.go index ecb48166b..cbbc50243 100644 --- a/utils/statedb.go +++ b/utils/statedb.go @@ -260,14 +260,26 @@ func DeleteDestroyedAccountsFromStateDB(db state.StateDB, cfg *Config, target ui return nil } db.BeginSyncPeriod(0) - db.BeginBlock(target) - db.BeginTransaction(0) + err = db.BeginBlock(target) + if err != nil { + return fmt.Errorf("DeleteDestroyedAccounts BeginBlock: %w", err) + } + err = db.BeginTransaction(0) + if err != nil { + return fmt.Errorf("DeleteDestroyedAccounts BeginTransaction: %w", err) + } for _, addr := range accounts { db.Suicide(addr) log.Debugf("Perform suicide on %v", addr) } - db.EndTransaction() - db.EndBlock() + err = db.EndTransaction() + if err != nil { + return fmt.Errorf("DeleteDestroyedAccounts EndTransaction: %w", 
err) + } + err = db.EndBlock() + if err != nil { + return fmt.Errorf("DeleteDestroyedAccounts EndBlock: %w", err) + } db.EndSyncPeriod() return nil } From 943c4d9d4122673cc8b38f7629a0a785478fe1ee Mon Sep 17 00:00:00 2001 From: matej Date: Wed, 15 May 2024 11:58:36 +0200 Subject: [PATCH 7/7] Fix return --- utils/prime_ctx.go | 1 + 1 file changed, 1 insertion(+) diff --git a/utils/prime_ctx.go b/utils/prime_ctx.go index 77f57fbb5..d9c485780 100644 --- a/utils/prime_ctx.go +++ b/utils/prime_ctx.go @@ -261,6 +261,7 @@ func (pc *PrimeContext) SuicideAccounts(db state.StateDB, accounts []common.Addr db.EndSyncPeriod() pc.block++ pc.log.Infof("\t\t %v suicided accounts were removed from statedb (before priming).", count) + return nil } func (pc *PrimeContext) GetBlock() uint64 {
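
The ordering guarantee that PATCH 1/7 through 3/7 build out of taskFeeder, the per-worker processing goroutines and resultCollector can be hard to see through the channel plumbing, so the following is a minimal, self-contained sketch of the same idea, assuming only the Go standard library. The task numbers, the worker count and the square-the-input stand-in for genDeletedAccountsTask are illustrative placeholders rather than Aida code, and the abort event and error channel are left out to keep the ordering property visible.

package main

import (
	"fmt"
	"sync"
)

func main() {
	const workers = 3
	const tasks = 10

	// One unbuffered input and output channel per worker, as in the patch.
	inputs := make([]chan int, workers)
	outputs := make([]chan int, workers)
	for i := 0; i < workers; i++ {
		inputs[i] = make(chan int)
		outputs[i] = make(chan int)
	}

	// Feeder: hand task n to worker n % workers, i.e. strict round-robin.
	go func() {
		for n := 0; n < tasks; n++ {
			inputs[n%workers] <- n
		}
		for _, ch := range inputs {
			close(ch)
		}
	}()

	// Workers: drain their own input stream and emit results on their own
	// output channel, in the order the tasks arrived.
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			defer close(outputs[id])
			for n := range inputs[id] {
				outputs[id] <- n * n // placeholder for genDeletedAccountsTask
			}
		}(i)
	}

	// Collector: poll the per-worker outputs in the same round-robin order,
	// which reproduces the original task order in the merged stream.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			for i := 0; i < workers; i++ {
				r, ok := <-outputs[i]
				if !ok {
					return
				}
				fmt.Println(r)
			}
		}
	}()

	wg.Wait()
	<-done
}

Because the collector polls the per-worker output channels in exactly the order the feeder assigned the tasks, the merged stream comes out in the original substate order without any sorting step, which is what lets GenDeletedAccountsAction write destroyed-account records in block and transaction order while still processing transactions in parallel.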
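
PATCH 3/7 replaces the shared stopChan with an abort event created by utils.MakeEvent, whose implementation is not part of this series. A plausible minimal version, assuming the event is a close-once channel wrapped so that Signal may be called repeatedly (errorHandler signals both inside its loop and in a defer) and so that the value can be passed around by value as utils.Event is in the new signatures, could look like the sketch below; the real Aida type may differ.

package utilssketch

import "sync"

// Event is a one-shot broadcast: closing the inner channel wakes every
// goroutine that is selecting on Wait().
type Event struct {
	state *eventState
}

type eventState struct {
	once sync.Once
	ch   chan struct{}
}

// MakeEvent returns an unfired event that can be copied by value, since all
// copies share the same underlying state.
func MakeEvent() Event {
	return Event{state: &eventState{ch: make(chan struct{})}}
}

// Signal marks the event as fired; repeated calls are no-ops.
func (e Event) Signal() {
	e.state.once.Do(func() { close(e.state.ch) })
}

// Wait returns a channel that is closed after the first Signal call,
// suitable for use in select statements such as `case <-abort.Wait():`.
func (e Event) Wait() <-chan struct{} {
	return e.state.ch
}

With this shape, every select on <-abort.Wait() unblocks as soon as the first error is reported, which is how the feeder, workers and collector are all torn down together in the rewritten GenDeletedAccountsAction.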