Skip to content

Commit

Permalink
Merge pull request #6 from DimVlas/hw06_pipeline_execution
Browse files Browse the repository at this point in the history
Hw06 pipeline execution
  • Loading branch information
DimVlas authored Jun 7, 2024
2 parents 88f0a3c + 3036f7e commit d20c7f7
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 3 deletions.
Empty file removed hw06_pipeline_execution/.sync
Empty file.
2 changes: 1 addition & 1 deletion hw06_pipeline_execution/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/fixme_my_friend/hw06_pipeline_execution
module github.com/DimVlas/otus_hw/hw06_pipeline_execution

go 1.19

Expand Down
36 changes: 34 additions & 2 deletions hw06_pipeline_execution/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,38 @@ type (
type Stage func(in In) (out Out)

func ExecutePipeline(in In, done In, stages ...Stage) Out {
// Place your code here.
return nil
output := in

for _, stage := range stages {
input := chanWrap(output, done)
output = stage(input)
}

return output
}

func chanWrap(in In, done In) Out {
out := make(Bi)

go func() {
defer close(out)

for {
select {
case <-done:
return
case val, ok := <-in:
if !ok {
return
}
select {
case <-done:
return
case out <- val:
}
}
}
}()

return out
}
120 changes: 120 additions & 0 deletions hw06_pipeline_execution/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,124 @@ func TestPipeline(t *testing.T) {
require.Len(t, result, 0)
require.Less(t, int64(elapsed), int64(abortDur)+int64(fault))
})

t.Run("empty case", func(t *testing.T) {
in := make(Bi)
done := make(Bi)
data := []int{1, 2, 3, 4, 5}

go func() {
for _, v := range data {
in <- v
}
close(in)
}()

stages := []Stage{}

result := make([]int, 0, 5)
for s := range ExecutePipeline(in, done, stages...) {
result = append(result, s.(int))
}

require.Len(t, result, 5)
require.Equal(t, []int{1, 2, 3, 4, 5}, result)
})

t.Run("empty data", func(t *testing.T) {
in := make(Bi)
done := make(Bi)
data := []int{}

go func() {
for _, v := range data {
in <- v
}
close(in)
}()

result := make([]string, 0, 5)
for s := range ExecutePipeline(in, done, stages...) {
result = append(result, s.(string))
}

require.Len(t, result, 0)
require.Equal(t, []string{}, result)
})
}

func TestChanWrap(t *testing.T) {
t.Run("To end", func(t *testing.T) {
in := make(Bi)
done := make(Bi)
data := []int{1, 2, 3, 4, 5}

go func() {
for _, v := range data {
in <- v
time.Sleep(time.Millisecond * 10)
}
close(in)
}()

res := make([]int, 0, 5)
for i := range chanWrap(in, done) {
res = append(res, i.(int))
}

require.Len(t, res, 5)
require.Equal(t, []int{1, 2, 3, 4, 5}, res)
})

t.Run("Close Done", func(t *testing.T) {
in := make(Bi)
done := make(Bi)
data := []int{1, 2, 3, 4, 5}

go func() {
for i, v := range data {
if i == 2 {
close(done)
return
}
in <- v
time.Sleep(time.Millisecond * 10)
}
close(in)
}()

res := make([]int, 0, 5)
for i := range chanWrap(in, done) {
res = append(res, i.(int))
}

require.Len(t, res, 2)
require.Equal(t, []int{1, 2}, res)
})

t.Run("Close In", func(t *testing.T) {
in := make(Bi)
done := make(Bi)
data := []int{1, 2, 3, 4, 5}

go func() {
for i, v := range data {
if i == 2 {
close(in)
return
}
in <- v
time.Sleep(time.Millisecond * 10)
}
close(in)
}()

res := make([]int, 0, 5)
for i := range chanWrap(in, done) {
res = append(res, i.(int))
}

require.Len(t, res, 2)
require.Equal(t, []int{1, 2}, res)
})
}

0 comments on commit d20c7f7

Please sign in to comment.