Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rewind client API #123

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class HttpManagementPayload {
private final String terminatePostUri;
private final String resumePostUri;
private final String suspendPostUri;
public final String rewindPostUri;

/**
* Creates a {@link HttpManagementPayload} to manage orchestration instances
Expand All @@ -38,6 +39,7 @@ public HttpManagementPayload(
this.terminatePostUri = instanceStatusURL + "/terminate?reason={text}&" + requiredQueryStringParameters;
this.resumePostUri = instanceStatusURL + "/resume?reason={text}&" + requiredQueryStringParameters;
this.suspendPostUri = instanceStatusURL + "/suspend?reason={text}&" + requiredQueryStringParameters;
this.rewindPostUri = instanceStatusURL + "/rewind?reason={text}&" + requiredQueryStringParameters;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,4 +321,19 @@ public void resumeInstance(String instanceId) {
* @param reason the reason for resuming the orchestration instance
*/
public abstract void resumeInstance(String instanceId, @Nullable String reason);

/**
* Rewinds a failed orchestration instance.
* @param instanceId the ID of the orchestration instance to rewind
* @param reason the reason for rewinding the orchestration instance
*/
public abstract void rewindInstance(String instanceId, @Nullable String reason);

/**
* Rewinds a failed orchestration instance.
* @param instanceId the ID of the orchestration instance to rewind
*/
public void rewindInstance(String instanceId) {
this.rewindInstance(instanceId, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,16 @@ public String restartInstance(String instanceId, boolean restartWithNewInstanceI
}
}

@Override
public void rewindInstance(String instanceId, @Nullable String reason) {
RewindInstanceRequest.Builder rewindInstanceRequestBuilder = RewindInstanceRequest.newBuilder();
rewindInstanceRequestBuilder.setInstanceId(instanceId);
if (reason != null) {
rewindInstanceRequestBuilder.setReason(StringValue.of(reason));
}
this.sidecarClient.rewindInstance(rewindInstanceRequestBuilder.build());
}

private PurgeResult toPurgeResult(PurgeInstancesResponse response){
return new PurgeResult(response.getDeletedInstanceCount());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.functions;

import com.microsoft.azure.functions.*;
import com.microsoft.azure.functions.annotation.AuthorizationLevel;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.annotation.HttpTrigger;
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.TaskOrchestrationContext;
import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger;
import com.microsoft.durabletask.azurefunctions.DurableClientContext;
import com.microsoft.durabletask.azurefunctions.DurableClientInput;
import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger;

import java.util.Optional;

/**
* Azure Durable Functions with HTTP trigger - Rewind instance sample.
*/
public class RewindInstance {
private static int approvalFlag = 0;

/**
* This HTTP-triggered function starts the approval orchestration.
*/
@FunctionName("ApprovalWorkflowOrchestration")
public HttpResponseMessage approvalWorkflowOrchestration(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
final ExecutionContext context) throws InterruptedException {
context.getLogger().info("Java HTTP trigger processed a request.");

DurableTaskClient client = durableContext.getClient();
String instanceId = client.scheduleNewOrchestrationInstance("ApprovalWorkflow");
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
return durableContext.createCheckStatusResponse(request, instanceId);
}

@FunctionName("ApprovalWorkflow")
public int approvalWorkflow(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
int result = 0;
result += ctx.callActivity("RequestPrimaryApproval", 1, Integer.class).await();
result += ctx.callActivity("RequestSecondaryApproval", 1, Integer.class).await();
return result;
}

/**
* This is the activity function that gets invoked by the approval orchestration.
*/
@FunctionName("RequestPrimaryApproval")
public int requestPrimaryApproval(
@DurableActivityTrigger(name = "name") int number,
final ExecutionContext context) {
return 1;
}

/**
* This is the activity function that fails the first try and is then revived.
*/
@FunctionName("RequestSecondaryApproval")
public int requestSecondaryApproval(
@DurableActivityTrigger(name = "name") int number,
final ExecutionContext context) throws InterruptedException {
return number / approvalFlag++;
}

/**
* This HTTP-triggered function rewinds the orchestration using instanceId.
*/
@FunctionName("RewindInstance")
public String rewindInstance(
shreyas-gopalakrishna marked this conversation as resolved.
Show resolved Hide resolved
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
final ExecutionContext context) {
String instanceId = request.getQueryParameters().getOrDefault("instanceId", "");
String reason = "Orchestrator failed and needs to be revived.";

DurableTaskClient client = durableContext.getClient();
client.rewindInstance(instanceId, reason);
return "Failed orchestration instance is scheduled for rewind.";
}

/**
* This HTTP-triggered function resets the approvalFlag variable for testing purposes.
*/
@FunctionName("ResetApproval")
public static HttpResponseMessage resetApproval(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS)
HttpRequestMessage<Optional<String>> request,
final ExecutionContext context) {
context.getLogger().info("ResetApproval function invoked.");
approvalFlag = 0;
return request.createResponseBuilder(HttpStatus.OK).body(approvalFlag).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@
@Tag("e2e")
public class EndToEndTests {

private static final String hostHealthPingUrl = "/admin/host/ping";
private static final String startOrchestrationUrl = "/api/StartOrchestration";
private static final String startApprovalWorkflowUrl = "/api/ApprovalWorkflowOrchestration";
private static final String rewindInstanceFunctionUrl = "/api/RewindInstance";
private static final String resetApprovalUrl = "/api/ResetApprovalFlag";

@Order(1)
@Test
public void setupHost() {
String hostHealthPingPath = "/admin/host/ping";
post(hostHealthPingPath).then().statusCode(200);
post(hostHealthPingUrl).then().statusCode(200);
}

@ParameterizedTest
Expand Down Expand Up @@ -202,4 +207,74 @@ private boolean pollingCheck(String statusQueryGetUri,
}
return false;
}

@Order(2)
@Test
public void testRewindInstanceJavaAPI() throws InterruptedException {
Response response = post(startApprovalWorkflowUrl);
JsonPath startOrchestrationResponseJson = response.jsonPath();

// Wait for the ApprovalWorkflowOrchestration to fail
Thread.sleep(3000);

String instanceId = startOrchestrationResponseJson.get("id");
String statusQueryGetUri = startOrchestrationResponseJson.get("statusQueryGetUri");
Response statusResponse = get(statusQueryGetUri);
String runtimeStatus = statusResponse.jsonPath().get("runtimeStatus");
assertEquals("Failed", runtimeStatus);

// Rewind the instance using Java API
String rewindInstanceUrl = rewindInstanceFunctionUrl + "?instanceId=" + instanceId;
response = post(rewindInstanceUrl);
assertEquals("Failed orchestration instance is scheduled for rewind.", response.toString());

// Wait for orchestration to rewind and complete
Thread.sleep(3000);

for (int i = 0; i < 5; i++) {
statusResponse = get(statusQueryGetUri);
runtimeStatus = statusResponse.jsonPath().get("runtimeStatus");
if (!"Completed".equals(runtimeStatus)) {
Thread.sleep(1000);
} else break;
}
assertEquals("Completed", runtimeStatus);

// Reset approval for other test cases
post(resetApprovalUrl);
}

@Order(3)
@Test
public void testRewindInstanceHttpAPI() throws InterruptedException {
Response response = post(startApprovalWorkflowUrl);
JsonPath startOrchestrationResponseJson = response.jsonPath();

// Wait for the ApprovalWorkflowOrchestration to fail
Thread.sleep(3000);

String statusQueryGetUri = startOrchestrationResponseJson.get("statusQueryGetUri");
Response statusResponse = get(statusQueryGetUri);
String runtimeStatus = statusResponse.jsonPath().get("runtimeStatus");
assertEquals("Failed", runtimeStatus);

// Rewind the instance using Http API
String rewindPostUri = startOrchestrationResponseJson.get("rewindPostUri");
post(rewindPostUri);

// Wait for orchestration to rewind and complete
Thread.sleep(3000);

for (int i = 0; i < 5; i++) {
statusResponse = get(statusQueryGetUri);
runtimeStatus = statusResponse.jsonPath().get("runtimeStatus");
if (!"Completed".equals(runtimeStatus)) {
Thread.sleep(1000);
} else break;
}
assertEquals("Completed", runtimeStatus);

// Reset approval for other test cases
post(resetApprovalUrl);
}
}
Loading