-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathbuilder_streams.go
165 lines (139 loc) · 3.98 KB
/
builder_streams.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package cmdchain
import (
"errors"
"io"
"os"
"os/exec"
"sync"
)
// linkStreams connects the stdin of cmd with the output stream(s) of the
// previous command (cmd1 -> cmd2).
//
// The previous command's descriptor decides what is piped into cmd:
//   - stdout only (outToIn),
//   - stderr only (errToIn),
//   - or both, merged into a single stream.
//
// If a fork writer (outFork / errFork) is configured for a piped stream, the
// stream is additionally copied into that writer. Whatever err holds when the
// function returns is handed to c.buildErrors.
func (c *chain) linkStreams(cmd *exec.Cmd) {
	// The descriptor at len-2 is treated as the previous command.
	// NOTE(review): this presumes cmd's own descriptor has already been
	// appended (so at least two descriptors exist) - confirm with the caller.
	prevCmdDesc := c.cmdDescriptors[len(c.cmdDescriptors)-2]

	var err error
	// The closure reads err when linkStreams returns, so every assignment
	// below must target this variable (no shadowing with :=).
	defer func() {
		c.buildErrors.addError(err)
	}()

	var prevOut, prevErr io.ReadCloser
	prevOut, prevErr, err = c.linkOutAndErr(&prevCmdDesc)
	if err != nil {
		return
	}

	switch {
	case prevCmdDesc.outToIn && prevCmdDesc.errToIn:
		// Both streams feed the next command: fork each one if requested,
		// then merge them into a single pipe.
		var outR io.Reader = prevOut
		var errR io.Reader = prevErr

		if prevCmdDesc.outFork != nil {
			outR, err = c.forkStream(prevOut, prevCmdDesc.outFork)
			if err != nil {
				return
			}
		}
		if prevCmdDesc.errFork != nil {
			errR, err = c.forkStream(prevErr, prevCmdDesc.errFork)
			if err != nil {
				return
			}
		}

		// combineStream returns a concrete pointer type. Assigning it to
		// cmd.Stdin unconditionally would store a typed nil pointer inside
		// the io.Reader interface on failure, which makes cmd.Stdin compare
		// as non-nil. Only assign once the pipe really exists.
		combined, combineErr := c.combineStream(outR, errR)
		if combineErr != nil {
			err = combineErr
			return
		}
		cmd.Stdin = combined

	case prevCmdDesc.outToIn:
		if prevCmdDesc.outFork == nil {
			cmd.Stdin = prevOut
		} else {
			cmd.Stdin, err = c.forkStream(prevOut, prevCmdDesc.outFork)
		}

	case prevCmdDesc.errToIn:
		if prevCmdDesc.errFork == nil {
			cmd.Stdin = prevErr
		} else {
			cmd.Stdin, err = c.forkStream(prevErr, prevCmdDesc.errFork)
		}

	default:
		// This should never happen: neither stream is routed to the next command.
		err = errors.New("invalid stream configuration")
	}
}
// linkOutAndErr prepares the output side of prevCmd.
//
// For each of stdout and stderr:
//   - if the stream is routed to the next command (outToIn / errToIn), a pipe
//     is requested from the command and returned to the caller;
//   - otherwise, if a fork writer is configured, the command writes directly
//     into that writer and nil is returned for the stream.
//
// On the first pipe error the function stops and returns that error together
// with whatever streams have been obtained so far.
func (c *chain) linkOutAndErr(prevCmd *cmdDescriptor) (outStream io.ReadCloser, errStream io.ReadCloser, err error) {
	// stdout
	switch {
	case prevCmd.outToIn:
		if outStream, err = prevCmd.command.StdoutPipe(); err != nil {
			return outStream, errStream, err
		}
	case prevCmd.outFork != nil:
		prevCmd.command.Stdout = prevCmd.outFork
	}

	// stderr
	switch {
	case prevCmd.errToIn:
		if errStream, err = prevCmd.command.StderrPipe(); err != nil {
			return outStream, errStream, err
		}
	case prevCmd.errFork != nil:
		prevCmd.command.Stderr = prevCmd.errFork
	}

	return outStream, errStream, err
}
// forkStream duplicates everything read from src into two destinations: the
// given target writer and a freshly created OS pipe. The read end of that
// pipe is returned so it can serve as the input of the next command.
//
//	+------+          +------+
//	| cmd1 | ---+---> | cmd2 |
//	+------+    |     +------+
//	            V
//	       +--------+
//	       | target |
//	       +--------+
//
// The copying happens in a goroutine that is registered with
// c.streamRoutinesWg. Its result (nil included) is stored in c.streamErrors
// under the index of the last command descriptor present at call time.
func (c *chain) forkStream(src io.ReadCloser, target io.Writer) (io.Reader, error) {
	r, w, err := os.Pipe()
	if err != nil {
		return nil, err
	}

	// Determine the index now; the goroutine must not observe descriptors
	// that are appended later.
	cmdIndex := len(c.cmdDescriptors) - 1

	c.streamRoutinesWg.Add(1)
	go func() {
		// The write end has to be closed once src is exhausted. Otherwise the
		// reader of the pipe never sees EOF and the command execution would
		// wait forever.
		defer w.Close()
		defer c.streamRoutinesWg.Done()

		// Every chunk goes to the pipe (input of the next command) and to
		// the fork target.
		tee := io.MultiWriter(w, target)
		_, copyErr := io.Copy(tee, src)
		c.streamErrors.setError(cmdIndex, copyErr)
	}()

	return r, nil
}
// combineStream merges all sources into a single pipe on behalf of the last
// command descriptor currently registered in the chain. It delegates to
// combineStreamForCommand and returns its result unchanged.
func (c *chain) combineStream(sources ...io.Reader) (*os.File, error) {
	return c.combineStreamForCommand(len(c.cmdDescriptors)-1, sources...)
}
// combineStreamForCommand merges all sources into one OS pipe and returns the
// read end of that pipe.
//
// Each source is copied into the pipe by its own goroutine so the sources are
// drained in parallel. A supervising goroutine, registered with
// c.streamRoutinesWg, waits for all copies to finish, closes the write end of
// the pipe and stores the collected copy errors in c.streamErrors under
// cmdIndex.
//
// It returns an error only if the pipe could not be created.
func (c *chain) combineStreamForCommand(cmdIndex int, sources ...io.Reader) (*os.File, error) {
	pipeReader, pipeWriter, err := os.Pipe()
	if err != nil {
		return nil, err
	}

	// One slot per source; slot i receives the copy error of sources[i].
	streamErrors := MultipleErrors{
		errors: make([]error, len(sources)),
	}

	var wg sync.WaitGroup
	wg.Add(len(sources))
	for i, src := range sources {
		// Spawn a goroutine per stream to ensure the sources are read in
		// parallel.
		go func(i int, src io.Reader) {
			defer wg.Done()

			if _, err := io.Copy(pipeWriter, src); err != nil {
				streamErrors.setError(i, err)
			}
		}(i, src)
	}

	c.streamRoutinesWg.Add(1)
	go func() {
		// Deferred calls run last-in-first-out, so the effective order is:
		//   1. close the pipe's write end,
		//   2. record the collected errors,
		//   3. signal completion on streamRoutinesWg.
		// Signalling completion last guarantees that anyone waiting on
		// streamRoutinesWg observes the recorded errors and a closed pipe.
		defer c.streamRoutinesWg.Done()
		// NOTE(review): the streamErrors argument is evaluated here, before
		// wg.Wait(). The copy presumably shares its errors slice with the
		// original so later setError calls stay visible - confirm against
		// the MultipleErrors implementation.
		defer c.streamErrors.setError(cmdIndex, streamErrors)
		// The pipe has to be closed after all source streams are read.
		// Otherwise the reader never sees EOF and the command execution
		// would wait forever.
		defer pipeWriter.Close()

		// Wait until all streams are read.
		wg.Wait()
	}()

	return pipeReader, nil
}