Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close backend connection when frontend is not found #417

Merged
merged 13 commits into from
Jan 18, 2023
Merged
16 changes: 14 additions & 2 deletions konnectivity-client/pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,11 +334,23 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) {

case client.PacketType_DATA:
resp := pkt.GetData()
if resp.ConnectID == 0 {
klog.ErrorS(nil, "Received packet missing ConnectID", "packetType", "DATA")
continue
}
// TODO: flow control
conn, ok := t.conns.get(resp.ConnectID)

if !ok {
klog.V(1).InfoS("Connection not recognized", "connectionID", resp.ConnectID)
klog.ErrorS(nil, "Connection not recognized", "connectionID", resp.ConnectID, "packetType", "DATA")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we were taking this back to Info?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The connection ID is only removed from t.conns when receiving a CLOSE_RSP from the server, not for a client-side close. So the only way this would be hit is if the server sent a DATA packet after a CLOSE_RSP, which should never happen.

Note that I did change the CLOSE_RSP version of this log (line 374) to an info log, as there are cases where multiple CLOSE_RSP packets might be sent.

Copy link
Contributor

@cheftako cheftako Jan 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I buy that argument (for now) but it suggests that we only clean up the connection map if we get a CLOSE_RSP packet. That seems like it could represent a possible leak. If we decide to fix that possible leak then we may need to revisit this log message.

t.Send(&client.Packet{
Type: client.PacketType_CLOSE_REQ,
Payload: &client.Packet_CloseRequest{
CloseRequest: &client.CloseRequest{
ConnectID: resp.ConnectID,
},
},
})
continue
}
timer := time.NewTimer((time.Duration)(t.readTimeoutSeconds) * time.Second)
Expand All @@ -357,7 +369,7 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) {
conn, ok := t.conns.get(resp.ConnectID)

if !ok {
klog.V(1).InfoS("Connection not recognized", "connectionID", resp.ConnectID)
klog.V(1).InfoS("Connection not recognized", "connectionID", resp.ConnectID, "packetType", "CLOSE_RSP")
continue
}
close(conn.readCh)
Expand Down
21 changes: 21 additions & 0 deletions pkg/agent/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,10 +497,26 @@ func (a *Client) Serve() {
case client.PacketType_DATA:
data := pkt.GetData()
klog.V(4).InfoS("received DATA", "connectionID", data.ConnectID)
if data.ConnectID == 0 {
klog.ErrorS(nil, "Received packet missing ConnectID from frontend", "packetType", "DATA")
continue
}

ctx, ok := a.connManager.Get(data.ConnectID)
if ok {
ctx.send(data.Data)
} else {
klog.V(2).InfoS("received DATA for unrecognized connection", "connectionID", data.ConnectID)
a.Send(&client.Packet{
tallclair marked this conversation as resolved.
Show resolved Hide resolved
Type: client.PacketType_CLOSE_RSP,
Payload: &client.Packet_CloseResponse{
CloseResponse: &client.CloseResponse{
ConnectID: data.ConnectID,
Error: "unrecognized connectID",
},
},
})
continue
}

case client.PacketType_CLOSE_REQ:
Expand Down Expand Up @@ -590,9 +606,14 @@ func (a *Client) proxyToRemote(connID int64, ctx *connContext) {
// As the read side of the dataCh channel, we cannot close it.
// However serve() may be blocked writing to the channel,
// so we need to consume the channel until it is closed.
discardedPktCount := 0
for range ctx.dataCh {
// Ignore values as this indicates there was a problem
// with the remote connection.
discardedPktCount++
}
if discardedPktCount > 0 {
klog.V(2).InfoS("Discard packets while exiting proxyToRemote", "pktCount", discardedPktCount, "connectionID", connID)
}
}()

Expand Down
111 changes: 75 additions & 36 deletions pkg/agent/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,25 +59,25 @@ func TestServeData_HTTP(t *testing.T) {
}))
defer ts.Close()

// Stimulate sending KAS DIAL_REQ to (Agent) Client
// Simulate sending KAS DIAL_REQ to (Agent) Client
dialPacket := newDialPacket("tcp", ts.URL[len("http://"):], 111)
err = stream.Send(dialPacket)
if err != nil {
t.Fatal(err.Error())
}

