Skip to content

Commit

Permalink
Add Signal-with-Start (#111)
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanos authored Aug 15, 2024
1 parent dc7885b commit 4b12728
Show file tree
Hide file tree
Showing 11 changed files with 3,114 additions and 1,594 deletions.
11 changes: 10 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,16 @@ jobs:
run: cargo build
- name: Check diff
run: |
[[ -z $(git status --porcelain loadgen/kitchensink/ workers/python/protos/) ]] || (git diff; echo "Protos changed"; false) 1>&2
git config --global core.safecrlf false
git diff > generator.diff
git diff --exit-code
- name: Upload generator diff
uses: actions/upload-artifact@v4
if: always()
with:
name: generator-diff
path: generator.diff
if-no-files-found: ignore

push-latest-docker-images:
uses: ./.github/workflows/all-docker-images.yml
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ Omes (pronounced oh-mess) is the Hebrew word for "load" (עומס).

- [Go](https://golang.org/) 1.20+
- `protoc` and `protoc-gen-go` must be installed
- tip: don't worry about the specific versions here; instead, the GitHub action will make a diff
available for download that you can use with `git apply`
- [Java](https://openjdk.org/) 8+
- TypeScript: [Node](https://nodejs.org) 16+
- [Python](https://www.python.org/) 3.10+
Expand Down
27 changes: 24 additions & 3 deletions loadgen/kitchen-sink-gen/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ use crate::protos::temporal::{
api::common::v1::{Memo, Payload, Payloads},
omes::kitchen_sink::{
action, awaitable_choice, client_action, do_actions_update, do_query, do_signal,
do_signal::do_signal_actions, do_update, execute_activity_action, Action, ActionSet,
do_signal::do_signal_actions, do_update, execute_activity_action, with_start_client_action, Action, ActionSet,
AwaitWorkflowState, AwaitableChoice, ClientAction, ClientActionSet, ClientSequence,
DoQuery, DoSignal, DoUpdate, ExecuteActivityAction, ExecuteChildWorkflowAction,
HandlerInvocation, RemoteActivityOptions, ReturnResultAction, SetPatchMarkerAction,
TestInput, TimerAction, UpsertMemoAction, UpsertSearchAttributesAction, WorkflowInput,
WorkflowState,
TestInput, TimerAction, UpsertMemoAction, UpsertSearchAttributesAction, WithStartClientAction,
WorkflowInput, WorkflowState,
},
};
use anyhow::Error;
Expand Down Expand Up @@ -302,10 +302,19 @@ impl<'a> Arbitrary<'a> for TestInput {
fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
// We always want a client sequence
let mut client_sequence: ClientSequence = u.arbitrary()?;

// Sometimes we want a with-start client action
let with_start_action = if u.ratio(80, 100)? {
None
} else {
Some(WithStartClientAction::arbitrary(u)?)
};

let mut ti = Self {
// Input may or may not be present
workflow_input: u.arbitrary()?,
client_sequence: None,
with_start_action: with_start_action,
};

// Finally, return at the end
Expand Down Expand Up @@ -399,6 +408,16 @@ impl<'a> Arbitrary<'a> for ClientAction {
}
}

impl<'a> Arbitrary<'a> for WithStartClientAction {
fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
let mut signal_action: DoSignal = u.arbitrary()?;
signal_action.with_start = true;
Ok(Self {
variant: Some(with_start_client_action::Variant::DoSignal(signal_action)),
})
}
}

impl<'a> Arbitrary<'a> for DoSignal {
fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
let variant = if u.ratio(95, 100)? {
Expand All @@ -419,6 +438,7 @@ impl<'a> Arbitrary<'a> for DoSignal {
};
Ok(Self {
variant: Some(variant),
with_start: u.arbitrary()?,
})
}
}
Expand Down Expand Up @@ -778,6 +798,7 @@ fn mk_client_signal_action(actions: impl IntoIterator<Item = action::Variant>) -
)))
.into(),
)),
with_start: false,
})),
}
}
Expand Down
72 changes: 54 additions & 18 deletions loadgen/kitchensink/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"context"
"errors"
"fmt"
"time"

"go.temporal.io/api/common/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/workflow"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/durationpb"
"time"
)

func NoOpSingleActivityActionSet() *ActionSet {
Expand Down Expand Up @@ -57,9 +58,31 @@ func ResourceConsumingActivity(bytesToAllocate uint64, cpuYieldEveryNIters uint3
}

type ClientActionsExecutor struct {
Client client.Client
WorkflowID string
RunID string
Client client.Client
StartOptions client.StartWorkflowOptions
WorkflowType string
WorkflowInput *WorkflowInput
Handle client.WorkflowRun
runID string
}

func (e *ClientActionsExecutor) Start(
ctx context.Context,
withStartAction *WithStartClientAction,
) error {
var err error
if withStartAction == nil {
e.Handle, err = e.Client.ExecuteWorkflow(ctx, e.StartOptions, e.WorkflowType, e.WorkflowInput)
} else if sig := withStartAction.GetDoSignal(); sig != nil {
e.Handle, err = e.executeSignalAction(ctx, sig)
} else {
return fmt.Errorf("unsupported with_start_action: %v", withStartAction.String())
}
if err != nil {
return fmt.Errorf("failed to start kitchen sink workflow: %w", err)
}
e.runID = e.Handle.GetRunID()
return nil
}

func (e *ClientActionsExecutor) ExecuteClientSequence(ctx context.Context, clientSeq *ClientSequence) error {
Expand All @@ -68,7 +91,6 @@ func (e *ClientActionsExecutor) ExecuteClientSequence(ctx context.Context, clien
return err
}
}

return nil
}

