From 046e47cf9532ffa9d7cb5699e946d68504432194 Mon Sep 17 00:00:00 2001 From: DimVlas Date: Wed, 22 May 2024 22:55:05 +0300 Subject: [PATCH 1/4] HW5 is completed --- hw05_parallel_execution/.sync | 0 hw05_parallel_execution/go.mod | 4 +- hw05_parallel_execution/run.go | 126 ++++++++++++++- hw05_parallel_execution/run_test.go | 233 ++++++++++++++++++++++++++++ 4 files changed, 359 insertions(+), 4 deletions(-) delete mode 100644 hw05_parallel_execution/.sync diff --git a/hw05_parallel_execution/.sync b/hw05_parallel_execution/.sync deleted file mode 100644 index e69de29..0000000 diff --git a/hw05_parallel_execution/go.mod b/hw05_parallel_execution/go.mod index c311b79..e30e89c 100644 --- a/hw05_parallel_execution/go.mod +++ b/hw05_parallel_execution/go.mod @@ -1,6 +1,6 @@ -module github.com/fixme_my_friend/hw05_parallel_execution +module github.com/DimVlas/otus_hw/hw05_parallel_execution -go 1.19 +go 1.22.2 require ( github.com/stretchr/testify v1.7.0 diff --git a/hw05_parallel_execution/run.go b/hw05_parallel_execution/run.go index 9e3193f..bc48a8f 100644 --- a/hw05_parallel_execution/run.go +++ b/hw05_parallel_execution/run.go @@ -2,14 +2,136 @@ package hw05parallelexecution import ( "errors" + "sync" + "sync/atomic" + "time" ) -var ErrErrorsLimitExceeded = errors.New("errors limit exceeded") +var ( + ErrErrorsLimitExceeded = errors.New("errors limit exceeded") + ErrEmptyTasks = errors.New("empty task list") +) type Task func() error // Run starts tasks in n goroutines and stops its work when receiving m errors from tasks. func Run(tasks []Task, n, m int) error { - // Place your code here. + if len(tasks) == 0 { + return ErrEmptyTasks + } + if m <= 0 { + return ErrErrorsLimitExceeded + } + + var ( + wg sync.WaitGroup + + errMaxCount = int32(m) // максимально допустимое кол-во ошибок + workerMaxCount = int32(n) // максимальное возможное кол-во воркеров + errCount atomic.Int32 // текущее кол-во ошибок + workerCount atomic.Int32 // текущее кол-во работающих воркеров + ) + + if len(tasks) < n { + workerMaxCount = int32(len(tasks)) + } + + for _, task := range tasks { + // если количество работающих тасков меньше workerMaxCount -> запускаем таск + // иначе, ждем 10 млсек + for workerCount.Load() >= workerMaxCount { + time.Sleep(time.Millisecond * 10) + } + + if errCount.Load() >= errMaxCount { // если кол-во ошибок больше допустимого + // прекращаем запускать воркеры + break + } + + workerCount.Add(1) + wg.Add(1) + go func(t Task) { + defer func() { + workerCount.Add(-1) + wg.Done() + }() + + err := t() + if err != nil { + errCount.Add(1) + } + }(task) + } + + wg.Wait() + + if errCount.Load() >= errMaxCount { + return ErrErrorsLimitExceeded + } + return nil } + +// Run Tasks with channels. +func RunChan(tasks []Task, n, m int) error { + if len(tasks) == 0 { + return ErrEmptyTasks + } + + if m <= 0 { + return ErrErrorsLimitExceeded + } + + workerMaxCount := n + // если кол-во задач меньше кол-ва воркеров, ограничиваем кол-во воркеров + if len(tasks) < n { + workerMaxCount = len(tasks) + } + + tasksCh := make(chan Task) // канал задач. + errorCh := make(chan error, workerMaxCount) // канал ошибок. + wg := sync.WaitGroup{} + + defer func() { + close(tasksCh) + wg.Wait() + close(errorCh) + }() + + // запускаем воркеры + for i := 0; i < workerMaxCount; i++ { + wg.Add(1) + go doWork(&wg, tasksCh, errorCh) + } + + errCount := 0 // текущее кол-во ошибок + + for i := 0; i < len(tasks); { + select { + case <-errorCh: + errCount++ + if errCount >= m { + return ErrErrorsLimitExceeded + } + case tasksCh <- tasks[i]: + i++ + } + } + + return nil +} + +// tasks - канал задач. +// errs - канал ошибок. +func doWork(wg *sync.WaitGroup, tasks <-chan Task, errs chan<- error) { + defer func() { + wg.Done() + }() + + for task := range tasks { + err := task() + if err != nil { + errs <- err + } + } +} diff --git a/hw05_parallel_execution/run_test.go b/hw05_parallel_execution/run_test.go index a7069d7..086c2bc 100644 --- a/hw05_parallel_execution/run_test.go +++ b/hw05_parallel_execution/run_test.go @@ -3,6 +3,7 @@ package hw05parallelexecution import ( "errors" "fmt" + "log" "math/rand" "sync/atomic" "testing" @@ -67,4 +68,236 @@ func TestRun(t *testing.T) { require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed") require.LessOrEqual(t, int64(elapsedTime), int64(sumTime/2), "tasks were run sequentially?") }) + + t.Run("all tasks without elapsedTime", func(t *testing.T) { + tasksCount := 50 + tasks := make([]Task, 0, tasksCount) + + var runTasksCount int32 + var sumTime time.Duration + var err error + + for i := 0; i < tasksCount; i++ { + taskSleep := time.Millisecond * time.Duration(rand.Intn(100)) + sumTime += taskSleep + + tasks = append(tasks, func() error { + time.Sleep(taskSleep) + atomic.AddInt32(&runTasksCount, 1) + return nil + }) + } + + workersCount := 5 + maxErrorsCount := 1 + + require.Eventually(t, func() bool { + err = Run(tasks, workersCount, maxErrorsCount) + return err == nil + }, time.Duration(int64(sumTime/2)), time.Duration(int64(sumTime/1000)), "tasks were run sequentially?") + + require.NoError(t, err) + }) + + t.Run("empty tasks", func(t *testing.T) { + tasks := make([]Task, 0) + + err := Run(tasks, 1, 1) + + require.Truef(t, errors.Is(err, ErrEmptyTasks), "actual err - %v", err) + }) + + t.Run("task count less workers", func(t *testing.T) { + tasksCount := 5 + tasks := make([]Task, 0, tasksCount) + + var runTasksCount int32 + var sumTime time.Duration + + for i := 0; i < tasksCount; i++ { + taskSleep := time.Millisecond * time.Duration(rand.Intn(100)) + sumTime += taskSleep + + tasks = append(tasks, func() error { + time.Sleep(taskSleep) + atomic.AddInt32(&runTasksCount, 1) + return nil + }) + } + + workersCount := 10 + maxErrorsCount := 10 + err := Run(tasks, workersCount, maxErrorsCount) + + require.NoError(t, err) + require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed") + }) + + t.Run("zero errors", func(t *testing.T) { + tasksCount := 50 + tasks := make([]Task, 0, tasksCount) + + var runTasksCount int32 + + for i := 0; i < tasksCount; i++ { + if i == 5 { + tasks = append(tasks, func() error { + time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) + atomic.AddInt32(&runTasksCount, 1) + return nil + }) + } + } + + workersCount := 5 + maxErrorsCount := 0 + err := Run(tasks, workersCount, maxErrorsCount) + + require.Truef(t, errors.Is(err, ErrErrorsLimitExceeded), "actual err - %v", err) + }) +} + +func TestRunChan(t *testing.T) { + defer goleak.VerifyNone(t) + + t.Run("if were errors in first M tasks, than finished not more N+M tasks", func(t *testing.T) { + tasksCount := 50 + tasks := make([]Task, 0, tasksCount) + + var runTasksCount int32 + + for i := 0; i < tasksCount; i++ { + err := fmt.Errorf("error from task %d", i) + tasks = append(tasks, func() error { + time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) + atomic.AddInt32(&runTasksCount, 1) + return err + }) + } + + workersCount := 10 + maxErrorsCount := 23 + err := RunChan(tasks, workersCount, maxErrorsCount) + log.Println(err) + + require.Truef(t, errors.Is(err, ErrErrorsLimitExceeded), "actual err - %v", err) + require.LessOrEqual(t, runTasksCount, int32(workersCount+maxErrorsCount), "extra tasks were started") + }) + + t.Run("tasks without errors", func(t *testing.T) { + tasksCount := 50 + tasks := make([]Task, 0, tasksCount) + + var runTasksCount int32 + var sumTime time.Duration + + for i := 0; i < tasksCount; i++ { + taskSleep := time.Millisecond * time.Duration(rand.Intn(100)) + sumTime += taskSleep + + tasks = append(tasks, func() error { + time.Sleep(taskSleep) + atomic.AddInt32(&runTasksCount, 1) + return nil + }) + } + + workersCount := 5 + maxErrorsCount := 1 + + start := time.Now() + err := RunChan(tasks, workersCount, maxErrorsCount) + elapsedTime := time.Since(start) + require.NoError(t, err) + + require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed") + require.LessOrEqual(t, int64(elapsedTime), int64(sumTime/2), "tasks were run sequentially?") + }) + + t.Run("all tasks without elapsedTime", func(t *testing.T) { + tasksCount := 50 + tasks := make([]Task, 0, tasksCount) + + var runTasksCount int32 + var sumTime time.Duration + var err error + + for i := 0; i < tasksCount; i++ { + taskSleep := time.Millisecond * time.Duration(rand.Intn(100)) + sumTime += taskSleep + + tasks = append(tasks, func() error { + time.Sleep(taskSleep) + atomic.AddInt32(&runTasksCount, 1) + return nil + }) + } + + workersCount := 5 + maxErrorsCount := 1 + + require.Eventually(t, func() bool { + err = RunChan(tasks, workersCount, maxErrorsCount) + return err == nil + }, time.Duration(int64(sumTime/2)), time.Duration(int64(sumTime/1000)), "tasks were run sequentially?") + + require.NoError(t, err) + }) + + t.Run("empty tasks", func(t *testing.T) { + tasks := make([]Task, 0) + + err := RunChan(tasks, 1, 1) + + require.Truef(t, errors.Is(err, ErrEmptyTasks), "actual err - %v", err) + }) + + t.Run("task count less workers", func(t *testing.T) { + tasksCount := 5 + tasks := make([]Task, 0, tasksCount) + + var runTasksCount int32 + var sumTime time.Duration + + for i := 0; i < tasksCount; i++ { + taskSleep := time.Millisecond * time.Duration(rand.Intn(100)) + sumTime += taskSleep + + tasks = append(tasks, func() error { + time.Sleep(taskSleep) + atomic.AddInt32(&runTasksCount, 1) + return nil + }) + } + + workersCount := 10 + maxErrorsCount := 10 + err := RunChan(tasks, workersCount, maxErrorsCount) + + require.NoError(t, err) + require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed") + }) + + t.Run("zero errors", func(t *testing.T) { + tasksCount := 50 + tasks := make([]Task, 0, tasksCount) + + var runTasksCount int32 + + for i := 0; i < tasksCount; i++ { + if i == 5 { + tasks = append(tasks, func() error { + time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) + atomic.AddInt32(&runTasksCount, 1) + return nil + }) + } + } + + workersCount := 5 + maxErrorsCount := 0 + err := RunChan(tasks, workersCount, maxErrorsCount) + + require.Truef(t, errors.Is(err, ErrErrorsLimitExceeded), "actual err - %v", err) + }) } From c6b11629b39bebcb3011f30247b8ebfe73f8abd3 Mon Sep 17 00:00:00 2001 From: DimVlas Date: Wed, 22 May 2024 23:03:12 +0300 Subject: [PATCH 2/4] go.mod. Edit Go Version --- hw05_parallel_execution/go.mod | 2 +- hw05_parallel_execution/run_test.go | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/hw05_parallel_execution/go.mod b/hw05_parallel_execution/go.mod index e30e89c..6fe7b42 100644 --- a/hw05_parallel_execution/go.mod +++ b/hw05_parallel_execution/go.mod @@ -1,6 +1,6 @@ module github.com/DimVlas/otus_hw/hw05_parallel_execution -go 1.22.2 +go 1.19 require ( github.com/stretchr/testify v1.7.0 diff --git a/hw05_parallel_execution/run_test.go b/hw05_parallel_execution/run_test.go index 086c2bc..706de16 100644 --- a/hw05_parallel_execution/run_test.go +++ b/hw05_parallel_execution/run_test.go @@ -3,7 +3,6 @@ package hw05parallelexecution import ( "errors" "fmt" - "log" "math/rand" "sync/atomic" "testing" @@ -178,7 +177,6 @@ func TestRunChan(t *testing.T) { workersCount := 10 maxErrorsCount := 23 err := RunChan(tasks, workersCount, maxErrorsCount) - log.Println(err) require.Truef(t, errors.Is(err, ErrErrorsLimitExceeded), "actual err - %v", err) require.LessOrEqual(t, runTasksCount, int32(workersCount+maxErrorsCount), "extra tasks were started") From c70c4f4391127fd0c0f102da9edda8bac2ba1166 Mon Sep 17 00:00:00 2001 From: DimVlas Date: Sat, 25 May 2024 05:38:48 +0300 Subject: [PATCH 3/4] HW5 is completed. Chan --- hw05_parallel_execution/run.go | 95 +++++------------- hw05_parallel_execution/run_test.go | 144 ---------------------------- 2 files changed, 22 insertions(+), 217 deletions(-) diff --git a/hw05_parallel_execution/run.go b/hw05_parallel_execution/run.go index bc48a8f..bc47884 100644 --- a/hw05_parallel_execution/run.go +++ b/hw05_parallel_execution/run.go @@ -3,8 +3,6 @@ package hw05parallelexecution import ( "errors" "sync" - "sync/atomic" - "time" ) var ( @@ -14,66 +12,8 @@ var ( type Task func() error -// Run starts tasks in n goroutines and stops its work when receiving m errors from tasks. -func Run(tasks []Task, n, m int) error { - if len(tasks) == 0 { - return ErrEmptyTasks - } - if m <= 0 { - return ErrErrorsLimitExceeded - } - - var ( - wg sync.WaitGroup - - errMaxCount = int32(m) // максимально допустимое кол-во ошибок - workerMaxCount = int32(n) // максимальное возможное кол-во воркеров - errCount atomic.Int32 // текущее кол-во ошибок - workerCount atomic.Int32 // текущее кол-во работающих воркеров - ) - - if len(tasks) < n { - workerMaxCount = int32(len(tasks)) - } - - for _, task := range tasks { - // если количество работающих тасков меньше workerMaxCount -> запускаем таск - // иначе, ждем 10 млсек - for workerCount.Load() >= workerMaxCount { - time.Sleep(time.Millisecond * 10) - } - - if errCount.Load() >= errMaxCount { // если кол-во ошибок больше допустимого - // прекращаем запускать воркеры - break - } - - workerCount.Add(1) - wg.Add(1) - go func(t Task) { - defer func() { - workerCount.Add(-1) - wg.Done() - }() - - err := t() - if err != nil { - errCount.Add(1) - } - }(task) - } - - wg.Wait() - - if errCount.Load() >= errMaxCount { - return ErrErrorsLimitExceeded - } - - return nil -} - // Run Tasks with channels. -func RunChan(tasks []Task, n, m int) error { +func Run(tasks []Task, n, m int) error { if len(tasks) == 0 { return ErrEmptyTasks } @@ -88,14 +28,18 @@ func RunChan(tasks []Task, n, m int) error { workerMaxCount = len(tasks) } - tasksCh := make(chan Task) // канал задач. - errorCh := make(chan error, workerMaxCount) // канал ошибок. + tasksCh := make(chan Task) // канал задач. + stopErrCh := make(chan struct{}) // сигнальный канал. + errorCh := make(chan error) // канал ошибок. wg := sync.WaitGroup{} + wgErr := sync.WaitGroup{} defer func() { close(tasksCh) + close(stopErrCh) wg.Wait() close(errorCh) + wgErr.Wait() }() // запускаем воркеры @@ -104,25 +48,30 @@ func RunChan(tasks []Task, n, m int) error { go doWork(&wg, tasksCh, errorCh) } - errCount := 0 // текущее кол-во ошибок + wgErr.Add(1) + go func(wg *sync.WaitGroup) { + defer wg.Done() - for i := 0; i < len(tasks); { - select { - case <-errorCh: + errCount := 0 // текущее кол-во ошибок + for range errorCh { errCount++ - if errCount >= m { - return ErrErrorsLimitExceeded + if errCount == m { + stopErrCh <- struct{}{} } - case tasksCh <- tasks[i]: - i++ + } + }(&wgErr) + + for _, task := range tasks { + select { + case <-stopErrCh: + return ErrErrorsLimitExceeded + case tasksCh <- task: } } return nil } -// tasks - канал задач. -// errs - канал ошибок. func doWork(wg *sync.WaitGroup, tasks <-chan Task, errs chan<- error) { defer func() { wg.Done() diff --git a/hw05_parallel_execution/run_test.go b/hw05_parallel_execution/run_test.go index 706de16..c3240de 100644 --- a/hw05_parallel_execution/run_test.go +++ b/hw05_parallel_execution/run_test.go @@ -155,147 +155,3 @@ func TestRun(t *testing.T) { require.Truef(t, errors.Is(err, ErrErrorsLimitExceeded), "actual err - %v", err) }) } - -func TestRunChan(t *testing.T) { - defer goleak.VerifyNone(t) - - t.Run("if were errors in first M tasks, than finished not more N+M tasks", func(t *testing.T) { - tasksCount := 50 - tasks := make([]Task, 0, tasksCount) - - var runTasksCount int32 - - for i := 0; i < tasksCount; i++ { - err := fmt.Errorf("error from task %d", i) - tasks = append(tasks, func() error { - time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) - atomic.AddInt32(&runTasksCount, 1) - return err - }) - } - - workersCount := 10 - maxErrorsCount := 23 - err := RunChan(tasks, workersCount, maxErrorsCount) - - require.Truef(t, errors.Is(err, ErrErrorsLimitExceeded), "actual err - %v", err) - require.LessOrEqual(t, runTasksCount, int32(workersCount+maxErrorsCount), "extra tasks were started") - }) - - t.Run("tasks without errors", func(t *testing.T) { - tasksCount := 50 - tasks := make([]Task, 0, tasksCount) - - var runTasksCount int32 - var sumTime time.Duration - - for i := 0; i < tasksCount; i++ { - taskSleep := time.Millisecond * time.Duration(rand.Intn(100)) - sumTime += taskSleep - - tasks = append(tasks, func() error { - time.Sleep(taskSleep) - atomic.AddInt32(&runTasksCount, 1) - return nil - }) - } - - workersCount := 5 - maxErrorsCount := 1 - - start := time.Now() - err := RunChan(tasks, workersCount, maxErrorsCount) - elapsedTime := time.Since(start) - require.NoError(t, err) - - require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed") - require.LessOrEqual(t, int64(elapsedTime), int64(sumTime/2), "tasks were run sequentially?") - }) - - t.Run("all tasks without elapsedTime", func(t *testing.T) { - tasksCount := 50 - tasks := make([]Task, 0, tasksCount) - - var runTasksCount int32 - var sumTime time.Duration - var err error - - for i := 0; i < tasksCount; i++ { - taskSleep := time.Millisecond * time.Duration(rand.Intn(100)) - sumTime += taskSleep - - tasks = append(tasks, func() error { - time.Sleep(taskSleep) - atomic.AddInt32(&runTasksCount, 1) - return nil - }) - } - - workersCount := 5 - maxErrorsCount := 1 - - require.Eventually(t, func() bool { - err = RunChan(tasks, workersCount, maxErrorsCount) - return err == nil - }, time.Duration(int64(sumTime/2)), time.Duration(int64(sumTime/1000)), "tasks were run sequentially?") - - require.NoError(t, err) - }) - - t.Run("empty tasks", func(t *testing.T) { - tasks := make([]Task, 0) - - err := RunChan(tasks, 1, 1) - - require.Truef(t, errors.Is(err, ErrEmptyTasks), "actual err - %v", err) - }) - - t.Run("task count less workers", func(t *testing.T) { - tasksCount := 5 - tasks := make([]Task, 0, tasksCount) - - var runTasksCount int32 - var sumTime time.Duration - - for i := 0; i < tasksCount; i++ { - taskSleep := time.Millisecond * time.Duration(rand.Intn(100)) - sumTime += taskSleep - - tasks = append(tasks, func() error { - time.Sleep(taskSleep) - atomic.AddInt32(&runTasksCount, 1) - return nil - }) - } - - workersCount := 10 - maxErrorsCount := 10 - err := RunChan(tasks, workersCount, maxErrorsCount) - - require.NoError(t, err) - require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed") - }) - - t.Run("zero errors", func(t *testing.T) { - tasksCount := 50 - tasks := make([]Task, 0, tasksCount) - - var runTasksCount int32 - - for i := 0; i < tasksCount; i++ { - if i == 5 { - tasks = append(tasks, func() error { - time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) - atomic.AddInt32(&runTasksCount, 1) - return nil - }) - } - } - - workersCount := 5 - maxErrorsCount := 0 - err := RunChan(tasks, workersCount, maxErrorsCount) - - require.Truef(t, errors.Is(err, ErrErrorsLimitExceeded), "actual err - %v", err) - }) -} From 0dad4492b7fc017948dc00799c29b64b42043b17 Mon Sep 17 00:00:00 2001 From: DimVlas Date: Sat, 25 May 2024 05:45:50 +0300 Subject: [PATCH 4/4] HW5 is completed. Chan --- hw05_parallel_execution/run.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/hw05_parallel_execution/run.go b/hw05_parallel_execution/run.go index bc47884..2a29072 100644 --- a/hw05_parallel_execution/run.go +++ b/hw05_parallel_execution/run.go @@ -22,10 +22,8 @@ func Run(tasks []Task, n, m int) error { return ErrErrorsLimitExceeded } - workerMaxCount := n - // если кол-во задач меньше кол-ва воркеров, ограничиваем кол-во воркеров - if len(tasks) < n { - workerMaxCount = len(tasks) + if len(tasks) < n { // если кол-во задач меньше кол-ва воркеров, ограничиваем кол-во воркеров + n = len(tasks) } tasksCh := make(chan Task) // канал задач. @@ -43,7 +41,7 @@ func Run(tasks []Task, n, m int) error { }() // запускаем воркеры - for i := 0; i < workerMaxCount; i++ { + for i := 0; i < n; i++ { wg.Add(1) go doWork(&wg, tasksCh, errorCh) }