// Expect receiving DIAL_RSP packet from (Agent) Client
pkg, err := stream.Recv()
pkt, err := stream.Recv()
if err != nil {
t.Fatal(err.Error())
}
if pkg == nil {
if pkt == nil {
t.Fatal("unexpected nil packet")
}
if pkg.Type != client.PacketType_DIAL_RSP {
t.Errorf("expect PacketType_DIAL_RSP; got %v", pkg.Type)
if pkt.Type != client.PacketType_DIAL_RSP {
t.Errorf("expect PacketType_DIAL_RSP; got %v", pkt.Type)
}
dialRsp := pkg.Payload.(*client.Packet_DialResponse)
dialRsp := pkt.Payload.(*client.Packet_DialResponse)
connID := dialRsp.DialResponse.ConnectID
if dialRsp.DialResponse.Random != 111 {
t.Errorf("expect random=111; got %v", dialRsp.DialResponse.Random)
Expand All @@ -91,14 +91,14 @@ func TestServeData_HTTP(t *testing.T) {
}

// Expect receiving http response via (Agent) Client
pkg, _ = stream.Recv()
if pkg == nil {
pkt, _ = stream.Recv()
if pkt == nil {
t.Fatal("unexpected nil packet")
}
if pkg.Type != client.PacketType_DATA {
t.Errorf("expect PacketType_DATA; got %v", pkg.Type)
if pkt.Type != client.PacketType_DATA {
t.Errorf("expect PacketType_DATA; got %v", pkt.Type)
}
data := pkg.Payload.(*client.Packet_Data).Data.Data
data := pkt.Payload.(*client.Packet_Data).Data.Data

// Verify response data
//
Expand All @@ -117,14 +117,14 @@ func TestServeData_HTTP(t *testing.T) {
ts.Close()

// Verify receiving CLOSE_RSP
pkg, _ = stream.Recv()
if pkg == nil {
pkt, _ = stream.Recv()
if pkt == nil {
t.Fatal("unexpected nil packet")
}
if pkg.Type != client.PacketType_CLOSE_RSP {
t.Errorf("expect PacketType_CLOSE_RSP; got %v", pkg.Type)
if pkt.Type != client.PacketType_CLOSE_RSP {
t.Errorf("expect PacketType_CLOSE_RSP; got %v", pkt.Type)
}
closeErr := pkg.Payload.(*client.Packet_CloseResponse).CloseResponse.Error
closeErr := pkt.Payload.(*client.Packet_CloseResponse).CloseResponse.Error
if closeErr != "" {
t.Errorf("expect nil closeErr; got %v", closeErr)
}
Expand Down Expand Up @@ -159,22 +159,22 @@ func TestClose_Client(t *testing.T) {
}))
defer ts.Close()

// Stimulate sending KAS DIAL_REQ to (Agent) Client
// Simulate sending KAS DIAL_REQ to (Agent) Client
dialPacket := newDialPacket("tcp", ts.URL[len("http://"):], 111)
err := stream.Send(dialPacket)
if err != nil {
t.Fatal(err)
}

// Expect receiving DIAL_RSP packet from (Agent) Client
pkg, _ := stream.Recv()
if pkg == nil {
pkt, _ := stream.Recv()
if pkt == nil {
t.Fatal("unexpected nil packet")
}
if pkg.Type != client.PacketType_DIAL_RSP {
t.Errorf("expect PacketType_DIAL_RSP; got %v", pkg.Type)
if pkt.Type != client.PacketType_DIAL_RSP {
t.Errorf("expect PacketType_DIAL_RSP; got %v", pkt.Type)
}
dialRsp := pkg.Payload.(*client.Packet_DialResponse)
dialRsp := pkt.Payload.(*client.Packet_DialResponse)
connID := dialRsp.DialResponse.ConnectID
if dialRsp.DialResponse.Random != 111 {
t.Errorf("expect random=111; got %v", dialRsp.DialResponse.Random)
Expand All @@ -186,14 +186,14 @@ func TestClose_Client(t *testing.T) {
}

// Expect receiving close response via (Agent) Client
pkg, _ = stream.Recv()
if pkg == nil {
pkt, _ = stream.Recv()
if pkt == nil {
t.Error("unexpected nil packet")
}
if pkg.Type != client.PacketType_CLOSE_RSP {
t.Errorf("expect PacketType_CLOSE_RSP; got %v", pkg.Type)
if pkt.Type != client.PacketType_CLOSE_RSP {
t.Errorf("expect PacketType_CLOSE_RSP; got %v", pkt.Type)
}
closeErr := pkg.Payload.(*client.Packet_CloseResponse).CloseResponse.Error
closeErr := pkt.Payload.(*client.Packet_CloseResponse).CloseResponse.Error
if closeErr != "" {
t.Errorf("expect nil closeErr; got %v", closeErr)
}
Expand All @@ -209,20 +209,59 @@ func TestClose_Client(t *testing.T) {
}

// Expect receiving close response via (Agent) Client
pkg, _ = stream.Recv()
if pkg == nil {
pkt, _ = stream.Recv()
if pkt == nil {
t.Error("unexpected nil packet")
}
if pkg.Type != client.PacketType_CLOSE_RSP {
t.Errorf("expect PacketType_CLOSE_RSP; got %+v", pkg)
if pkt.Type != client.PacketType_CLOSE_RSP {
t.Errorf("expect PacketType_CLOSE_RSP; got %+v", pkt)
}
closeErr = pkg.Payload.(*client.Packet_CloseResponse).CloseResponse.Error
closeErr = pkt.Payload.(*client.Packet_CloseResponse).CloseResponse.Error
if closeErr != "Unknown connectID" {
t.Errorf("expect Unknown connectID; got %v", closeErr)
}

}

func TestConnectionMismatch(t *testing.T) {
var stream agent.AgentService_ConnectClient
stopCh := make(chan struct{})
cs := &ClientSet{
clients: make(map[string]*Client),
stopCh: stopCh,
}
testClient := &Client{
connManager: newConnectionManager(),
stopCh: stopCh,
cs: cs,
}
testClient.stream, stream = pipe()

// Start agent
go testClient.Serve()
defer close(stopCh)

// Simulate sending a DATA packet to (Agent) Client
const connID = 12345
pkt := newDataPacket(connID, []byte("hello world"))
if err := stream.Send(pkt); err != nil {
t.Fatal(err)
}

// Expect to receive CLOSE_RSP packet from (Agent) Client
pkt, err := stream.Recv()
if err != nil {
t.Fatal(err)
}
if pkt.Type != client.PacketType_CLOSE_RSP {
t.Errorf("expect PacketType_CLOSE_RSP; got %v", pkt.Type)
}
closeRsp := pkt.Payload.(*client.Packet_CloseResponse).CloseResponse
if closeRsp.ConnectID != connID {
t.Errorf("expect connID=%d; got %v", connID, closeRsp.ConnectID)
}
}

// brokenStream wraps a ConnectClient and returns an error on Send and/or Recv if the respective
// error is non-nil.
type brokenStream struct {
Expand Down Expand Up @@ -287,7 +326,7 @@ func TestFailedSend_DialResp_GRPC(t *testing.T) {
time.Sleep(time.Second)
defer goleakVerifyNone(t, goleak.IgnoreCurrent())

// Stimulate sending KAS DIAL_REQ to (Agent) Client
// Simulate sending KAS DIAL_REQ to (Agent) Client
dialPacket := newDialPacket("tcp", strings.TrimPrefix(ts.URL, "http://"), 111)
err := stream.Send(dialPacket)
if err != nil {
Expand Down Expand Up @@ -329,9 +368,9 @@ func (s *fakeStream) Send(packet *client.Packet) error {

func (s *fakeStream) Recv() (*client.Packet, error) {
select {
case pkg := <-s.r:
klog.V(4).InfoS("[DEBUG] recv", "packet", pkg)
return pkg, nil
case pkt := <-s.r:
klog.V(4).InfoS("[DEBUG] recv", "packet", pkt)
return pkt, nil
case <-time.After(5 * time.Second):
return nil, errors.New("timeout recv")
}
Expand Down
Loading