Skip to content

Commit

Permalink
HW5 is completed. Chan
Browse files Browse the repository at this point in the history
  • Loading branch information
DimVlas committed May 25, 2024
1 parent c6b1162 commit c70c4f4
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 217 deletions.
95 changes: 22 additions & 73 deletions hw05_parallel_execution/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package hw05parallelexecution
import (
"errors"
"sync"
"sync/atomic"
"time"
)

var (
Expand All @@ -14,66 +12,8 @@ var (

type Task func() error

// Run starts tasks in n goroutines and stops its work when receiving m errors from tasks.
func Run(tasks []Task, n, m int) error {
if len(tasks) == 0 {
return ErrEmptyTasks
}
if m <= 0 {
return ErrErrorsLimitExceeded
}

var (
wg sync.WaitGroup

errMaxCount = int32(m) // максимально допустимое кол-во ошибок
workerMaxCount = int32(n) // максимальное возможное кол-во воркеров
errCount atomic.Int32 // текущее кол-во ошибок
workerCount atomic.Int32 // текущее кол-во работающих воркеров
)

if len(tasks) < n {
workerMaxCount = int32(len(tasks))
}

for _, task := range tasks {
// если количество работающих тасков меньше workerMaxCount -> запускаем таск
// иначе, ждем 10 млсек
for workerCount.Load() >= workerMaxCount {
time.Sleep(time.Millisecond * 10)
}

if errCount.Load() >= errMaxCount { // если кол-во ошибок больше допустимого
// прекращаем запускать воркеры
break
}

workerCount.Add(1)
wg.Add(1)
go func(t Task) {
defer func() {
workerCount.Add(-1)
wg.Done()
}()

err := t()
if err != nil {
errCount.Add(1)
}
}(task)
}

wg.Wait()

if errCount.Load() >= errMaxCount {
return ErrErrorsLimitExceeded
}

return nil
}

// Run Tasks with channels.
func RunChan(tasks []Task, n, m int) error {
func Run(tasks []Task, n, m int) error {
if len(tasks) == 0 {
return ErrEmptyTasks
}
Expand All @@ -88,14 +28,18 @@ func RunChan(tasks []Task, n, m int) error {
workerMaxCount = len(tasks)
}

tasksCh := make(chan Task) // канал задач.
errorCh := make(chan error, workerMaxCount) // канал ошибок.
tasksCh := make(chan Task) // канал задач.
stopErrCh := make(chan struct{}) // сигнальный канал.
errorCh := make(chan error) // канал ошибок.
wg := sync.WaitGroup{}
wgErr := sync.WaitGroup{}

defer func() {
close(tasksCh)
close(stopErrCh)
wg.Wait()
close(errorCh)
wgErr.Wait()
}()

// запускаем воркеры
Expand All @@ -104,25 +48,30 @@ func RunChan(tasks []Task, n, m int) error {
go doWork(&wg, tasksCh, errorCh)
}

errCount := 0 // текущее кол-во ошибок
wgErr.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()

for i := 0; i < len(tasks); {
select {
case <-errorCh:
errCount := 0 // текущее кол-во ошибок
for range errorCh {
errCount++
if errCount >= m {
return ErrErrorsLimitExceeded
if errCount == m {
stopErrCh <- struct{}{}
}
case tasksCh <- tasks[i]:
i++
}
}(&wgErr)

for _, task := range tasks {
select {
case <-stopErrCh:
return ErrErrorsLimitExceeded
case tasksCh <- task:
}
}

return nil
}

// tasks - канал задач.
// errs - канал ошибок.
func doWork(wg *sync.WaitGroup, tasks <-chan Task, errs chan<- error) {
defer func() {
wg.Done()
Expand Down
144 changes: 0 additions & 144 deletions hw05_parallel_execution/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,147 +155,3 @@ func TestRun(t *testing.T) {
require.Truef(t, errors.Is(err, ErrErrorsLimitExceeded), "actual err - %v", err)
})
}

func TestRunChan(t *testing.T) {
defer goleak.VerifyNone(t)

t.Run("if were errors in first M tasks, than finished not more N+M tasks", func(t *testing.T) {
tasksCount := 50
tasks := make([]Task, 0, tasksCount)

var runTasksCount int32

for i := 0; i < tasksCount; i++ {
err := fmt.Errorf("error from task %d", i)
tasks = append(tasks, func() error {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
atomic.AddInt32(&runTasksCount, 1)
return err
})
}

workersCount := 10
maxErrorsCount := 23
err := RunChan(tasks, workersCount, maxErrorsCount)

require.Truef(t, errors.Is(err, ErrErrorsLimitExceeded), "actual err - %v", err)
require.LessOrEqual(t, runTasksCount, int32(workersCount+maxErrorsCount), "extra tasks were started")
})

t.Run("tasks without errors", func(t *testing.T) {
tasksCount := 50
tasks := make([]Task, 0, tasksCount)

var runTasksCount int32
var sumTime time.Duration

for i := 0; i < tasksCount; i++ {
taskSleep := time.Millisecond * time.Duration(rand.Intn(100))
sumTime += taskSleep

tasks = append(tasks, func() error {
time.Sleep(taskSleep)
atomic.AddInt32(&runTasksCount, 1)
return nil
})
}

workersCount := 5
maxErrorsCount := 1

start := time.Now()
err := RunChan(tasks, workersCount, maxErrorsCount)
elapsedTime := time.Since(start)
require.NoError(t, err)

require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed")
require.LessOrEqual(t, int64(elapsedTime), int64(sumTime/2), "tasks were run sequentially?")
})

t.Run("all tasks without elapsedTime", func(t *testing.T) {
tasksCount := 50
tasks := make([]Task, 0, tasksCount)

var runTasksCount int32
var sumTime time.Duration
var err error

for i := 0; i < tasksCount; i++ {
taskSleep := time.Millisecond * time.Duration(rand.Intn(100))
sumTime += taskSleep

tasks = append(tasks, func() error {
time.Sleep(taskSleep)
atomic.AddInt32(&runTasksCount, 1)
return nil
})
}

workersCount := 5
maxErrorsCount := 1

require.Eventually(t, func() bool {
err = RunChan(tasks, workersCount, maxErrorsCount)
return err == nil
}, time.Duration(int64(sumTime/2)), time.Duration(int64(sumTime/1000)), "tasks were run sequentially?")

require.NoError(t, err)
})

t.Run("empty tasks", func(t *testing.T) {
tasks := make([]Task, 0)

err := RunChan(tasks, 1, 1)

require.Truef(t, errors.Is(err, ErrEmptyTasks), "actual err - %v", err)
})

t.Run("task count less workers", func(t *testing.T) {
tasksCount := 5
tasks := make([]Task, 0, tasksCount)

var runTasksCount int32
var sumTime time.Duration

for i := 0; i < tasksCount; i++ {
taskSleep := time.Millisecond * time.Duration(rand.Intn(100))
sumTime += taskSleep

tasks = append(tasks, func() error {
time.Sleep(taskSleep)
atomic.AddInt32(&runTasksCount, 1)
return nil
})
}

workersCount := 10
maxErrorsCount := 10
err := RunChan(tasks, workersCount, maxErrorsCount)

require.NoError(t, err)
require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed")
})

t.Run("zero errors", func(t *testing.T) {
tasksCount := 50
tasks := make([]Task, 0, tasksCount)

var runTasksCount int32

for i := 0; i < tasksCount; i++ {
if i == 5 {
tasks = append(tasks, func() error {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
atomic.AddInt32(&runTasksCount, 1)
return nil
})
}
}

workersCount := 5
maxErrorsCount := 0
err := RunChan(tasks, workersCount, maxErrorsCount)

require.Truef(t, errors.Is(err, ErrErrorsLimitExceeded), "actual err - %v", err)
})
}

0 comments on commit c70c4f4

Please sign in to comment.