diff --git a/hw05_parallel_execution/run.go b/hw05_parallel_execution/run.go index bc48a8f..bc47884 100644 --- a/hw05_parallel_execution/run.go +++ b/hw05_parallel_execution/run.go @@ -3,8 +3,6 @@ package hw05parallelexecution import ( "errors" "sync" - "sync/atomic" - "time" ) var ( @@ -14,66 +12,8 @@ var ( type Task func() error -// Run starts tasks in n goroutines and stops its work when receiving m errors from tasks. -func Run(tasks []Task, n, m int) error { - if len(tasks) == 0 { - return ErrEmptyTasks - } - if m <= 0 { - return ErrErrorsLimitExceeded - } - - var ( - wg sync.WaitGroup - - errMaxCount = int32(m) // максимально допустимое кол-во ошибок - workerMaxCount = int32(n) // максимальное возможное кол-во воркеров - errCount atomic.Int32 // текущее кол-во ошибок - workerCount atomic.Int32 // текущее кол-во работающих воркеров - ) - - if len(tasks) < n { - workerMaxCount = int32(len(tasks)) - } - - for _, task := range tasks { - // если количество работающих тасков меньше workerMaxCount -> запускаем таск - // иначе, ждем 10 млсек - for workerCount.Load() >= workerMaxCount { - time.Sleep(time.Millisecond * 10) - } - - if errCount.Load() >= errMaxCount { // если кол-во ошибок больше допустимого - // прекращаем запускать воркеры - break - } - - workerCount.Add(1) - wg.Add(1) - go func(t Task) { - defer func() { - workerCount.Add(-1) - wg.Done() - }() - - err := t() - if err != nil { - errCount.Add(1) - } - }(task) - } - - wg.Wait() - - if errCount.Load() >= errMaxCount { - return ErrErrorsLimitExceeded - } - - return nil -} - // Run Tasks with channels. -func RunChan(tasks []Task, n, m int) error { +func Run(tasks []Task, n, m int) error { if len(tasks) == 0 { return ErrEmptyTasks } @@ -88,14 +28,18 @@ func RunChan(tasks []Task, n, m int) error { workerMaxCount = len(tasks) } - tasksCh := make(chan Task) // канал задач. - errorCh := make(chan error, workerMaxCount) // канал ошибок. + tasksCh := make(chan Task) // канал задач. + stopErrCh := make(chan struct{}) // сигнальный канал. + errorCh := make(chan error) // канал ошибок. wg := sync.WaitGroup{} + wgErr := sync.WaitGroup{} defer func() { close(tasksCh) + close(stopErrCh) wg.Wait() close(errorCh) + wgErr.Wait() }() // запускаем воркеры @@ -104,25 +48,30 @@ func RunChan(tasks []Task, n, m int) error { go doWork(&wg, tasksCh, errorCh) } - errCount := 0 // текущее кол-во ошибок + wgErr.Add(1) + go func(wg *sync.WaitGroup) { + defer wg.Done() - for i := 0; i < len(tasks); { - select { - case <-errorCh: + errCount := 0 // текущее кол-во ошибок + for range errorCh { errCount++ - if errCount >= m { - return ErrErrorsLimitExceeded + if errCount == m { + stopErrCh <- struct{}{} } - case tasksCh <- tasks[i]: - i++ + } + }(&wgErr) + + for _, task := range tasks { + select { + case <-stopErrCh: + return ErrErrorsLimitExceeded + case tasksCh <- task: } } return nil } -// tasks - канал задач. -// errs - канал ошибок. func doWork(wg *sync.WaitGroup, tasks <-chan Task, errs chan<- error) { defer func() { wg.Done() diff --git a/hw05_parallel_execution/run_test.go b/hw05_parallel_execution/run_test.go index 706de16..c3240de 100644 --- a/hw05_parallel_execution/run_test.go +++ b/hw05_parallel_execution/run_test.go @@ -155,147 +155,3 @@ func TestRun(t *testing.T) { require.Truef(t, errors.Is(err, ErrErrorsLimitExceeded), "actual err - %v", err) }) } - -func TestRunChan(t *testing.T) { - defer goleak.VerifyNone(t) - - t.Run("if were errors in first M tasks, than finished not more N+M tasks", func(t *testing.T) { - tasksCount := 50 - tasks := make([]Task, 0, tasksCount) - - var runTasksCount int32 - - for i := 0; i < tasksCount; i++ { - err := fmt.Errorf("error from task %d", i) - tasks = append(tasks, func() error { - time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) - atomic.AddInt32(&runTasksCount, 1) - return err - }) - } - - workersCount := 10 - maxErrorsCount := 23 - err := RunChan(tasks, workersCount, maxErrorsCount) - - require.Truef(t, errors.Is(err, ErrErrorsLimitExceeded), "actual err - %v", err) - require.LessOrEqual(t, runTasksCount, int32(workersCount+maxErrorsCount), "extra tasks were started") - }) - - t.Run("tasks without errors", func(t *testing.T) { - tasksCount := 50 - tasks := make([]Task, 0, tasksCount) - - var runTasksCount int32 - var sumTime time.Duration - - for i := 0; i < tasksCount; i++ { - taskSleep := time.Millisecond * time.Duration(rand.Intn(100)) - sumTime += taskSleep - - tasks = append(tasks, func() error { - time.Sleep(taskSleep) - atomic.AddInt32(&runTasksCount, 1) - return nil - }) - } - - workersCount := 5 - maxErrorsCount := 1 - - start := time.Now() - err := RunChan(tasks, workersCount, maxErrorsCount) - elapsedTime := time.Since(start) - require.NoError(t, err) - - require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed") - require.LessOrEqual(t, int64(elapsedTime), int64(sumTime/2), "tasks were run sequentially?") - }) - - t.Run("all tasks without elapsedTime", func(t *testing.T) { - tasksCount := 50 - tasks := make([]Task, 0, tasksCount) - - var runTasksCount int32 - var sumTime time.Duration - var err error - - for i := 0; i < tasksCount; i++ { - taskSleep := time.Millisecond * time.Duration(rand.Intn(100)) - sumTime += taskSleep - - tasks = append(tasks, func() error { - time.Sleep(taskSleep) - atomic.AddInt32(&runTasksCount, 1) - return nil - }) - } - - workersCount := 5 - maxErrorsCount := 1 - - require.Eventually(t, func() bool { - err = RunChan(tasks, workersCount, maxErrorsCount) - return err == nil - }, time.Duration(int64(sumTime/2)), time.Duration(int64(sumTime/1000)), "tasks were run sequentially?") - - require.NoError(t, err) - }) - - t.Run("empty tasks", func(t *testing.T) { - tasks := make([]Task, 0) - - err := RunChan(tasks, 1, 1) - - require.Truef(t, errors.Is(err, ErrEmptyTasks), "actual err - %v", err) - }) - - t.Run("task count less workers", func(t *testing.T) { - tasksCount := 5 - tasks := make([]Task, 0, tasksCount) - - var runTasksCount int32 - var sumTime time.Duration - - for i := 0; i < tasksCount; i++ { - taskSleep := time.Millisecond * time.Duration(rand.Intn(100)) - sumTime += taskSleep - - tasks = append(tasks, func() error { - time.Sleep(taskSleep) - atomic.AddInt32(&runTasksCount, 1) - return nil - }) - } - - workersCount := 10 - maxErrorsCount := 10 - err := RunChan(tasks, workersCount, maxErrorsCount) - - require.NoError(t, err) - require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed") - }) - - t.Run("zero errors", func(t *testing.T) { - tasksCount := 50 - tasks := make([]Task, 0, tasksCount) - - var runTasksCount int32 - - for i := 0; i < tasksCount; i++ { - if i == 5 { - tasks = append(tasks, func() error { - time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) - atomic.AddInt32(&runTasksCount, 1) - return nil - }) - } - } - - workersCount := 5 - maxErrorsCount := 0 - err := RunChan(tasks, workersCount, maxErrorsCount) - - require.Truef(t, errors.Is(err, ErrErrorsLimitExceeded), "actual err - %v", err) - }) -}