Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async activity Invocation Example in Go #286

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ These samples demonstrate some common control flow patterns using Temporal's Go
- [**Sticky task queue for activities**](./activities-sticky-queues): Demonstrates how
to create a sticky task queue to run certain activities on the same host.

- [**Async activities invocation**](./activities-async): Demonstrates how to invoke two activities in parallel and
return when both of them have completed without blocking between one activity invocation and the following one

### Scenario based examples

- [**DSL Workflow**](./dsl): Demonstrates how to implement a
Expand Down Expand Up @@ -222,7 +225,6 @@ resource waiting its successful completion

Mostly examples we haven't yet ported from https://github.com/temporalio/samples-java/

- Async activity calling: *Example to be completed*
- Async lambda: *Example to be completed*
- Periodic Workflow: Workflow that executes some logic periodically. *Example to be completed*
- Exception propagation and wrapping: *Example to be completed*
Expand Down
19 changes: 19 additions & 0 deletions activities-async/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Async Activities Invocation in GoLang

This sample shows how to submit two activities in non-blocking fashion and wait until both are completed to return,
or fail if one of the two fails. The example uses two activity invocation, registers that on a selector, and invoke
`selector.Select(ctx)` twice, since each selector call will proceed as soon as one result is available.



### Running this sample

```bash
go run activities-async/worker/main.go
```

Start the Workflow Execution:

```bash
go run activities-async/starter/main.go
```
13 changes: 13 additions & 0 deletions activities-async/activities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package activities_async

import (
"context"
)

func SayHello(ctx context.Context, param string) (string, error) {
return "Hello " + param + "!", nil
}

func SayGoodbye(ctx context.Context, param string) (string, error) {
return "Goodbye " + param + "!", nil
}
37 changes: 37 additions & 0 deletions activities-async/starter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"context"
activities_async "github.com/temporalio/samples-go/activities-async"
"log"

"go.temporal.io/sdk/client"
)

func main() {
// The client is a heavyweight object that should be created once per process.
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

workflowOptions := client.StartWorkflowOptions{
TaskQueue: "async-activities-task-queue",
}

we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, activities_async.AsyncActivitiesWorkflow)
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}

log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())

// Synchronously wait for the workflow completion.
var result string
err = we.Get(context.Background(), &result)
if err != nil {
log.Fatalln("Unable get workflow result", err)
}
log.Println("Workflow completed with result ", result)
}
27 changes: 27 additions & 0 deletions activities-async/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import (
activities_async "github.com/temporalio/samples-go/activities-async"
"log"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

func main() {
// The client and worker are heavyweight objects that should be created once per process.
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()
w := worker.New(c, "async-activities-task-queue", worker.Options{})
w.RegisterWorkflow(activities_async.AsyncActivitiesWorkflow)

w.RegisterActivity(activities_async.SayHello)
w.RegisterActivity(activities_async.SayGoodbye)
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}
33 changes: 33 additions & 0 deletions activities-async/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package activities_async

import (
"go.temporal.io/sdk/workflow"
"time"
)

func AsyncActivitiesWorkflow(ctx workflow.Context) (res string, err error) {
selector := workflow.NewSelector(ctx)
var res1 string
var res2 string
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)
fut1 := workflow.ExecuteActivity(ctx, SayHello, "Temporal")
fut2 := workflow.ExecuteActivity(ctx, SayGoodbye, "Temporal")
selector.AddFuture(fut1, func(future workflow.Future) {
err = future.Get(ctx, &res1)
})
selector.AddFuture(fut2, func(future workflow.Future) {
err = future.Get(ctx, &res2)
})
selector.Select(ctx)
if err != nil {
return
}
selector.Select(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an err was reported by the first select, you have swallowed it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed with 7156e31

if err == nil {
res = res1 + " It was great to meet you, but time has come. " + res2
}
return
}
47 changes: 47 additions & 0 deletions activities-async/workflow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package activities_async

import (
"context"
"errors"
"github.com/stretchr/testify/mock"
"testing"

"github.com/stretchr/testify/require"
"go.temporal.io/sdk/testsuite"
)

func Test_Workflow_Succeeds(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

env.RegisterActivity(SayHello)
env.RegisterActivity(SayGoodbye)
env.ExecuteWorkflow(AsyncActivitiesWorkflow)

require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
var res string
err := env.GetWorkflowResult(&res)
require.NoError(t, err)
require.Equal(t, "Hello Temporal! It was great to meet you, but time has come. Goodbye Temporal!", res)
}

func testWorkflowFailsWhenActivityFail(t *testing.T, a interface{}) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
env.RegisterActivity(SayHello)
env.RegisterActivity(SayGoodbye)
env.OnActivity(a, mock.Anything, mock.Anything).Return(func(ctx context.Context, arg string) (string, error) {
return "", errors.New("activity failed")
})
env.ExecuteWorkflow(AsyncActivitiesWorkflow)
require.True(t, env.IsWorkflowCompleted())
require.Error(t, env.GetWorkflowError())
}

func Test_Workflow_FailsIfSayHelloFails(t *testing.T) {
testWorkflowFailsWhenActivityFail(t, SayHello)
}
func Test_Workflow_FailsIfSayGoodbyeFails(t *testing.T) {
testWorkflowFailsWhenActivityFail(t, SayGoodbye)
}