Skip to content

Commit

Permalink
HW5 is completed
Browse files Browse the repository at this point in the history
  • Loading branch information
DimVlas committed May 22, 2024
1 parent 9c3a7c0 commit 046e47c
Show file tree
Hide file tree
Showing 4 changed files with 359 additions and 4 deletions.
Empty file removed hw05_parallel_execution/.sync
Empty file.
4 changes: 2 additions & 2 deletions hw05_parallel_execution/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/fixme_my_friend/hw05_parallel_execution
module github.com/DimVlas/otus_hw/hw05_parallel_execution

go 1.19
go 1.22.2

require (
github.com/stretchr/testify v1.7.0
Expand Down
126 changes: 124 additions & 2 deletions hw05_parallel_execution/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,136 @@ package hw05parallelexecution

import (
"errors"
"sync"
"sync/atomic"
"time"
)

var ErrErrorsLimitExceeded = errors.New("errors limit exceeded")
var (
ErrErrorsLimitExceeded = errors.New("errors limit exceeded")
ErrEmptyTasks = errors.New("empty task list")
)

type Task func() error

// Run starts tasks in n goroutines and stops its work when receiving m errors from tasks.
func Run(tasks []Task, n, m int) error {
// Place your code here.
if len(tasks) == 0 {
return ErrEmptyTasks
}
if m <= 0 {
return ErrErrorsLimitExceeded
}

var (
wg sync.WaitGroup

errMaxCount = int32(m) // максимально допустимое кол-во ошибок
workerMaxCount = int32(n) // максимальное возможное кол-во воркеров
errCount atomic.Int32 // текущее кол-во ошибок
workerCount atomic.Int32 // текущее кол-во работающих воркеров
)

if len(tasks) < n {
workerMaxCount = int32(len(tasks))
}

for _, task := range tasks {
// если количество работающих тасков меньше workerMaxCount -> запускаем таск
// иначе, ждем 10 млсек
for workerCount.Load() >= workerMaxCount {
time.Sleep(time.Millisecond * 10)
}

if errCount.Load() >= errMaxCount { // если кол-во ошибок больше допустимого
// прекращаем запускать воркеры
break
}

workerCount.Add(1)
wg.Add(1)
go func(t Task) {
defer func() {
workerCount.Add(-1)
wg.Done()
}()

err := t()
if err != nil {
errCount.Add(1)
}
}(task)
}

wg.Wait()

if errCount.Load() >= errMaxCount {
return ErrErrorsLimitExceeded
}

return nil
}

// Run Tasks with channels.
func RunChan(tasks []Task, n, m int) error {
if len(tasks) == 0 {
return ErrEmptyTasks
}

if m <= 0 {
return ErrErrorsLimitExceeded
}

workerMaxCount := n
// если кол-во задач меньше кол-ва воркеров, ограничиваем кол-во воркеров
if len(tasks) < n {
workerMaxCount = len(tasks)
}

tasksCh := make(chan Task) // канал задач.
errorCh := make(chan error, workerMaxCount) // канал ошибок.
wg := sync.WaitGroup{}

defer func() {
close(tasksCh)
wg.Wait()
close(errorCh)
}()

// запускаем воркеры
for i := 0; i < workerMaxCount; i++ {
wg.Add(1)
go doWork(&wg, tasksCh, errorCh)
}

errCount := 0 // текущее кол-во ошибок

for i := 0; i < len(tasks); {
select {
case <-errorCh:
errCount++
if errCount >= m {
return ErrErrorsLimitExceeded
}
case tasksCh <- tasks[i]:
i++
}
}

return nil
}

// tasks - канал задач.
// errs - канал ошибок.
func doWork(wg *sync.WaitGroup, tasks <-chan Task, errs chan<- error) {
defer func() {
wg.Done()
}()

for task := range tasks {
err := task()
if err != nil {
errs <- err
}
}
}
233 changes: 233 additions & 0 deletions hw05_parallel_execution/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package hw05parallelexecution
import (
"errors"
"fmt"
"log"
"math/rand"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -67,4 +68,236 @@ func TestRun(t *testing.T) {
require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed")
require.LessOrEqual(t, int64(elapsedTime), int64(sumTime/2), "tasks were run sequentially?")
})

t.Run("all tasks without elapsedTime", func(t *testing.T) {
tasksCount := 50
tasks := make([]Task, 0, tasksCount)

var runTasksCount int32
var sumTime time.Duration
var err error

for i := 0; i < tasksCount; i++ {
taskSleep := time.Millisecond * time.Duration(rand.Intn(100))
sumTime += taskSleep

tasks = append(tasks, func() error {
time.Sleep(taskSleep)
atomic.AddInt32(&runTasksCount, 1)
return nil
})
}

workersCount := 5
maxErrorsCount := 1

require.Eventually(t, func() bool {
err = Run(tasks, workersCount, maxErrorsCount)
return err == nil
}, time.Duration(int64(sumTime/2)), time.Duration(int64(sumTime/1000)), "tasks were run sequentially?")

require.NoError(t, err)
})

t.Run("empty tasks", func(t *testing.T) {
tasks := make([]Task, 0)

err := Run(tasks, 1, 1)

require.Truef(t, errors.Is(err, ErrEmptyTasks), "actual err - %v", err)
})

t.Run("task count less workers", func(t *testing.T) {
tasksCount := 5
tasks := make([]Task, 0, tasksCount)

var runTasksCount int32
var sumTime time.Duration

for i := 0; i < tasksCount; i++ {
taskSleep := time.Millisecond * time.Duration(rand.Intn(100))
sumTime += taskSleep

tasks = append(tasks, func() error {
time.Sleep(taskSleep)
atomic.AddInt32(&runTasksCount, 1)
return nil
})
}

workersCount := 10
maxErrorsCount := 10
err := Run(tasks, workersCount, maxErrorsCount)

require.NoError(t, err)
require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed")
})

t.Run("zero errors", func(t *testing.T) {
tasksCount := 50
tasks := make([]Task, 0, tasksCount)

var runTasksCount int32

for i := 0; i < tasksCount; i++ {
if i == 5 {
tasks = append(tasks, func() error {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
atomic.AddInt32(&runTasksCount, 1)
return nil
})
}
}

workersCount := 5
maxErrorsCount := 0
err := Run(tasks, workersCount, maxErrorsCount)

require.Truef(t, errors.Is(err, ErrErrorsLimitExceeded), "actual err - %v", err)
})
}

