Skip to content

Commit

Permalink
fix/refactor: Refactor streaming to use the new built-in frisbee stre…
Browse files Browse the repository at this point in the history
…aming channels (#25)

* Refactor streaming to use the new built-in frisbee streaming channels

* Remove debug statements

* Fix missing import
  • Loading branch information
SuperManifolds authored Dec 7, 2022
1 parent 5bbbc14 commit a8c5009
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 404 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ go 1.18

require (
github.com/loopholelabs/common v0.4.4
github.com/loopholelabs/frisbee-go v0.7.0
github.com/loopholelabs/polyglot-go v0.5.0
github.com/loopholelabs/frisbee-go v0.7.1
github.com/loopholelabs/polyglot-go v0.5.1
github.com/loopholelabs/testing v0.2.3
github.com/rs/zerolog v1.28.0
github.com/stretchr/testify v1.8.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/loopholelabs/common v0.4.4 h1:Ge+1v1WiLYgR/4pziOQoJAwUqUm1c9j6nQvnkiFFBsk=
github.com/loopholelabs/common v0.4.4/go.mod h1:YKnljczr4jgxkHhhAwIHh3CJXaff89YBd8Vp3pwpG3k=
github.com/loopholelabs/frisbee-go v0.7.0 h1:LGzG/NYQBsRnmojfVoSBTC31PgfV9XutQA39IU9gzf8=
github.com/loopholelabs/frisbee-go v0.7.0/go.mod h1:XfDgwwOkgN/ktzaDAq3Zu0A9Dl0w5/xkZ2qfXAvRbjs=
github.com/loopholelabs/polyglot-go v0.5.0 h1:F65/d+65qgAu2F0GcWzP6UVIwd9897bNEgylNMr8FGk=
github.com/loopholelabs/polyglot-go v0.5.0/go.mod h1:Z0QiNv4KRuWjQWpUerMhmkvRh6ks1pYmEH4SGpG0EHQ=
github.com/loopholelabs/frisbee-go v0.7.1 h1:imAu7k1blav6FH9nLLn2wqi8d3rHJZqk9e20EglMEqo=
github.com/loopholelabs/frisbee-go v0.7.1/go.mod h1:vvW59GSxsw0euO6NtOIWD4lAgXu0jNE9bjFPQGxdOBc=
github.com/loopholelabs/polyglot-go v0.5.1 h1:21QVDELp+EodPUAL+Aw8GNXLyt2BFj9gYQsGvHIFlcc=
github.com/loopholelabs/polyglot-go v0.5.1/go.mod h1:Z0QiNv4KRuWjQWpUerMhmkvRh6ks1pYmEH4SGpG0EHQ=
github.com/loopholelabs/testing v0.2.3 h1:4nVuK5ctaE6ua5Z0dYk2l7xTFmcpCYLUeGjRBp8keOA=
github.com/loopholelabs/testing v0.2.3/go.mod h1:gqtGY91soYD1fQoKQt/6kP14OYpS7gcbcIgq5mc9m8Q=
github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40=
Expand Down
2 changes: 1 addition & 1 deletion pkg/generator/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var (
requiredImports = []string{
"errors",
"github.com/loopholelabs/polyglot-go",
"net",
}

serviceImports = []string{
Expand All @@ -31,7 +32,6 @@ var (
}

streamMethodImports = []string{
"github.com/loopholelabs/common/pkg/queue",
"go.uber.org/atomic",
"io",
}
Expand Down
23 changes: 6 additions & 17 deletions pkg/generator/test/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,31 +42,19 @@ func TestRPC(t *testing.T) {
go server.ServeConn(sConn)

t.Run("Synchronous Request", func(t *testing.T) {
t.Parallel()
go func() {
testSynchronous(client, t)
}()
testSynchronous(client, t)
})

t.Run("Bi-directional Stream", func(t *testing.T) {
t.Parallel()
go func() {
testBidirectional(client, t)
}()
testBidirectional(client, t)
})

t.Run("Server Stream", func(t *testing.T) {
t.Parallel()
go func() {
testServerStreaming(client, t)
}()
testServerStreaming(client, t)
})

t.Run("Client Stream", func(t *testing.T) {
t.Parallel()
go func() {
testClientStreaming(client, t)
}()
testClientStreaming(client, t)
})
}

Expand Down Expand Up @@ -151,6 +139,7 @@ func testClientStreaming(client *Client, t *testing.T) {
err := stream.Send(data)
assert.NoError(t, err)
}
err = stream.CloseSend()
res, err := stream.CloseAndRecv()
assert.NoError(t, err)
assert.Equal(t, "Hello World", res.Message)
}
12 changes: 3 additions & 9 deletions pkg/generator/test/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,7 @@ func (s svc) Echo(ctx context.Context, request *Request) (*Response, error) {
}

func (s svc) EchoStream(srv *EchoStreamServer) error {
ctx := srv.Context()
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

request, err := srv.Recv()
if err == io.EOF {
err = srv.CloseSend()
Expand Down Expand Up @@ -89,9 +82,10 @@ func (s svc) Upload(srv *UploadServer) error {
for {
res, err := srv.Recv()
if err == io.EOF {
assert.Equal(s.t, 10, received)
return srv.CloseSend()
assert.Equal(s.t, 11, received)
return srv.CloseAndSend(&Response{Message: "Hello World", Test: &Data{}})
}
received += 1
assert.NoError(s.t, err)
assert.Equal(s.t, "Hello World", res.Message)
}
Expand Down
Loading

0 comments on commit a8c5009

Please sign in to comment.