Skip to content

Commit

Permalink
HW6 is completed
Browse files Browse the repository at this point in the history
  • Loading branch information
DimVlas committed May 31, 2024
1 parent 88f0a3c commit 326fff6
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 3 deletions.
2 changes: 1 addition & 1 deletion hw06_pipeline_execution/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/fixme_my_friend/hw06_pipeline_execution
module github.com/DimVlas/otus_hw/hw06_pipeline_execution

go 1.19

Expand Down
28 changes: 26 additions & 2 deletions hw06_pipeline_execution/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,30 @@ type (
type Stage func(in In) (out Out)

func ExecutePipeline(in In, done In, stages ...Stage) Out {
// Place your code here.
return nil
output := in

for _, stage := range stages {
input := chanWrap(output, done)
output = stage(input)
}

return output
}

func chanWrap(in In, done In) Out {
out := make(Bi)

go func() {
defer close(out)

for val := range in {
select {
case out <- val:
case <-done:
return
}
}
}()

return out
}
44 changes: 44 additions & 0 deletions hw06_pipeline_execution/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,48 @@ func TestPipeline(t *testing.T) {
require.Len(t, result, 0)
require.Less(t, int64(elapsed), int64(abortDur)+int64(fault))
})

t.Run("empty case", func(t *testing.T) {
in := make(Bi)
done := make(Bi)
data := []int{1, 2, 3, 4, 5}

go func() {
for _, v := range data {
in <- v
}
close(in)
}()

stages := []Stage{}

result := make([]int, 0, 5)
for s := range ExecutePipeline(in, done, stages...) {
result = append(result, s.(int))
}

require.Len(t, result, 5)
require.Equal(t, []int{1, 2, 3, 4, 5}, result)
})

t.Run("empty data", func(t *testing.T) {
in := make(Bi)
done := make(Bi)
data := []int{}

go func() {
for _, v := range data {
in <- v
}
close(in)
}()

result := make([]string, 0, 5)
for s := range ExecutePipeline(in, done, stages...) {
result = append(result, s.(string))
}

require.Len(t, result, 0)
require.Equal(t, []string{}, result)
})
}

0 comments on commit 326fff6

Please sign in to comment.