Expand Down Expand Up @@ -103,13 +125,13 @@ func (e *ClientActionsExecutor) executeClientActionSet(ctx context.Context, acti
}
}
if actionSet.GetWaitForCurrentRunToFinishAtEnd() {
err := e.Client.GetWorkflow(ctx, e.WorkflowID, e.RunID).
err := e.Client.GetWorkflow(ctx, e.StartOptions.ID, e.runID).
GetWithOptions(ctx, nil, client.WorkflowRunGetOptions{DisableFollowingRuns: true})
var canErr *workflow.ContinueAsNewError
if err != nil && !errors.As(err, &canErr) {
return err
}
e.RunID = e.Client.GetWorkflow(ctx, e.WorkflowID, "").GetRunID()
e.runID = e.Client.GetWorkflow(ctx, e.StartOptions.ID, "").GetRunID()
}
return nil
}
Expand All @@ -122,26 +144,20 @@ func (e *ClientActionsExecutor) executeClientAction(ctx context.Context, action

var err error
if sig := action.GetDoSignal(); sig != nil {
if sigActions := sig.GetDoSignalActions(); sigActions != nil {
err = e.Client.SignalWorkflow(ctx, e.WorkflowID, "", "do_actions_signal", sigActions)
} else if handler := sig.GetCustom(); handler != nil {
err = e.Client.SignalWorkflow(ctx, e.WorkflowID, "", handler.Name, handler.Args)
} else {
return fmt.Errorf("do_signal must recognizable variant")
}
_, err = e.executeSignalAction(ctx, sig)
return err
} else if update := action.GetDoUpdate(); update != nil {
var handle client.WorkflowUpdateHandle
if actionsUpdate := update.GetDoActions(); actionsUpdate != nil {
handle, err = e.Client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
WorkflowID: e.WorkflowID,
WorkflowID: e.StartOptions.ID,
UpdateName: "do_actions_update",
WaitForStage: client.WorkflowUpdateStageCompleted,
Args: []any{actionsUpdate},
})
} else if handler := update.GetCustom(); handler != nil {
handle, err = e.Client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
WorkflowID: e.WorkflowID,
WorkflowID: e.StartOptions.ID,
UpdateName: handler.Name,
WaitForStage: client.WorkflowUpdateStageCompleted,
Args: []any{handler.Args},
Expand All @@ -159,9 +175,9 @@ func (e *ClientActionsExecutor) executeClientAction(ctx context.Context, action
} else if query := action.GetDoQuery(); query != nil {
if query.GetReportState() != nil {
// TODO: Use args
_, err = e.Client.QueryWorkflow(ctx, e.WorkflowID, "", "report_state", nil)
_, err = e.Client.QueryWorkflow(ctx, e.StartOptions.ID, "", "report_state", nil)
} else if handler := query.GetCustom(); handler != nil {
_, err = e.Client.QueryWorkflow(ctx, e.WorkflowID, "", handler.Name, handler.Args)
_, err = e.Client.QueryWorkflow(ctx, e.StartOptions.ID, "", handler.Name, handler.Args)
} else {
return fmt.Errorf("do_query must recognizable variant")
}
Expand All @@ -176,3 +192,23 @@ func (e *ClientActionsExecutor) executeClientAction(ctx context.Context, action
return fmt.Errorf("client action must be set")
}
}

func (e *ClientActionsExecutor) executeSignalAction(ctx context.Context, sig *DoSignal) (client.WorkflowRun, error) {
var signalName string
var signalArgs any
if sigActions := sig.GetDoSignalActions(); sigActions != nil {
signalName = "do_actions_signal"
signalArgs = sigActions
} else if handler := sig.GetCustom(); handler != nil {
signalName = handler.Name
signalArgs = handler.Args
} else {
return nil, fmt.Errorf("do_signal must recognizable variant")
}

if sig.WithStart {
return e.Client.SignalWithStartWorkflow(
ctx, e.StartOptions.ID, signalName, signalArgs, e.StartOptions, e.WorkflowType, e.WorkflowInput)
}
return nil, e.Client.SignalWorkflow(ctx, e.StartOptions.ID, "", signalName, signalArgs)
}
Loading

0 comments on commit 4b12728

Please sign in to comment.