diff --git a/hw05_parallel_execution/.sync b/hw05_parallel_execution/.sync
deleted file mode 100644
index e69de29..0000000
diff --git a/hw05_parallel_execution/go.mod b/hw05_parallel_execution/go.mod
index c311b79..e30e89c 100644
--- a/hw05_parallel_execution/go.mod
+++ b/hw05_parallel_execution/go.mod
@@ -1,6 +1,6 @@
-module github.com/fixme_my_friend/hw05_parallel_execution
+module github.com/DimVlas/otus_hw/hw05_parallel_execution
 
-go 1.19
+go 1.22.2
 
 require (
 	github.com/stretchr/testify v1.7.0
diff --git a/hw05_parallel_execution/run.go b/hw05_parallel_execution/run.go
index 9e3193f..bc48a8f 100644
--- a/hw05_parallel_execution/run.go
+++ b/hw05_parallel_execution/run.go
@@ -2,14 +2,145 @@ package hw05parallelexecution
 
 import (
 	"errors"
+	"sync"
+	"sync/atomic"
+	"time"
 )
 
-var ErrErrorsLimitExceeded = errors.New("errors limit exceeded")
+var (
+	ErrErrorsLimitExceeded = errors.New("errors limit exceeded")
+	ErrEmptyTasks          = errors.New("empty task list")
+)
 
 type Task func() error
 
 // Run starts tasks in n goroutines and stops its work when receiving m errors from tasks.
 func Run(tasks []Task, n, m int) error {
-	// Place your code here.
+	if len(tasks) == 0 {
+		return ErrEmptyTasks
+	}
+	if m <= 0 {
+		return ErrErrorsLimitExceeded
+	}
+
+	var (
+		wg sync.WaitGroup
+
+		errMaxCount    = int32(m)    // maximum allowed number of errors
+		workerMaxCount = int32(n)    // maximum possible number of workers
+		errCount       atomic.Int32 // current number of errors
+		workerCount    atomic.Int32 // current number of running workers
+	)
+
+	if len(tasks) < n {
+		workerMaxCount = int32(len(tasks))
+	}
+
+	for _, task := range tasks {
+		// if fewer than workerMaxCount workers are running, start the task;
+		// otherwise wait 10 ms and check again
+		for workerCount.Load() >= workerMaxCount {
+			time.Sleep(time.Millisecond * 10)
+		}
+
+		if errCount.Load() >= errMaxCount { // the error limit has been reached:
+			// stop starting new workers
+			break
+		}
+
+		workerCount.Add(1)
+		wg.Add(1)
+		go func(t Task) {
+			defer func() {
+				workerCount.Add(-1)
+				wg.Done()
+			}()
+
+			err := t()
+			if err != nil {
+				errCount.Add(1)
+			}
+		}(task)
+	}
+
+	wg.Wait()
+
+	if errCount.Load() >= errMaxCount {
+		return ErrErrorsLimitExceeded
+	}
+
 	return nil
 }
+
+// RunChan runs tasks using channels.
+func RunChan(tasks []Task, n, m int) error {
+	if len(tasks) == 0 {
+		return ErrEmptyTasks
+	}
+
+	if m <= 0 {
+		return ErrErrorsLimitExceeded
+	}
+
+	workerMaxCount := n
+	// if there are fewer tasks than workers, cap the number of workers
+	if len(tasks) < n {
+		workerMaxCount = len(tasks)
+	}
+
+	tasksCh := make(chan Task)              // task channel.
+	errorCh := make(chan error, len(tasks)) // error channel; sized so workers never block sending.
+	wg := sync.WaitGroup{}
+
+	// stop closes the task channel, waits for the workers to finish,
+	// then closes the error channel so it can be drained.
+	stop := sync.OnceFunc(func() {
+		close(tasksCh)
+		wg.Wait()
+		close(errorCh)
+	})
+	defer stop()
+
+	// start the workers
+	for i := 0; i < workerMaxCount; i++ {
+		wg.Add(1)
+		go doWork(&wg, tasksCh, errorCh)
+	}
+
+	errCount := 0 // current number of errors
+
+	for i := 0; i < len(tasks); {
+		select {
+		case <-errorCh:
+			errCount++
+			if errCount >= m {
+				return ErrErrorsLimitExceeded
+			}
+		case tasksCh <- tasks[i]:
+			i++
+		}
+	}
+
+	// all tasks are dispatched: stop the workers and count the errors
+	// from the tasks that were still in flight.
+	stop()
+	for range errorCh {
+		errCount++
+	}
+	if errCount >= m {
+		return ErrErrorsLimitExceeded
+	}
+
+	return nil
+}
+
+// doWork reads tasks from the tasks channel and sends task errors to errs.
+func doWork(wg *sync.WaitGroup, tasks <-chan Task, errs chan<- error) {
+	defer wg.Done()
+
+	for task := range tasks {
+		if err := task(); err != nil {
+			errs <- err
+		}
+	}
+}
diff --git a/hw05_parallel_execution/run_test.go b/hw05_parallel_execution/run_test.go
index a7069d7..086c2bc 100644
--- a/hw05_parallel_execution/run_test.go
+++ b/hw05_parallel_execution/run_test.go
@@ -3,6 +3,9 @@
 import (
 	"errors"
 	"fmt"
+	"log"
 	"math/rand"
 	"sync/atomic"
 	"testing"
+
+	"go.uber.org/goleak"
@@ -67,4 +70,232 @@ func TestRun(t *testing.T) {
 		require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed")
 		require.LessOrEqual(t, int64(elapsedTime), int64(sumTime/2), "tasks were run sequentially?")
 	})
+
+	t.Run("all tasks without elapsedTime", func(t *testing.T) {
+		tasksCount := 50
+		tasks := make([]Task, 0, tasksCount)
+
+		var runTasksCount int32
+		var sumTime time.Duration
+		var err error
+
+		for i := 0; i < tasksCount; i++ {
+			taskSleep := time.Millisecond * time.Duration(rand.Intn(100))
+			sumTime += taskSleep
+
+			tasks = append(tasks, func() error {
+				time.Sleep(taskSleep)
+				atomic.AddInt32(&runTasksCount, 1)
+				return nil
+			})
+		}
+
+		workersCount := 5
+		maxErrorsCount := 1
+
+		require.Eventually(t, func() bool {
+			err = Run(tasks, workersCount, maxErrorsCount)
+			return err == nil
+		}, time.Duration(int64(sumTime/2)), time.Duration(int64(sumTime/1000)), "tasks were run sequentially?")
+
+		require.NoError(t, err)
+	})
+
+	t.Run("empty tasks", func(t *testing.T) {
+		tasks := make([]Task, 0)
+
+		err := Run(tasks, 1, 1)
+
+		require.Truef(t, errors.Is(err, ErrEmptyTasks), "actual err - %v", err)
+	})
+
+	t.Run("task count less than workers", func(t *testing.T) {
+		tasksCount := 5
+		tasks := make([]Task, 0, tasksCount)
+
+		var runTasksCount int32
+		var sumTime time.Duration
+
+		for i := 0; i < tasksCount; i++ {
+			taskSleep := time.Millisecond * time.Duration(rand.Intn(100))
+			sumTime += taskSleep
+
+			tasks = append(tasks, func() error {
+				time.Sleep(taskSleep)
+				atomic.AddInt32(&runTasksCount, 1)
+				return nil
+			})
+		}
+
+		workersCount := 10
+		maxErrorsCount := 10
+		err := Run(tasks, workersCount, maxErrorsCount)
+
+		require.NoError(t, err)
+		require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed")
+	})
+
+	t.Run("zero errors", func(t *testing.T) {
+		tasksCount := 50
+		tasks := make([]Task, 0, tasksCount)
+
+		var runTasksCount int32
+
+		for i := 0; i < tasksCount; i++ {
+			tasks = append(tasks, func() error {
+				time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
+				atomic.AddInt32(&runTasksCount, 1)
+				return nil
+			})
+		}
+
+		workersCount := 5
+		maxErrorsCount := 0
+		err := Run(tasks, workersCount, maxErrorsCount)
+
+		require.Truef(t, errors.Is(err, ErrErrorsLimitExceeded), "actual err - %v", err)
+	})
+}
+
+func TestRunChan(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
+	t.Run("if there were errors in first M tasks, then no more than N+M tasks were finished", func(t *testing.T) {
+		tasksCount := 50
+		tasks := make([]Task, 0, tasksCount)
+
+		var runTasksCount int32
+
+		for i := 0; i < tasksCount; i++ {
+			err := fmt.Errorf("error from task %d", i)
+			tasks = append(tasks, func() error {
+				time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
+				atomic.AddInt32(&runTasksCount, 1)
+				return err
+			})
+		}
+
+		workersCount := 10
+		maxErrorsCount := 23
+		err := RunChan(tasks, workersCount, maxErrorsCount)
+		log.Println(err)
+
+		require.Truef(t, errors.Is(err, ErrErrorsLimitExceeded), "actual err - %v", err)
+		require.LessOrEqual(t, runTasksCount, int32(workersCount+maxErrorsCount), "extra tasks were started")
+	})
+
+	t.Run("tasks without errors", func(t *testing.T) {
+		tasksCount := 50
+		tasks := make([]Task, 0, tasksCount)
+
+		var runTasksCount int32
+		var sumTime time.Duration
+
+		for i := 0; i < tasksCount; i++ {
+			taskSleep := time.Millisecond * time.Duration(rand.Intn(100))
+			sumTime += taskSleep
+
+			tasks = append(tasks, func() error {
+				time.Sleep(taskSleep)
+				atomic.AddInt32(&runTasksCount, 1)
+				return nil
+			})
+		}
+
+		workersCount := 5
+		maxErrorsCount := 1
+
+		start := time.Now()
+		err := RunChan(tasks, workersCount, maxErrorsCount)
+		elapsedTime := time.Since(start)
+		require.NoError(t, err)
+
+		require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed")
+		require.LessOrEqual(t, int64(elapsedTime), int64(sumTime/2), "tasks were run sequentially?")
+	})
+
+	t.Run("all tasks without elapsedTime", func(t *testing.T) {
+		tasksCount := 50
+		tasks := make([]Task, 0, tasksCount)
+
+		var runTasksCount int32
+		var sumTime time.Duration
+		var err error
+
+		for i := 0; i < tasksCount; i++ {
+			taskSleep := time.Millisecond * time.Duration(rand.Intn(100))
+			sumTime += taskSleep
+
+			tasks = append(tasks, func() error {
+				time.Sleep(taskSleep)
+				atomic.AddInt32(&runTasksCount, 1)
+				return nil
+			})
+		}
+
+		workersCount := 5
+		maxErrorsCount := 1
+
+		require.Eventually(t, func() bool {
+			err = RunChan(tasks, workersCount, maxErrorsCount)
+			return err == nil
+		}, time.Duration(int64(sumTime/2)), time.Duration(int64(sumTime/1000)), "tasks were run sequentially?")
+
+		require.NoError(t, err)
+	})
+
+	t.Run("empty tasks", func(t *testing.T) {
+		tasks := make([]Task, 0)
+
+		err := RunChan(tasks, 1, 1)
+
+		require.Truef(t, errors.Is(err, ErrEmptyTasks), "actual err - %v", err)
+	})
+
+	t.Run("task count less than workers", func(t *testing.T) {
+		tasksCount := 5
+		tasks := make([]Task, 0, tasksCount)
+
+		var runTasksCount int32
+		var sumTime time.Duration
+
+		for i := 0; i < tasksCount; i++ {
+			taskSleep := time.Millisecond * time.Duration(rand.Intn(100))
+			sumTime += taskSleep
+
+			tasks = append(tasks, func() error {
+				time.Sleep(taskSleep)
+				atomic.AddInt32(&runTasksCount, 1)
+				return nil
+			})
+		}
+
+		workersCount := 10
+		maxErrorsCount := 10
+		err := RunChan(tasks, workersCount, maxErrorsCount)
+
+		require.NoError(t, err)
+		require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed")
+	})
+
+	t.Run("zero errors", func(t *testing.T) {
+		tasksCount := 50
+		tasks := make([]Task, 0, tasksCount)
+
+		var runTasksCount int32
+
+		for i := 0; i < tasksCount; i++ {
+			tasks = append(tasks, func() error {
+				time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
+				atomic.AddInt32(&runTasksCount, 1)
+				return nil
+			})
+		}
+
+		workersCount := 5
+		maxErrorsCount := 0
+		err := RunChan(tasks, workersCount, maxErrorsCount)
+
+		require.Truef(t, errors.Is(err, ErrErrorsLimitExceeded), "actual err - %v", err)
+	})
 }
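
For reference, a minimal usage sketch of the two entry points added by this diff. It is not part of the change itself: the main package, task bodies, worker count, and error limit below are made-up illustration values; only the module path and the Task/Run/RunChan identifiers come from the patch.

package main

import (
	"fmt"
	"time"

	hw "github.com/DimVlas/otus_hw/hw05_parallel_execution"
)

func main() {
	// Build 20 toy tasks; every third one fails (illustrative values only).
	tasks := make([]hw.Task, 0, 20)
	for i := 0; i < 20; i++ {
		i := i // per-iteration capture (only needed before Go 1.22)
		tasks = append(tasks, func() error {
			time.Sleep(10 * time.Millisecond)
			if i%3 == 0 {
				return fmt.Errorf("task %d failed", i)
			}
			return nil
		})
	}

	// Run the tasks on 4 workers, stopping after 5 errors;
	// hw.RunChan(tasks, 4, 5) is the channel-based equivalent.
	if err := hw.Run(tasks, 4, 5); err != nil {
		fmt.Println("finished with error:", err) // here: "errors limit exceeded"
	}
}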