diff --git a/hw06_pipeline_execution/.sync b/hw06_pipeline_execution/.sync deleted file mode 100644 index e69de29..0000000 diff --git a/hw06_pipeline_execution/go.mod b/hw06_pipeline_execution/go.mod index ed224b9..d89f80c 100644 --- a/hw06_pipeline_execution/go.mod +++ b/hw06_pipeline_execution/go.mod @@ -1,4 +1,4 @@ -module github.com/fixme_my_friend/hw06_pipeline_execution +module github.com/DimVlas/otus_hw/hw06_pipeline_execution go 1.19 diff --git a/hw06_pipeline_execution/pipeline.go b/hw06_pipeline_execution/pipeline.go index 9044486..d9b70bb 100644 --- a/hw06_pipeline_execution/pipeline.go +++ b/hw06_pipeline_execution/pipeline.go @@ -9,6 +9,38 @@ type ( type Stage func(in In) (out Out) func ExecutePipeline(in In, done In, stages ...Stage) Out { - // Place your code here. - return nil + output := in + + for _, stage := range stages { + input := chanWrap(output, done) + output = stage(input) + } + + return output +} + +func chanWrap(in In, done In) Out { + out := make(Bi) + + go func() { + defer close(out) + + for { + select { + case <-done: + return + case val, ok := <-in: + if !ok { + return + } + select { + case <-done: + return + case out <- val: + } + } + } + }() + + return out } diff --git a/hw06_pipeline_execution/pipeline_test.go b/hw06_pipeline_execution/pipeline_test.go index b638ed8..63c3a59 100644 --- a/hw06_pipeline_execution/pipeline_test.go +++ b/hw06_pipeline_execution/pipeline_test.go @@ -90,4 +90,124 @@ func TestPipeline(t *testing.T) { require.Len(t, result, 0) require.Less(t, int64(elapsed), int64(abortDur)+int64(fault)) }) + + t.Run("empty case", func(t *testing.T) { + in := make(Bi) + done := make(Bi) + data := []int{1, 2, 3, 4, 5} + + go func() { + for _, v := range data { + in <- v + } + close(in) + }() + + stages := []Stage{} + + result := make([]int, 0, 5) + for s := range ExecutePipeline(in, done, stages...) { + result = append(result, s.(int)) + } + + require.Len(t, result, 5) + require.Equal(t, []int{1, 2, 3, 4, 5}, result) + }) + + t.Run("empty data", func(t *testing.T) { + in := make(Bi) + done := make(Bi) + data := []int{} + + go func() { + for _, v := range data { + in <- v + } + close(in) + }() + + result := make([]string, 0, 5) + for s := range ExecutePipeline(in, done, stages...) { + result = append(result, s.(string)) + } + + require.Len(t, result, 0) + require.Equal(t, []string{}, result) + }) +} + +func TestChanWrap(t *testing.T) { + t.Run("To end", func(t *testing.T) { + in := make(Bi) + done := make(Bi) + data := []int{1, 2, 3, 4, 5} + + go func() { + for _, v := range data { + in <- v + time.Sleep(time.Millisecond * 10) + } + close(in) + }() + + res := make([]int, 0, 5) + for i := range chanWrap(in, done) { + res = append(res, i.(int)) + } + + require.Len(t, res, 5) + require.Equal(t, []int{1, 2, 3, 4, 5}, res) + }) + + t.Run("Close Done", func(t *testing.T) { + in := make(Bi) + done := make(Bi) + data := []int{1, 2, 3, 4, 5} + + go func() { + for i, v := range data { + if i == 2 { + close(done) + return + } + in <- v + time.Sleep(time.Millisecond * 10) + } + close(in) + }() + + res := make([]int, 0, 5) + for i := range chanWrap(in, done) { + res = append(res, i.(int)) + } + + require.Len(t, res, 2) + require.Equal(t, []int{1, 2}, res) + }) + + t.Run("Close In", func(t *testing.T) { + in := make(Bi) + done := make(Bi) + data := []int{1, 2, 3, 4, 5} + + go func() { + for i, v := range data { + if i == 2 { + close(in) + return + } + in <- v + time.Sleep(time.Millisecond * 10) + } + close(in) + }() + + res := make([]int, 0, 5) + for i := range chanWrap(in, done) { + res = append(res, i.(int)) + } + + require.Len(t, res, 2) + require.Equal(t, []int{1, 2}, res) + }) }