diff --git a/hw06_pipeline_execution/go.mod b/hw06_pipeline_execution/go.mod index ed224b9..d89f80c 100644 --- a/hw06_pipeline_execution/go.mod +++ b/hw06_pipeline_execution/go.mod @@ -1,4 +1,4 @@ -module github.com/fixme_my_friend/hw06_pipeline_execution +module github.com/DimVlas/otus_hw/hw06_pipeline_execution go 1.19 diff --git a/hw06_pipeline_execution/pipeline.go b/hw06_pipeline_execution/pipeline.go index 9044486..b6fd2c9 100644 --- a/hw06_pipeline_execution/pipeline.go +++ b/hw06_pipeline_execution/pipeline.go @@ -9,6 +9,30 @@ type ( type Stage func(in In) (out Out) func ExecutePipeline(in In, done In, stages ...Stage) Out { - // Place your code here. - return nil + output := in + + for _, stage := range stages { + input := chanWrap(output, done) + output = stage(input) + } + + return output +} + +func chanWrap(in In, done In) Out { + out := make(Bi) + + go func() { + defer close(out) + + for val := range in { + select { + case out <- val: + case <-done: + return + } + } + }() + + return out } diff --git a/hw06_pipeline_execution/pipeline_test.go b/hw06_pipeline_execution/pipeline_test.go index b638ed8..5cb1bcd 100644 --- a/hw06_pipeline_execution/pipeline_test.go +++ b/hw06_pipeline_execution/pipeline_test.go @@ -90,4 +90,48 @@ func TestPipeline(t *testing.T) { require.Len(t, result, 0) require.Less(t, int64(elapsed), int64(abortDur)+int64(fault)) }) + + t.Run("empty case", func(t *testing.T) { + in := make(Bi) + done := make(Bi) + data := []int{1, 2, 3, 4, 5} + + go func() { + for _, v := range data { + in <- v + } + close(in) + }() + + stages := []Stage{} + + result := make([]int, 0, 5) + for s := range ExecutePipeline(in, done, stages...) { + result = append(result, s.(int)) + } + + require.Len(t, result, 5) + require.Equal(t, []int{1, 2, 3, 4, 5}, result) + }) + + t.Run("empty data", func(t *testing.T) { + in := make(Bi) + done := make(Bi) + data := []int{} + + go func() { + for _, v := range data { + in <- v + } + close(in) + }() + + result := make([]string, 0, 5) + for s := range ExecutePipeline(in, done, stages...) { + result = append(result, s.(string)) + } + + require.Len(t, result, 0) + require.Equal(t, []string{}, result) + }) }