-
Notifications
You must be signed in to change notification settings - Fork 204
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[POC] Show a way to implement cancellable updates #297
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
### Update Cancel Sample | ||
|
||
Here we show an example of a workflow with a long running update. Through the use of an interceptor we are able to cancel the update by sending another special "cancel" update. | ||
|
||
### Steps to run this sample: | ||
1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use). | ||
2) Run the following command to start the worker | ||
``` | ||
go run update-cancel/worker/main.go | ||
``` | ||
3) Run the following command to start the example | ||
``` | ||
go run update-cancel/starter/main.go | ||
``` |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hrmm, not sure this is a pattern we should encourage, especially hiding this logic in an interceptor. I think it may be clearer if we should how to cancel an update context from another update explicitly in workflow code. If they want to make that generic to apply to all updates hidden away in an interceptor, then they can. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reason I moved it into the interceptor is to show you don't need to explicitly add support in your workflow There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right, but I don't think we should encourage interceptors like this nor provide a general purpose utility. I think we should show a simple example how you can cancel a single update and if someone wants to go the extra step of hiding this from workflow authors, they can. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it just makes the code a less elegant to do it in the workflow, but for the samples repo your right we shouldn't encourage usage of interceptors like this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Definitely less elegant and less reusable. Acknowledged. Kinda on purpose, which is admittedly a bit strange, but we don't want people to blindly copy the approach. What I've found is that many times people want to customize these patterns so seeing them in-workflow is easy to understand. And really, if we wanted to write this properly for reuse, I might recommend a utility instead of an interceptor. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
package update_cancel | ||
|
||
import ( | ||
"errors" | ||
|
||
"go.temporal.io/sdk/interceptor" | ||
"go.temporal.io/sdk/workflow" | ||
) | ||
|
||
const ( | ||
UpdateCancelHandle = "update-cancel" | ||
) | ||
|
||
type workerInterceptor struct { | ||
interceptor.WorkerInterceptorBase | ||
} | ||
|
||
func NewWorkerInterceptor() interceptor.WorkerInterceptor { | ||
return &workerInterceptor{} | ||
} | ||
|
||
func (w *workerInterceptor) InterceptWorkflow( | ||
ctx workflow.Context, | ||
next interceptor.WorkflowInboundInterceptor, | ||
) interceptor.WorkflowInboundInterceptor { | ||
i := &workflowInboundInterceptor{root: w} | ||
i.Next = next | ||
return i | ||
} | ||
|
||
type workflowInboundInterceptor struct { | ||
ctxMap map[string]workflow.CancelFunc | ||
interceptor.WorkflowInboundInterceptorBase | ||
root *workerInterceptor | ||
} | ||
|
||
func (w *workflowInboundInterceptor) Init(outbound interceptor.WorkflowOutboundInterceptor) error { | ||
w.ctxMap = make(map[string]workflow.CancelFunc) | ||
return w.Next.Init(outbound) | ||
} | ||
|
||
func (w *workflowInboundInterceptor) ExecuteWorkflow(ctx workflow.Context, in *interceptor.ExecuteWorkflowInput) (interface{}, error) { | ||
err := workflow.SetUpdateHandlerWithOptions(ctx, UpdateCancelHandle, func(ctx workflow.Context, updateID string) error { | ||
// Cancel the update | ||
w.ctxMap[updateID]() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably want to delete from ctxMap here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't delete because I wanted duplicate cancels to behave the same as the server, but I didn't implement that behaviour yet |
||
return nil | ||
}, workflow.UpdateHandlerOptions{ | ||
Validator: func(ctx workflow.Context, updateID string) error { | ||
// Validate that the update ID is known | ||
if _, ok := w.ctxMap[updateID]; !ok { | ||
return errors.New("unknown update ID") | ||
} | ||
return nil | ||
}, | ||
}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return w.Next.ExecuteWorkflow(ctx, in) | ||
} | ||
|
||
func (w *workflowInboundInterceptor) ExecuteUpdate(ctx workflow.Context, in *interceptor.UpdateInput) (interface{}, error) { | ||
ctx, cancel := workflow.WithCancel(ctx) | ||
w.ctxMap[workflow.GetUpdateInfo(ctx).ID] = cancel | ||
return w.Next.ExecuteUpdate(ctx, in) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"log" | ||
"time" | ||
|
||
update_cancel "github.com/temporalio/samples-go/update-cancel" | ||
enumspb "go.temporal.io/api/enums/v1" | ||
updatepb "go.temporal.io/api/update/v1" | ||
"go.temporal.io/sdk/client" | ||
) | ||
|
||
func main() { | ||
c, err := client.Dial(client.Options{}) | ||
if err != nil { | ||
log.Fatalln("Unable to create client", err) | ||
} | ||
defer c.Close() | ||
|
||
workflowOptions := client.StartWorkflowOptions{ | ||
ID: "update_cancel-workflow-ID", | ||
TaskQueue: "update_cancel", | ||
} | ||
|
||
we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, update_cancel.UpdateWorkflow) | ||
if err != nil { | ||
log.Fatalln("Unable to execute workflow", err) | ||
} | ||
|
||
log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) | ||
|
||
cancellableUpdateID := "cancellable-update-ID" | ||
log.Println("Sending update", "UpdateID", cancellableUpdateID) | ||
|
||
// Send an async update request. | ||
handle, err := c.UpdateWorkflowWithOptions(context.Background(), &client.UpdateWorkflowWithOptionsRequest{ | ||
WorkflowID: we.GetID(), | ||
RunID: we.GetRunID(), | ||
UpdateName: update_cancel.UpdateHandle, | ||
UpdateID: cancellableUpdateID, | ||
WaitPolicy: &updatepb.WaitPolicy{ | ||
LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED, | ||
}, | ||
Args: []interface{}{ | ||
4 * time.Hour, | ||
}, | ||
}) | ||
if err != nil { | ||
log.Fatalln("Unable to execute update", err) | ||
} | ||
log.Println("Sent update") | ||
|
||
log.Println("Waiting 5s to send cancel") | ||
time.Sleep(5 * time.Second) | ||
log.Println("Sending cancel to update", "UpdateID", cancellableUpdateID) | ||
|
||
_, err = c.UpdateWorkflow(context.Background(), we.GetID(), we.GetRunID(), update_cancel.UpdateCancelHandle, cancellableUpdateID) | ||
if err != nil { | ||
log.Fatalln("Unable to send cancel", err) | ||
} | ||
log.Println("Sent cancel") | ||
|
||
var sleepTime time.Duration | ||
err = handle.Get(context.Background(), &sleepTime) | ||
if err != nil { | ||
log.Fatalln("Unable to get update result", err) | ||
} | ||
// Update will only sleep for 5s because it was cancelled. | ||
log.Println("Update slept for:", sleepTime) | ||
|
||
if err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), update_cancel.Done, nil); err != nil { | ||
log.Fatalf("failed to send %q signal to workflow: %v", update_cancel.Done, err) | ||
} | ||
var wfresult int | ||
if err = we.Get(context.Background(), &wfresult); err != nil { | ||
log.Fatalf("unable get workflow result: %v", err) | ||
} | ||
log.Println("workflow result:", wfresult) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package update_cancel | ||
|
||
import ( | ||
"time" | ||
|
||
"go.temporal.io/sdk/workflow" | ||
) | ||
|
||
const ( | ||
UpdateHandle = "update_handle" | ||
Done = "done" | ||
) | ||
|
||
func UpdateWorkflow(ctx workflow.Context) error { | ||
if err := workflow.SetUpdateHandler( | ||
ctx, | ||
UpdateHandle, | ||
func(ctx workflow.Context, sleepTime time.Duration) (time.Duration, error) { | ||
dt := workflow.Now(ctx) | ||
workflow.Sleep(ctx, sleepTime) | ||
return workflow.Now(ctx).Sub(dt), nil | ||
}, | ||
); err != nil { | ||
return err | ||
} | ||
|
||
_ = workflow.GetSignalChannel(ctx, Done).Receive(ctx, nil) | ||
return ctx.Err() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
package main | ||
|
||
import ( | ||
"log" | ||
|
||
update_cancel "github.com/temporalio/samples-go/update-cancel" | ||
"go.temporal.io/sdk/client" | ||
sdkinterceptor "go.temporal.io/sdk/interceptor" | ||
"go.temporal.io/sdk/worker" | ||
) | ||
|
||
func main() { | ||
// The client and worker are heavyweight objects that should be created once per process. | ||
c, err := client.Dial(client.Options{}) | ||
if err != nil { | ||
log.Fatalln("Unable to create client", err) | ||
} | ||
defer c.Close() | ||
|
||
w := worker.New(c, "update_cancel", worker.Options{ | ||
Interceptors: []sdkinterceptor.WorkerInterceptor{update_cancel.NewWorkerInterceptor()}, | ||
}) | ||
|
||
w.RegisterWorkflow(update_cancel.UpdateWorkflow) | ||
|
||
err = w.Run(worker.InterruptCh()) | ||
if err != nil { | ||
log.Fatalln("Unable to start worker", err) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update interceptors don't work on the current release of the Go SDK