diff --git a/.gitignore b/.gitignore index 3ff2e24..92c13c4 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,5 @@ build/ .project .settings .classpath -repo/ \ No newline at end of file +repo/ +Nuget.Config \ No newline at end of file diff --git a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java index 5f7ab9e..9dd6c61 100644 --- a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java +++ b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java @@ -693,6 +693,19 @@ private void handleSubOrchestrationFailed(HistoryEvent e){ task.completeExceptionally(exception); } + private void handleEventSentEvent(HistoryEvent e) { + int taskId = e.getEventId(); + EventSentEvent eventSentEvent = e.getEventSent(); + OrchestratorAction taskAction = this.pendingActions.remove(taskId); + if (taskAction == null) { + String message = String.format( + "Non-deterministic orchestrator detected: a history event scheduling an sent-event task with sequence ID %d and name '%s' was replayed but the current orchestrator implementation didn't actually schedule this task. Was a change made to the orchestrator code after this instance had already started running?", + taskId, + eventSentEvent.getName()); + throw new NonDeterministicOrchestratorException(message); + } + } + private void handleExecutionTerminated(HistoryEvent e) { ExecutionTerminatedEvent executionTerminatedEvent = e.getExecutionTerminated(); this.completeInternal(executionTerminatedEvent.getInput().getValue(), null, OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED); @@ -772,6 +785,7 @@ private void processEvent(HistoryEvent e) { this.setCurrentInstant(instant); break; case ORCHESTRATORCOMPLETED: + case EXECUTIONCOMPLETED: // No action break; case EXECUTIONSTARTED: @@ -791,8 +805,6 @@ private void processEvent(HistoryEvent e) { TaskOrchestration orchestrator = factory.create(); orchestrator.run(this); break; -// case EXECUTIONCOMPLETED: -// break; // case EXECUTIONFAILED: // break; case EXECUTIONTERMINATED: @@ -822,8 +834,10 @@ private void processEvent(HistoryEvent e) { case SUBORCHESTRATIONINSTANCEFAILED: this.handleSubOrchestrationFailed(e); break; -// case EVENTSENT: -// break; + case EVENTSENT: + this.handleEventSentEvent(e); +// this.handleEventRaised(e); + break; case EVENTRAISED: this.handleEventRaised(e); break; diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index 5d6a47f..0b20722 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -1110,4 +1110,50 @@ private static String getExceptionMessage(String taskName, int expectedTaskId, S expectedTaskId, expectedExceptionMessage); } + + @Test + void sendEvent() throws IOException, InterruptedException, TimeoutException { + final String orchestratorOne = "Orchestrator1"; + final String orchestratorTwo = "Orchestrator2"; + final String instanceId = "testId"; + final String eventName = "testEvent"; + final String eventPayload = "testPayload"; + final String finishMessage = "Finished Sending Event"; + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorOne, ctx -> { + String awaitInput = ctx.waitForExternalEvent(eventName, String.class).await(); + ctx.complete(awaitInput); + }) + .addOrchestrator(orchestratorTwo, ctx -> { + ctx.sendEvent(instanceId, eventName, eventPayload); + ctx.complete(finishMessage); + }) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + client.scheduleNewOrchestrationInstance(orchestratorOne, null, instanceId); + String orchestratorTwoID = client.scheduleNewOrchestrationInstance(orchestratorTwo); + + OrchestrationMetadata orchestratorOneInstance = client.waitForInstanceCompletion( + instanceId, + defaultTimeout, + true); + + assertNotNull(orchestratorOneInstance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, orchestratorOneInstance.getRuntimeStatus()); + String outputOne = orchestratorOneInstance.readOutputAs(String.class); + assertEquals(eventPayload, outputOne); + + OrchestrationMetadata orchestratorTwoInstance = client.waitForInstanceCompletion( + orchestratorTwoID, + defaultTimeout, + true); + + assertNotNull(orchestratorTwoInstance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, orchestratorTwoInstance.getRuntimeStatus()); + String outputTwo = orchestratorTwoInstance.readOutputAs(String.class); + assertEquals(finishMessage, outputTwo); + } + } } diff --git a/samples-azure-functions/src/main/java/com/functions/SendEvent.java b/samples-azure-functions/src/main/java/com/functions/SendEvent.java new file mode 100644 index 0000000..413bf78 --- /dev/null +++ b/samples-azure-functions/src/main/java/com/functions/SendEvent.java @@ -0,0 +1,72 @@ +package com.functions; + +import com.microsoft.azure.functions.ExecutionContext; +import com.microsoft.azure.functions.HttpMethod; +import com.microsoft.azure.functions.HttpRequestMessage; +import com.microsoft.azure.functions.HttpResponseMessage; +import com.microsoft.azure.functions.annotation.AuthorizationLevel; +import com.microsoft.azure.functions.annotation.FunctionName; +import com.microsoft.azure.functions.annotation.HttpTrigger; +import com.microsoft.durabletask.DurableTaskClient; +import com.microsoft.durabletask.OrchestrationMetadata; +import com.microsoft.durabletask.Task; +import com.microsoft.durabletask.TaskOrchestrationContext; +import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger; +import com.microsoft.durabletask.azurefunctions.DurableClientContext; +import com.microsoft.durabletask.azurefunctions.DurableClientInput; +import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger; + +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.TimeoutException; + +public class SendEvent { + + private final String instanceId = "waitEventID"; + private final String eventName = "testEvent"; + private final String eventData = "Hello World!"; + /** + * This HTTP-triggered function starts the orchestration. + */ + @FunctionName("WaitEventOrchestration") + public HttpResponseMessage waitEventOrchestration( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) throws TimeoutException, InterruptedException { + context.getLogger().info("Java HTTP trigger processed a request."); + DurableTaskClient client = durableContext.getClient(); + client.scheduleNewOrchestrationInstance("WaitEvent", null, instanceId); + context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId); + return durableContext.createCheckStatusResponse(request, instanceId); + } + + @FunctionName("SendEventOrchestration") + public HttpResponseMessage sendEventOrchestration( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) throws TimeoutException, InterruptedException { + context.getLogger().info("Java HTTP trigger processed a request."); + DurableTaskClient client = durableContext.getClient(); + String instanceId = client.scheduleNewOrchestrationInstance("SendEvent", null, "sendEventID"); + context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId); + return durableContext.createCheckStatusResponse(request, instanceId); + } + // + @FunctionName("WaitEvent") + public String waitEvent( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx, + final ExecutionContext context) { + String await = ctx.waitForExternalEvent(eventName, String.class).await(); + context.getLogger().info("Event received with payload: " + await); + return ctx.callActivity("Capitalize", await, String.class).await(); + } + + @FunctionName("SendEvent") + public void sendEvent( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx, + final ExecutionContext context) { + ctx.sendEvent(instanceId, eventName, eventData); + context.getLogger().info("Event sent"); + return; + } +} diff --git a/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java b/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java index a174b39..54038a1 100644 --- a/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java +++ b/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java @@ -37,7 +37,6 @@ public void basicChain() throws InterruptedException { assertEquals("Completed", runTimeStatus); } - @Test public void continueAsNew() throws InterruptedException { String startOrchestrationPath = "api/ContinueAsNew"; @@ -59,4 +58,27 @@ public void continueAsNew() throws InterruptedException { runTimeStatus = statusResponse.jsonPath().get("runtimeStatus"); assertEquals("Terminated", runTimeStatus); } + + @Test + public void sendEvent() throws InterruptedException { + String waitEventOrchestrationPath = "api/WaitEventOrchestration"; + String SendEventOrchestrationPath = "api/SendEventOrchestration"; + Response waitEventOrchestrationResponse = post(waitEventOrchestrationPath); + JsonPath jsonPath = waitEventOrchestrationResponse.jsonPath(); + String waitEventOrchestrationStatusQueryGetUri = jsonPath.get("statusQueryGetUri"); + String runTimeStatus; + for (int i = 0; i < 10; i++) { + Response statusResponse = get(waitEventOrchestrationStatusQueryGetUri); + runTimeStatus = statusResponse.jsonPath().get("runtimeStatus"); + assertEquals("Running", runTimeStatus); + Thread.sleep(1000); + } + + post(SendEventOrchestrationPath); + Thread.sleep(1000); + + Response statusResponse = get(waitEventOrchestrationStatusQueryGetUri); + runTimeStatus = statusResponse.jsonPath().get("runtimeStatus"); + assertEquals("Completed", runTimeStatus); + } }