Skip to content

Commit

Permalink
add dropped connection detection and retry handling for LRO and provi…
Browse files Browse the repository at this point in the history
…sioning state
  • Loading branch information
jackofallops committed Oct 25, 2024
1 parent 92ddd11 commit 99247a8
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 188 deletions.
3 changes: 1 addition & 2 deletions sdk/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ func extendedRetryPolicy(resp *http.Response, err error) (bool, error) {
// connection
tcpDialTCPRe := regexp.MustCompile(`dial tcp`)

// A regular expression to match complete packet loss - see comment below on packet-loss scenarios
// A regular expression to match complete packet loss
completePacketLossRe := regexp.MustCompile(`EOF`)

if err != nil {
Expand Down Expand Up @@ -809,7 +809,6 @@ func extendedRetryPolicy(resp *http.Response, err error) (bool, error) {
return false, v
}

// Such as Temporary Proxy outage, or recoverable disruption to network traffic (e.g. bgp events, Proxy failures etc)
if completePacketLossRe.MatchString(v.Error()) {
return false, v
}
Expand Down
20 changes: 18 additions & 2 deletions sdk/client/resourcemanager/poller_lro.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -27,6 +28,9 @@ type longRunningOperationPoller struct {
initialRetryDuration time.Duration
originalUrl *url.URL
pollingUrl *url.URL

droppedConnectionCount int
maxDroppedConnections int
}

func pollingUriForLongRunningOperation(resp *client.Response) string {
Expand All @@ -39,8 +43,9 @@ func pollingUriForLongRunningOperation(resp *client.Response) string {

func longRunningOperationPollerFromResponse(resp *client.Response, client *client.Client) (*longRunningOperationPoller, error) {
poller := longRunningOperationPoller{
client: client,
initialRetryDuration: 10 * time.Second,
client: client,
initialRetryDuration: 10 * time.Second,
maxDroppedConnections: 3,
}

pollingUrl := pollingUriForLongRunningOperation(resp)
Expand Down Expand Up @@ -107,9 +112,20 @@ func (p *longRunningOperationPoller) Poll(ctx context.Context) (result *pollers.
}
result.HttpResponse, err = req.Execute(ctx)
if err != nil {
var e *url.Error
if errors.As(err, &e) {
p.droppedConnectionCount++
if p.droppedConnectionCount < p.maxDroppedConnections {
result.Status = pollers.PollingStatusUnknown
return result, nil
}
}

return nil, err
}

p.droppedConnectionCount = 0

if result.HttpResponse != nil {
var respBody []byte
respBody, err = io.ReadAll(result.HttpResponse.Body)
Expand Down
159 changes: 80 additions & 79 deletions sdk/client/resourcemanager/poller_lro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,85 +304,86 @@ func TestPollerLRO_InStatus_AcceptedThenInProgressThenSuccess(t *testing.T) {
helpers.assertCalled(t, 3)
}

//func TestPollerLRO_InProvisioningState_AcceptedThenDroppedThenInProgressThenSuccess(t *testing.T) {
// ctx := context.TODO()
// helpers := newLongRunningOperationsEndpoint([]expectedResponse{
// responseWithHttpStatusCode(http.StatusAccepted),
// responseThatDropsTheConnection(),
// responseWithStatusInProvisioningState(statusInProgress),
// responseWithStatusInProvisioningState(statusSucceeded),
// })
// server := httptest.NewServer(http.HandlerFunc(helpers.endpoint(t)))
// defer server.Close()
//
// response := &client.Response{
// Response: helpers.response(),
// }
// client := client.NewClient(server.URL, "MyService", "2020-02-01")
// poller, err := longRunningOperationPollerFromResponse(response, client)
// if err != nil {
// t.Fatal(err.Error())
// }
//
// expectedStatuses := []pollers.PollingStatus{
// pollers.PollingStatusInProgress, // the 202 Accepted
// // NOTE: the Dropped Connection will be ignored/silently retried
// pollers.PollingStatusInProgress, // working on it
// pollers.PollingStatusSucceeded, // good
// }
// for i, expected := range expectedStatuses {
// t.Logf("Poll %d..", i)
// result, err := poller.Poll(ctx)
// if err != nil {
// t.Fatal(err.Error())
// }
// if result.Status != expected {
// t.Fatalf("expected status to be %q but got %q", expected, result.Status)
// }
// }
// // sanity-checking - expect 4 calls but 3 statuses (since the dropped connection is silently retried)
// helpers.assertCalled(t, 4)
//}

//func TestPollerLRO_InStatus_AcceptedThenDroppedThenInProgressThenSuccess(t *testing.T) {
// ctx := context.TODO()
// helpers := newLongRunningOperationsEndpoint([]expectedResponse{
// responseWithHttpStatusCode(http.StatusAccepted),
// responseThatDropsTheConnection(),
// responseWithStatusInStatusField(statusInProgress),
// responseWithStatusInStatusField(statusSucceeded),
// })
// server := httptest.NewServer(http.HandlerFunc(helpers.endpoint(t)))
// defer server.Close()
//
// response := &client.Response{
// Response: helpers.response(),
// }
// client := client.NewClient(server.URL, "MyService", "2020-02-01")
// poller, err := longRunningOperationPollerFromResponse(response, client)
// if err != nil {
// t.Fatal(err.Error())
// }
//
// expectedStatuses := []pollers.PollingStatus{
// pollers.PollingStatusInProgress, // the 202 Accepted
// // NOTE: the Dropped Connection will be ignored/silently retried
// pollers.PollingStatusInProgress, // working on it
// pollers.PollingStatusSucceeded, // good
// }
// for i, expected := range expectedStatuses {
// t.Logf("Poll %d..", i)
// result, err := poller.Poll(ctx)
// if err != nil {
// t.Fatal(err.Error())
// }
// if result.Status != expected {
// t.Fatalf("expected status to be %q but got %q", expected, result.Status)
// }
// }
// // sanity-checking - expect 4 calls but 3 statuses (since the dropped connection is silently retried)
// helpers.assertCalled(t, 4)
//}
// TestPollerLRO_InProvisioningState_AcceptedThenDroppedThenInProgressThenSuccess
// drives the long-running-operation poller through a 202 Accepted, a dropped
// connection, an in-progress response and a succeeded response (the latter two
// built with the provisioning-state response helpers).
//
// A single dropped connection must not fail the poll: Poll is expected to
// return a nil error with PollingStatusUnknown so that the caller polls again,
// after which the remaining responses are processed as normal.
func TestPollerLRO_InProvisioningState_AcceptedThenDroppedThenInProgressThenSuccess(t *testing.T) {
	ctx := context.TODO()
	helpers := newLongRunningOperationsEndpoint([]expectedResponse{
		responseWithHttpStatusCode(http.StatusAccepted),
		responseThatDropsTheConnection(),
		responseWithStatusInProvisioningState(statusInProgress),
		responseWithStatusInProvisioningState(statusSucceeded),
	})
	server := httptest.NewServer(http.HandlerFunc(helpers.endpoint(t)))
	defer server.Close()

	response := &client.Response{
		Response: helpers.response(),
	}
	// Named apiClient (rather than client) so the imported `client` package
	// is not shadowed for the remainder of the test.
	apiClient := client.NewClient(server.URL, "MyService", "2020-02-01")
	poller, err := longRunningOperationPollerFromResponse(response, apiClient)
	if err != nil {
		t.Fatal(err)
	}

	// One expected status per call to the endpoint, in order.
	expectedStatuses := []pollers.PollingStatus{
		pollers.PollingStatusInProgress, // the 202 Accepted
		pollers.PollingStatusUnknown,    // the dropped connection: reported as Unknown with no error
		pollers.PollingStatusInProgress, // working on it
		pollers.PollingStatusSucceeded,  // good
	}
	for i, expected := range expectedStatuses {
		t.Logf("Poll %d..", i)
		result, err := poller.Poll(ctx)
		if err != nil {
			t.Fatal(err)
		}
		if result.Status != expected {
			t.Fatalf("expected status to be %q but got %q", expected, result.Status)
		}
	}
	// sanity-checking - expect 4 calls and 4 statuses, since the dropped
	// connection is surfaced to the caller as PollingStatusUnknown
	helpers.assertCalled(t, 4)
}

// TestPollerLRO_InStatus_AcceptedThenDroppedThenInProgressThenSuccess drives
// the long-running-operation poller through a 202 Accepted, a dropped
// connection, an in-progress response and a succeeded response (the latter two
// built with the status-field response helpers).
//
// A single dropped connection must not fail the poll: Poll is expected to
// return a nil error with PollingStatusUnknown so that the caller polls again,
// after which the remaining responses are processed as normal.
func TestPollerLRO_InStatus_AcceptedThenDroppedThenInProgressThenSuccess(t *testing.T) {
	ctx := context.TODO()
	helpers := newLongRunningOperationsEndpoint([]expectedResponse{
		responseWithHttpStatusCode(http.StatusAccepted),
		responseThatDropsTheConnection(),
		responseWithStatusInStatusField(statusInProgress),
		responseWithStatusInStatusField(statusSucceeded),
	})
	server := httptest.NewServer(http.HandlerFunc(helpers.endpoint(t)))
	defer server.Close()

	response := &client.Response{
		Response: helpers.response(),
	}
	// Named apiClient (rather than client) so the imported `client` package
	// is not shadowed for the remainder of the test.
	apiClient := client.NewClient(server.URL, "MyService", "2020-02-01")
	poller, err := longRunningOperationPollerFromResponse(response, apiClient)
	if err != nil {
		t.Fatal(err)
	}

	// One expected status per call to the endpoint, in order.
	expectedStatuses := []pollers.PollingStatus{
		pollers.PollingStatusInProgress, // the 202 Accepted
		pollers.PollingStatusUnknown,    // the dropped connection: reported as Unknown with no error
		pollers.PollingStatusInProgress, // working on it
		pollers.PollingStatusSucceeded,  // good
	}
	for i, expected := range expectedStatuses {
		t.Logf("Poll %d..", i)
		result, err := poller.Poll(ctx)
		if err != nil {
			t.Fatal(err)
		}
		if result.Status != expected {
			t.Fatalf("expected status to be %q but got %q", expected, result.Status)
		}
	}
	// sanity-checking - expect 4 calls and 4 statuses, since the dropped
	// connection is surfaced to the caller as PollingStatusUnknown
	helpers.assertCalled(t, 4)
}

func TestPollerLRO_InProvisioningState_404ThenImmediateSuccess(t *testing.T) {
// This scenario handles the API returning a 404 initially, then succeeded
Expand Down
26 changes: 21 additions & 5 deletions sdk/client/resourcemanager/poller_provisioning_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package resourcemanager

import (
"context"
"errors"
"fmt"
"net/http"
"net/url"
Expand All @@ -27,6 +28,9 @@ type provisioningStatePoller struct {
initialRetryDuration time.Duration
originalUri string
resourcePath string

droppedConnectionCount int
maxDroppedConnections int
}

func provisioningStatePollerFromResponse(response *client.Response, lroIsSelfReference bool, client *Client, pollingInterval time.Duration) (*provisioningStatePoller, error) {
Expand Down Expand Up @@ -58,11 +62,12 @@ func provisioningStatePollerFromResponse(response *client.Response, lroIsSelfRef
}

return &provisioningStatePoller{
apiVersion: apiVersion,
client: client,
initialRetryDuration: pollingInterval,
originalUri: originalUri,
resourcePath: resourcePath,
apiVersion: apiVersion,
client: client,
initialRetryDuration: pollingInterval,
originalUri: originalUri,
resourcePath: resourcePath,
maxDroppedConnections: 3,
}, nil
}

Expand All @@ -84,6 +89,17 @@ func (p *provisioningStatePoller) Poll(ctx context.Context) (*pollers.PollResult
}
resp, err := p.client.Execute(ctx, req)
if err != nil {
var e *url.Error
if errors.As(err, &e) {
p.droppedConnectionCount++
if p.droppedConnectionCount < p.maxDroppedConnections {
return &pollers.PollResult{
PollInterval: p.initialRetryDuration,
Status: pollers.PollingStatusUnknown,
}, nil
}
}

return nil, fmt.Errorf("executing request: %+v", err)
}
if resp == nil {
Expand Down
Loading

0 comments on commit 99247a8

Please sign in to comment.