Skip to content

Commit

Permalink
Update-with-Start
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanos committed Sep 3, 2024
1 parent 808b3a9 commit 4b5167e
Show file tree
Hide file tree
Showing 13 changed files with 3,036 additions and 3,693 deletions.
20 changes: 9 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
module github.com/temporalio/omes

go 1.21

toolchain go1.22.5
go 1.22.6

require (
github.com/davecgh/go-spew v1.1.1
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.4
github.com/google/uuid v1.6.0
Expand All @@ -13,12 +12,12 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
github.com/temporalio/features v0.0.0-20240806202554-bdfe567c9d89
go.temporal.io/api v1.36.0
go.temporal.io/sdk v1.28.1
go.temporal.io/api v1.38.0
go.temporal.io/sdk v1.28.2-0.20240829163125-e85a098eaee2
go.uber.org/zap v1.27.0
golang.org/x/mod v0.20.0
golang.org/x/sync v0.8.0
golang.org/x/sys v0.23.0
golang.org/x/sys v0.24.0
google.golang.org/protobuf v1.34.2
)

Expand All @@ -32,13 +31,12 @@ require (
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.21.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/nexus-rpc/sdk-go v0.0.9 // indirect
github.com/nexus-rpc/sdk-go v0.0.10 // indirect
github.com/pborman/uuid v1.2.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/robfig/cron v1.2.0 // indirect
Expand All @@ -48,8 +46,8 @@ require (
golang.org/x/net v0.28.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/time v0.6.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240805194559-2c9e96a0b5d4 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240805194559-2c9e96a0b5d4 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240822170219-fc7c04adadcd // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect
google.golang.org/grpc v1.65.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
201 changes: 14 additions & 187 deletions go.sum

Large diffs are not rendered by default.

21 changes: 16 additions & 5 deletions loadgen/kitchen-sink-gen/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,11 +410,21 @@ impl<'a> Arbitrary<'a> for ClientAction {

impl<'a> Arbitrary<'a> for WithStartClientAction {
fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
let mut signal_action: DoSignal = u.arbitrary()?;
signal_action.with_start = true;
Ok(Self {
variant: Some(with_start_client_action::Variant::DoSignal(signal_action)),
})
let action_kind = u.int_in_range(0..=1)?;
let variant = match action_kind {
0 => with_start_client_action::Variant::DoSignal({
let mut signal_action: DoSignal = u.arbitrary()?;
signal_action.with_start = true;
signal_action
}),
1 => with_start_client_action::Variant::DoUpdate({
let mut update_action: DoUpdate = u.arbitrary()?;
update_action.with_start = true;
update_action
}),
_ => unreachable!(),
};
Ok(Self { variant: Some(variant) })
}
}

Expand Down Expand Up @@ -481,6 +491,7 @@ impl<'a> Arbitrary<'a> for DoUpdate {
Ok(Self {
variant: Some(variant),
failure_expected,
with_start: u.arbitrary()?,
})
}
}
Expand Down
72 changes: 48 additions & 24 deletions loadgen/kitchensink/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/workflow"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -75,6 +76,8 @@ func (e *ClientActionsExecutor) Start(
e.Handle, err = e.Client.ExecuteWorkflow(ctx, e.StartOptions, e.WorkflowType, e.WorkflowInput)
} else if sig := withStartAction.GetDoSignal(); sig != nil {
e.Handle, err = e.executeSignalAction(ctx, sig)
} else if upd := withStartAction.GetDoUpdate(); upd != nil {
e.Handle, err = e.executeUpdateAction(ctx, upd)
} else {
return fmt.Errorf("unsupported with_start_action: %v", withStartAction.String())
}
Expand Down Expand Up @@ -147,30 +150,7 @@ func (e *ClientActionsExecutor) executeClientAction(ctx context.Context, action
_, err = e.executeSignalAction(ctx, sig)
return err
} else if update := action.GetDoUpdate(); update != nil {
var handle client.WorkflowUpdateHandle
if actionsUpdate := update.GetDoActions(); actionsUpdate != nil {
handle, err = e.Client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
WorkflowID: e.StartOptions.ID,
UpdateName: "do_actions_update",
WaitForStage: client.WorkflowUpdateStageCompleted,
Args: []any{actionsUpdate},
})
} else if handler := update.GetCustom(); handler != nil {
handle, err = e.Client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
WorkflowID: e.StartOptions.ID,
UpdateName: handler.Name,
WaitForStage: client.WorkflowUpdateStageCompleted,
Args: []any{handler.Args},
})
} else {
return fmt.Errorf("do_update must recognizable variant")
}
if err == nil {
err = handle.Get(ctx, nil)
}
if update.FailureExpected {
err = nil
}
_, err = e.executeUpdateAction(ctx, update)
return err
} else if query := action.GetDoQuery(); query != nil {
if query.GetReportState() != nil {
Expand Down Expand Up @@ -207,8 +187,52 @@ func (e *ClientActionsExecutor) executeSignalAction(ctx context.Context, sig *Do
}

if sig.WithStart {
e.StartOptions.WithStartOperation = nil
return e.Client.SignalWithStartWorkflow(
ctx, e.StartOptions.ID, signalName, signalArgs, e.StartOptions, e.WorkflowType, e.WorkflowInput)
}
return nil, e.Client.SignalWorkflow(ctx, e.StartOptions.ID, "", signalName, signalArgs)
}

func (e *ClientActionsExecutor) executeUpdateAction(ctx context.Context, upd *DoUpdate) (run client.WorkflowRun, err error) {
var opts client.UpdateWorkflowOptions
if actionsUpdate := upd.GetDoActions(); actionsUpdate != nil {
opts = client.UpdateWorkflowOptions{
WorkflowID: e.StartOptions.ID,
UpdateName: "do_actions_update",
WaitForStage: client.WorkflowUpdateStageCompleted,
Args: []any{actionsUpdate},
}
} else if handler := upd.GetCustom(); handler != nil {
opts = client.UpdateWorkflowOptions{
WorkflowID: e.StartOptions.ID,
UpdateName: handler.Name,
WaitForStage: client.WorkflowUpdateStageCompleted,
Args: []any{handler.Args},
}
} else {
return nil, fmt.Errorf("do_update must recognizable variant")
}

var handle client.WorkflowUpdateHandle
if upd.WithStart {
op := client.NewUpdateWithStartWorkflowOperation(opts)
e.StartOptions.WithStartOperation = op
e.StartOptions.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING

run, err = e.Client.ExecuteWorkflow(ctx, e.StartOptions, e.WorkflowType, e.WorkflowInput)
if err == nil {
handle, err = op.Get(ctx)
}
} else {
handle, err = e.Client.UpdateWorkflow(ctx, opts)
}

if err == nil {
err = handle.Get(ctx, nil)
}
if upd.FailureExpected {
err = nil
}
return run, err
}
Loading

0 comments on commit 4b5167e

Please sign in to comment.