diff --git a/README.md b/README.md index 2a4e7588..7ed04250 100644 --- a/README.md +++ b/README.md @@ -186,6 +186,9 @@ These samples demonstrate some common control flow patterns using Temporal's Go - [**Sticky task queue for activities**](./activities-sticky-queues): Demonstrates how to create a sticky task queue to run certain activities on the same host. +- [**Async activities invocation**](./activities-async): Demonstrates how to invoke two activities in parallel and + return when both of them have completed without blocking between one activity invocation and the following one + ### Scenario based examples - [**DSL Workflow**](./dsl): Demonstrates how to implement a @@ -222,7 +225,6 @@ resource waiting its successful completion Mostly examples we haven't yet ported from https://github.com/temporalio/samples-java/ -- Async activity calling: *Example to be completed* - Async lambda: *Example to be completed* - Periodic Workflow: Workflow that executes some logic periodically. *Example to be completed* - Exception propagation and wrapping: *Example to be completed* diff --git a/activities-async/README.md b/activities-async/README.md new file mode 100644 index 00000000..9f431d7c --- /dev/null +++ b/activities-async/README.md @@ -0,0 +1,19 @@ +# Async Activities Invocation in GoLang + +This sample shows how to submit two activities in non-blocking fashion and wait until both are completed to return, +or fail if one of the two fails. The example uses two activity invocation, registers that on a selector, and invoke +`selector.Select(ctx)` twice, since each selector call will proceed as soon as one result is available. + + + +### Running this sample + +```bash +go run activities-async/worker/main.go +``` + +Start the Workflow Execution: + +```bash +go run activities-async/starter/main.go +``` diff --git a/activities-async/activities.go b/activities-async/activities.go new file mode 100644 index 00000000..3677aa46 --- /dev/null +++ b/activities-async/activities.go @@ -0,0 +1,13 @@ +package activities_async + +import ( + "context" +) + +func SayHello(ctx context.Context, param string) (string, error) { + return "Hello " + param + "!", nil +} + +func SayGoodbye(ctx context.Context, param string) (string, error) { + return "Goodbye " + param + "!", nil +} diff --git a/activities-async/starter/main.go b/activities-async/starter/main.go new file mode 100644 index 00000000..a8f18f9b --- /dev/null +++ b/activities-async/starter/main.go @@ -0,0 +1,37 @@ +package main + +import ( + "context" + activities_async "github.com/temporalio/samples-go/activities-async" + "log" + + "go.temporal.io/sdk/client" +) + +func main() { + // The client is a heavyweight object that should be created once per process. + c, err := client.Dial(client.Options{}) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + workflowOptions := client.StartWorkflowOptions{ + TaskQueue: "async-activities-task-queue", + } + + we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, activities_async.AsyncActivitiesWorkflow) + if err != nil { + log.Fatalln("Unable to execute workflow", err) + } + + log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) + + // Synchronously wait for the workflow completion. + var result string + err = we.Get(context.Background(), &result) + if err != nil { + log.Fatalln("Unable get workflow result", err) + } + log.Println("Workflow completed with result ", result) +} diff --git a/activities-async/worker/main.go b/activities-async/worker/main.go new file mode 100644 index 00000000..e8fa5cb2 --- /dev/null +++ b/activities-async/worker/main.go @@ -0,0 +1,27 @@ +package main + +import ( + activities_async "github.com/temporalio/samples-go/activities-async" + "log" + + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" +) + +func main() { + // The client and worker are heavyweight objects that should be created once per process. + c, err := client.Dial(client.Options{}) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + w := worker.New(c, "async-activities-task-queue", worker.Options{}) + w.RegisterWorkflow(activities_async.AsyncActivitiesWorkflow) + + w.RegisterActivity(activities_async.SayHello) + w.RegisterActivity(activities_async.SayGoodbye) + err = w.Run(worker.InterruptCh()) + if err != nil { + log.Fatalln("Unable to start worker", err) + } +} diff --git a/activities-async/workflow.go b/activities-async/workflow.go new file mode 100644 index 00000000..21708376 --- /dev/null +++ b/activities-async/workflow.go @@ -0,0 +1,33 @@ +package activities_async + +import ( + "go.temporal.io/sdk/workflow" + "time" +) + +func AsyncActivitiesWorkflow(ctx workflow.Context) (res string, err error) { + selector := workflow.NewSelector(ctx) + var res1 string + var res2 string + ao := workflow.ActivityOptions{ + StartToCloseTimeout: 10 * time.Second, + } + ctx = workflow.WithActivityOptions(ctx, ao) + fut1 := workflow.ExecuteActivity(ctx, SayHello, "Temporal") + fut2 := workflow.ExecuteActivity(ctx, SayGoodbye, "Temporal") + selector.AddFuture(fut1, func(future workflow.Future) { + err = future.Get(ctx, &res1) + }) + selector.AddFuture(fut2, func(future workflow.Future) { + err = future.Get(ctx, &res2) + }) + selector.Select(ctx) + if err != nil { + return + } + selector.Select(ctx) + if err == nil { + res = res1 + " It was great to meet you, but time has come. " + res2 + } + return +} diff --git a/activities-async/workflow_test.go b/activities-async/workflow_test.go new file mode 100644 index 00000000..a73d7a3e --- /dev/null +++ b/activities-async/workflow_test.go @@ -0,0 +1,47 @@ +package activities_async + +import ( + "context" + "errors" + "github.com/stretchr/testify/mock" + "testing" + + "github.com/stretchr/testify/require" + "go.temporal.io/sdk/testsuite" +) + +func Test_Workflow_Succeeds(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + + env.RegisterActivity(SayHello) + env.RegisterActivity(SayGoodbye) + env.ExecuteWorkflow(AsyncActivitiesWorkflow) + + require.True(t, env.IsWorkflowCompleted()) + require.NoError(t, env.GetWorkflowError()) + var res string + err := env.GetWorkflowResult(&res) + require.NoError(t, err) + require.Equal(t, "Hello Temporal! It was great to meet you, but time has come. Goodbye Temporal!", res) +} + +func testWorkflowFailsWhenActivityFail(t *testing.T, a interface{}) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + env.RegisterActivity(SayHello) + env.RegisterActivity(SayGoodbye) + env.OnActivity(a, mock.Anything, mock.Anything).Return(func(ctx context.Context, arg string) (string, error) { + return "", errors.New("activity failed") + }) + env.ExecuteWorkflow(AsyncActivitiesWorkflow) + require.True(t, env.IsWorkflowCompleted()) + require.Error(t, env.GetWorkflowError()) +} + +func Test_Workflow_FailsIfSayHelloFails(t *testing.T) { + testWorkflowFailsWhenActivityFail(t, SayHello) +} +func Test_Workflow_FailsIfSayGoodbyeFails(t *testing.T) { + testWorkflowFailsWhenActivityFail(t, SayGoodbye) +}