func TestRunChan(t *testing.T) {
defer goleak.VerifyNone(t)

t.Run("if were errors in first M tasks, than finished not more N+M tasks", func(t *testing.T) {
tasksCount := 50
tasks := make([]Task, 0, tasksCount)

var runTasksCount int32

for i := 0; i < tasksCount; i++ {
err := fmt.Errorf("error from task %d", i)
tasks = append(tasks, func() error {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
atomic.AddInt32(&runTasksCount, 1)
return err
})
}

workersCount := 10
maxErrorsCount := 23
err := RunChan(tasks, workersCount, maxErrorsCount)
log.Println(err)

require.Truef(t, errors.Is(err, ErrErrorsLimitExceeded), "actual err - %v", err)
require.LessOrEqual(t, runTasksCount, int32(workersCount+maxErrorsCount), "extra tasks were started")
})

t.Run("tasks without errors", func(t *testing.T) {
tasksCount := 50
tasks := make([]Task, 0, tasksCount)

var runTasksCount int32
var sumTime time.Duration

for i := 0; i < tasksCount; i++ {
taskSleep := time.Millisecond * time.Duration(rand.Intn(100))
sumTime += taskSleep

tasks = append(tasks, func() error {
time.Sleep(taskSleep)
atomic.AddInt32(&runTasksCount, 1)
return nil
})
}

workersCount := 5
maxErrorsCount := 1

start := time.Now()
err := RunChan(tasks, workersCount, maxErrorsCount)
elapsedTime := time.Since(start)
require.NoError(t, err)

require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed")
require.LessOrEqual(t, int64(elapsedTime), int64(sumTime/2), "tasks were run sequentially?")
})

t.Run("all tasks without elapsedTime", func(t *testing.T) {
tasksCount := 50
tasks := make([]Task, 0, tasksCount)

var runTasksCount int32
var sumTime time.Duration
var err error

for i := 0; i < tasksCount; i++ {
taskSleep := time.Millisecond * time.Duration(rand.Intn(100))
sumTime += taskSleep

tasks = append(tasks, func() error {
time.Sleep(taskSleep)
atomic.AddInt32(&runTasksCount, 1)
return nil
})
}

workersCount := 5
maxErrorsCount := 1

require.Eventually(t, func() bool {
err = RunChan(tasks, workersCount, maxErrorsCount)
return err == nil
}, time.Duration(int64(sumTime/2)), time.Duration(int64(sumTime/1000)), "tasks were run sequentially?")

require.NoError(t, err)
})

t.Run("empty tasks", func(t *testing.T) {
tasks := make([]Task, 0)

err := RunChan(tasks, 1, 1)

require.Truef(t, errors.Is(err, ErrEmptyTasks), "actual err - %v", err)
})

t.Run("task count less workers", func(t *testing.T) {
tasksCount := 5
tasks := make([]Task, 0, tasksCount)

var runTasksCount int32
var sumTime time.Duration

for i := 0; i < tasksCount; i++ {
taskSleep := time.Millisecond * time.Duration(rand.Intn(100))
sumTime += taskSleep

tasks = append(tasks, func() error {
time.Sleep(taskSleep)
atomic.AddInt32(&runTasksCount, 1)
return nil
})
}

workersCount := 10
maxErrorsCount := 10
err := RunChan(tasks, workersCount, maxErrorsCount)

require.NoError(t, err)
require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed")
})

t.Run("zero errors", func(t *testing.T) {
tasksCount := 50
tasks := make([]Task, 0, tasksCount)

var runTasksCount int32

for i := 0; i < tasksCount; i++ {
if i == 5 {
tasks = append(tasks, func() error {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
atomic.AddInt32(&runTasksCount, 1)
return nil
})
}
}

workersCount := 5
maxErrorsCount := 0
err := RunChan(tasks, workersCount, maxErrorsCount)

require.Truef(t, errors.Is(err, ErrErrorsLimitExceeded), "actual err - %v", err)
})
}

0 comments on commit 046e47c

Please sign in to comment.