From 2677a5cc63540d88f5f77848d42ea969556e6dff Mon Sep 17 00:00:00 2001 From: Surinder Singh Date: Tue, 28 Apr 2020 10:29:02 -0700 Subject: [PATCH 1/8] [WIP] Queueing Budget --- go.mod | 2 +- go.sum | 2 ++ pkg/apis/flyteworkflow/v1alpha1/iface.go | 2 ++ .../flyteworkflow/v1alpha1/node_status.go | 11 ++++++++++ pkg/apis/flyteworkflow/v1alpha1/workflow.go | 4 ++++ pkg/compiler/transformers/k8s/workflow.go | 17 ++++++++++----- pkg/controller/nodes/executor.go | 2 ++ .../nodes/handler/node_exec_context.go | 2 ++ pkg/controller/nodes/node_exec_context.go | 21 +++++++++++++------ pkg/controller/nodes/task/handler.go | 9 ++++++++ .../nodes/task/k8s/plugin_manager.go | 2 ++ pkg/controller/workflow/executor.go | 5 +++++ 12 files changed, 67 insertions(+), 12 deletions(-) diff --git a/go.mod b/go.mod index 91d90e327..0d1b11ecf 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/imdario/mergo v0.3.8 // indirect github.com/jmespath/go-jmespath v0.3.0 // indirect github.com/lyft/datacatalog v0.2.1 - github.com/lyft/flyteidl v0.17.24 + github.com/lyft/flyteidl v0.17.27 github.com/lyft/flyteplugins v0.3.21 github.com/lyft/flytestdlib v0.3.3 github.com/magiconair/properties v1.8.1 diff --git a/go.sum b/go.sum index da2ab9b78..99b7876b7 100644 --- a/go.sum +++ b/go.sum @@ -391,6 +391,8 @@ github.com/lyft/flyteidl v0.17.9 h1:JXT9PovHqS9V3YN74x9zWT0kvIEL48c2uNoujF1KMes= github.com/lyft/flyteidl v0.17.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteidl v0.17.24 h1:N5mmk2/0062VjbIeUXLHWVZwkxGW20RdZtshaea2nL0= github.com/lyft/flyteidl v0.17.24/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= +github.com/lyft/flyteidl v0.17.27 h1:0EdSHauzdPEYmubYib/XC6fLb+srzP4yDRN1P9o4W/I= +github.com/lyft/flyteidl v0.17.27/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteplugins v0.3.21 h1:0PaQ5CZkUY07cNiBPcxdL1Pm26A0QRwoFw1VT6ly8tU= github.com/lyft/flyteplugins v0.3.21/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A= github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= diff --git a/pkg/apis/flyteworkflow/v1alpha1/iface.go b/pkg/apis/flyteworkflow/v1alpha1/iface.go index 9a11dfe95..1d3c02397 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -261,6 +261,8 @@ type ExecutableNodeStatus interface { GetSystemFailures() uint32 GetWorkflowNodeStatus() ExecutableWorkflowNodeStatus GetTaskNodeStatus() ExecutableTaskNodeStatus + GetQueuingBudgetSeconds() *int64 + GetQueuingDelaySeconds() *int64 IsCached() bool } diff --git a/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/pkg/apis/flyteworkflow/v1alpha1/node_status.go index 9a9b7f6c0..a5aad451d 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -183,6 +183,9 @@ type NodeStatus struct { // Not Persisted DataReferenceConstructor storage.ReferenceConstructor `json:"-"` + + QueuingBudgetSeconds *int64 `json:"queuingBudgetSeconds,omitempty"` + QueuingDelaySeconds *int64 `json:"queuingDelaySeconds,omitempty"` } func (in *NodeStatus) IsDirty() bool { @@ -420,6 +423,14 @@ func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason st in.SetDirty() } +func (in *NodeStatus) GetQueuingBudgetSeconds() *int64 { + return in.QueuingBudgetSeconds +} + +func (in *NodeStatus) GetQueuingDelaySeconds() *int64 { + return in.QueuingDelaySeconds +} + func (in *NodeStatus) GetStartedAt() *metav1.Time { return in.StartedAt } diff --git 
a/pkg/apis/flyteworkflow/v1alpha1/workflow.go b/pkg/apis/flyteworkflow/v1alpha1/workflow.go
index 1176b6469..4782f4b0e 100644
--- a/pkg/apis/flyteworkflow/v1alpha1/workflow.go
+++ b/pkg/apis/flyteworkflow/v1alpha1/workflow.go
@@ -47,6 +47,10 @@ type FlyteWorkflow struct {
 	// non-Serialized fields
 	DataReferenceConstructor storage.ReferenceConstructor `json:"-"`
+
+	// QueuingBudgetSeconds is the workflow-level queueing budget, in seconds, that nodes draw from
+	// +optional
+	QueuingBudgetSeconds *int64
 }

 type NodeDefaults struct {
diff --git a/pkg/compiler/transformers/k8s/workflow.go b/pkg/compiler/transformers/k8s/workflow.go
index ff458ce86..892d6c446 100644
--- a/pkg/compiler/transformers/k8s/workflow.go
+++ b/pkg/compiler/transformers/k8s/workflow.go
@@ -164,6 +164,12 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li
 		interruptible = wf.GetMetadataDefaults().GetInterruptible()
 	}

+	var queuingBudgetSeconds *int64
+	if wf.GetMetadata() != nil && wf.GetMetadata().GetQueuingBudget() != nil {
+		budgetSeconds := wf.GetMetadata().GetQueuingBudget().GetSeconds()
+		queuingBudgetSeconds = &budgetSeconds
+	}
+
 	obj := &v1alpha1.FlyteWorkflow{
 		TypeMeta: v1.TypeMeta{
 			Kind: v1alpha1.FlyteWorkflowKind,
@@ -173,11 +179,12 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li
 			Namespace: namespace,
 			Labels:    map[string]string{},
 		},
-		Inputs:       &v1alpha1.Inputs{LiteralMap: inputs},
-		WorkflowSpec: primarySpec,
-		SubWorkflows: subwfs,
-		Tasks:        buildTasks(tasks, errs.NewScope()),
-		NodeDefaults: v1alpha1.NodeDefaults{Interruptible: interruptible},
+		Inputs:               &v1alpha1.Inputs{LiteralMap: inputs},
+		WorkflowSpec:         primarySpec,
+		SubWorkflows:         subwfs,
+		Tasks:                buildTasks(tasks, errs.NewScope()),
+		NodeDefaults:         v1alpha1.NodeDefaults{Interruptible: interruptible},
+		QueuingBudgetSeconds: queuingBudgetSeconds,
 	}

 	var err error
diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go
index 47e5e3b99..cbc142dc5 100644
--- a/pkg/controller/nodes/executor.go
+++ b/pkg/controller/nodes/executor.go
@@ -328,6 +328,8 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx *node
 	// across execute which is used to emit metrics
 	lastAttemptStartTime := nodeStatus.GetLastAttemptStartedAt()

+	// pass the max-wait-time from here
+	// plugin that new interface: calculate remaining budget and invoke that budget-allocator
 	p, err := c.execute(ctx, h, nCtx, nodeStatus)
 	if err != nil {
 		logger.Errorf(ctx, "failed Execute for node.
Error: %s", err.Error()) diff --git a/pkg/controller/nodes/handler/node_exec_context.go b/pkg/controller/nodes/handler/node_exec_context.go index 187d8bb85..8e33a6903 100644 --- a/pkg/controller/nodes/handler/node_exec_context.go +++ b/pkg/controller/nodes/handler/node_exec_context.go @@ -2,6 +2,7 @@ package handler import ( "context" + "time" "github.com/lyft/flyteidl/clients/go/events" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" @@ -38,6 +39,7 @@ type NodeExecutionMetadata interface { GetAnnotations() map[string]string GetK8sServiceAccount() string IsInterruptible() bool + GetQueuingBudget() time.Duration } type NodeExecutionContext interface { diff --git a/pkg/controller/nodes/node_exec_context.go b/pkg/controller/nodes/node_exec_context.go index 225d712fb..0c0df1745 100644 --- a/pkg/controller/nodes/node_exec_context.go +++ b/pkg/controller/nodes/node_exec_context.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strconv" + "time" "github.com/lyft/flyteidl/clients/go/events" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" @@ -28,6 +29,7 @@ type nodeExecMetadata struct { nodeExecID *core.NodeExecutionIdentifier interrutptible bool nodeLabels map[string]string + queuingBudget time.Duration } func (e nodeExecMetadata) GetNodeExecutionID() *core.NodeExecutionIdentifier { @@ -46,6 +48,10 @@ func (e nodeExecMetadata) IsInterruptible() bool { return e.interrutptible } +func (e nodeExecMetadata) GetQueuingBudget() time.Duration { + return e.queuingBudget +} + func (e nodeExecMetadata) GetLabels() map[string]string { return e.nodeLabels } @@ -135,7 +141,7 @@ func (e nodeExecContext) MaxDatasetSizeBytes() int64 { return e.maxDatasetSizeBytes } -func newNodeExecContext(_ context.Context, store *storage.DataStore, execContext executors.ExecutionContext, nl executors.NodeLookup, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, interruptible bool, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error, rawOutputPrefix storage.DataReference, outputShardSelector ioutils.ShardSelector) *nodeExecContext { +func newNodeExecContext(_ context.Context, store *storage.DataStore, execContext executors.ExecutionContext, nl executors.NodeLookup, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, interruptible bool, queueingBudget time.Duration, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error, rawOutputPrefix storage.DataReference, outputShardSelector ioutils.ShardSelector) *nodeExecContext { md := nodeExecMetadata{ Meta: execContext, nodeExecID: &core.NodeExecutionIdentifier{ @@ -143,6 +149,7 @@ func newNodeExecContext(_ context.Context, store *storage.DataStore, execContext ExecutionId: execContext.GetExecutionID().WorkflowExecutionIdentifier, }, interrutptible: interruptible, + queuingBudget: queueingBudget, } // Copy the wf labels before adding node specific labels. 
@@ -198,19 +205,20 @@ func (c *nodeExecutor) newNodeExecContextDefault(ctx context.Context, currentNod
 		return nil
 	}

-	interrutible := executionContext.IsInterruptible()
+	interruptible := executionContext.IsInterruptible()
 	if n.IsInterruptible() != nil {
-		interrutible = *n.IsInterruptible()
+		interruptible = *n.IsInterruptible()
 	}

 	s := nl.GetNodeExecutionStatus(ctx, currentNodeID)

 	// a node is not considered interruptible if the system failures have exceeded the configured threshold
-	if interrutible && s.GetSystemFailures() >= c.interruptibleFailureThreshold {
-		interrutible = false
+	if interruptible && s.GetSystemFailures() >= c.interruptibleFailureThreshold {
+		interruptible = false
 		c.metrics.InterruptedThresholdHit.Inc(ctx)
 	}

+	var queueingBudget time.Duration
 	return newNodeExecContext(ctx, c.store, executionContext, nl, n, s,
 		ioutils.NewCachedInputReader(
 			ctx,
@@ -224,7 +232,8 @@ func (c *nodeExecutor) newNodeExecContextDefault(ctx context.Context, currentNod
 				),
 			),
 		),
-		interrutible,
+		interruptible,
+		queueingBudget,
 		c.maxDatasetSizeBytes,
 		&taskEventRecorder{TaskEventRecorder: c.taskRecorder},
 		tr,
diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go
index c552a379a..28fcf9380 100644
--- a/pkg/controller/nodes/task/handler.go
+++ b/pkg/controller/nodes/task/handler.go
@@ -243,6 +243,7 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
 			}
 		}()
 		childCtx := context.WithValue(ctx, pluginContextKey, p.GetID())
+		// this is where the plugin is called and RQ is handled
 		trns, err = p.Handle(childCtx, tCtx)
 		return
 	}()
@@ -365,6 +366,12 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
 	// So now we will derive this from the plugin phase
 	// TODO @kumare re-evaluate this decision

+	// STEP 0: bookkeeping step just to accept the request in the task handler, so any queueing from here
+	// onwards is measured in the task handler
+	if ts.PluginPhase == pluginCore.PhaseUndefined {
+
+	}
+
 	// STEP 1: Check Cache
 	if ts.PluginPhase == pluginCore.PhaseUndefined && checkCatalog {
 		// This is assumed to be first time. we will check catalog and call handle
@@ -405,6 +412,8 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
 		// Lets check if this value in cache is less than or equal to one in the store
 		if barrierTick <= ts.BarrierClockTick {
 			var err error
+
+			// this is where the object is created for the first time
 			pluginTrns, err = t.invokePlugin(ctx, p, tCtx, ts)
 			if err != nil {
 				return handler.UnknownTransition, errors.Wrapf(errors.RuntimeExecutionError, nCtx.NodeID(), err, "failed during plugin execution")
diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go
index 998bbc42b..f999620e0 100644
--- a/pkg/controller/nodes/task/k8s/plugin_manager.go
+++ b/pkg/controller/nodes/task/k8s/plugin_manager.go
@@ -178,6 +178,7 @@ func (e *PluginManager) LaunchResource(ctx context.Context, tCtx pluginsCore.Tas
 	cfg := nodeTaskConfig.GetConfig()
 	backOffHandler := e.backOffController.GetOrCreateHandler(ctx, key, cfg.BackOffConfig.BaseSecond, cfg.BackOffConfig.MaxDuration.Duration)

+	// this is returning an error on RQ, which keeps the node in the same queued state.
We should instead return PhaseNotReady err = backOffHandler.Handle(ctx, func() error { return e.kubeClient.GetClient().Create(ctx, o) }, podRequestedResources) @@ -282,6 +283,7 @@ func (e PluginManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutio return pluginsCore.UnknownTransition, errors.Wrapf(errors.CorruptedPluginState, err, "Failed to read unmarshal custom state") } if ps.Phase == PluginPhaseNotStarted { + // backoff on RQ t, err := e.LaunchResource(ctx, tCtx) if err == nil && t.Info().Phase() == pluginsCore.PhaseQueued { if err := tCtx.PluginStateWriter().Put(pluginStateVersion, &PluginState{Phase: PluginPhaseStarted}); err != nil { diff --git a/pkg/controller/workflow/executor.go b/pkg/controller/workflow/executor.go index 3c82d6863..7bbd33a17 100644 --- a/pkg/controller/workflow/executor.go +++ b/pkg/controller/workflow/executor.go @@ -434,3 +434,8 @@ func NewExecutor(ctx context.Context, store *storage.DataStore, enQWorkflow v1al }, }, nil } + +// Interface for the Workflow p. This is the mutable portion for a Workflow +type QueueingBudgetAllocationStrategy interface { + GetQueueingBudget(n core.Node) time.Duration +} From 8cda95ca94036cd56717a75eda99808ff9611a46 Mon Sep 17 00:00:00 2001 From: Surinder Singh Date: Tue, 28 Apr 2020 13:29:13 -0700 Subject: [PATCH 2/8] . --- pkg/apis/flyteworkflow/v1alpha1/iface.go | 2 +- pkg/controller/executors/node_lookup.go | 2 + .../nodes/handler/node_exec_context.go | 2 + pkg/controller/nodes/node_exec_context.go | 50 ++++++------- pkg/controller/workflow/executor.go | 72 ++++++++++++++++++- 5 files changed, 97 insertions(+), 31 deletions(-) diff --git a/pkg/apis/flyteworkflow/v1alpha1/iface.go b/pkg/apis/flyteworkflow/v1alpha1/iface.go index 1d3c02397..1d7f33ea7 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -261,7 +261,7 @@ type ExecutableNodeStatus interface { GetSystemFailures() uint32 GetWorkflowNodeStatus() ExecutableWorkflowNodeStatus GetTaskNodeStatus() ExecutableTaskNodeStatus - GetQueuingBudgetSeconds() *int64 + GetMaxQueueTime() *int64 GetQueuingDelaySeconds() *int64 IsCached() bool diff --git a/pkg/controller/executors/node_lookup.go b/pkg/controller/executors/node_lookup.go index e5adee8c7..e216a93bc 100644 --- a/pkg/controller/executors/node_lookup.go +++ b/pkg/controller/executors/node_lookup.go @@ -12,6 +12,8 @@ import ( type NodeLookup interface { GetNode(nodeID v1alpha1.NodeID) (v1alpha1.ExecutableNode, bool) GetNodeExecutionStatus(ctx context.Context, id v1alpha1.NodeID) v1alpha1.ExecutableNodeStatus + //GetNodeSchedulingParameters(ctx context.Context, id v1alpha1.NodeID) v1alpha1.NodeSchedulingParameters + } // Implements a de-generate case of NodeLookup, where only one Node is always looked up diff --git a/pkg/controller/nodes/handler/node_exec_context.go b/pkg/controller/nodes/handler/node_exec_context.go index 8e33a6903..4d84bf990 100644 --- a/pkg/controller/nodes/handler/node_exec_context.go +++ b/pkg/controller/nodes/handler/node_exec_context.go @@ -39,6 +39,8 @@ type NodeExecutionMetadata interface { GetAnnotations() map[string]string GetK8sServiceAccount() string IsInterruptible() bool + //GetQueuingBudgetAllocator() workflow.QueuingBudgetAllocator + GetQueuingBudget() time.Duration } diff --git a/pkg/controller/nodes/node_exec_context.go b/pkg/controller/nodes/node_exec_context.go index 0c0df1745..30b045e45 100644 --- a/pkg/controller/nodes/node_exec_context.go +++ b/pkg/controller/nodes/node_exec_context.go @@ -4,10 +4,11 @@ import ( "context" 
"fmt" "strconv" - "time" "github.com/lyft/flyteidl/clients/go/events" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + "github.com/lyft/flytepropeller/pkg/controller/workflow" + "github.com/lyft/flytestdlib/logger" "github.com/lyft/flytestdlib/storage" "k8s.io/apimachinery/pkg/types" @@ -26,10 +27,10 @@ const NodeInterruptibleLabel = "interruptible" type nodeExecMetadata struct { v1alpha1.Meta - nodeExecID *core.NodeExecutionIdentifier - interrutptible bool - nodeLabels map[string]string - queuingBudget time.Duration + nodeExecID *core.NodeExecutionIdentifier + interruptible bool + nodeLabels map[string]string + queuingBudgetAllocator workflow.QueueBudgetHandler } func (e nodeExecMetadata) GetNodeExecutionID() *core.NodeExecutionIdentifier { @@ -45,11 +46,11 @@ func (e nodeExecMetadata) GetOwnerID() types.NamespacedName { } func (e nodeExecMetadata) IsInterruptible() bool { - return e.interrutptible + return e.interruptible } -func (e nodeExecMetadata) GetQueuingBudget() time.Duration { - return e.queuingBudget +func (e nodeExecMetadata) GetQueuingBudgetAllocator() workflow.QueueBudgetHandler { + return e.queuingBudgetAllocator } func (e nodeExecMetadata) GetLabels() map[string]string { @@ -141,15 +142,18 @@ func (e nodeExecContext) MaxDatasetSizeBytes() int64 { return e.maxDatasetSizeBytes } -func newNodeExecContext(_ context.Context, store *storage.DataStore, execContext executors.ExecutionContext, nl executors.NodeLookup, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, interruptible bool, queueingBudget time.Duration, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error, rawOutputPrefix storage.DataReference, outputShardSelector ioutils.ShardSelector) *nodeExecContext { +func newNodeExecContext(ctx context.Context, store *storage.DataStore, execContext executors.ExecutionContext, nl executors.NodeLookup, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error, rawOutputPrefix storage.DataReference, outputShardSelector ioutils.ShardSelector) *nodeExecContext { + + // TODO ssingh: dont initiliaze it here, instead pass this down from caller + queuingBudgetAllocator := workflow.NewDefaultQueueBudgetHandler(nil, nl) + md := nodeExecMetadata{ Meta: execContext, nodeExecID: &core.NodeExecutionIdentifier{ NodeId: node.GetID(), ExecutionId: execContext.GetExecutionID().WorkflowExecutionIdentifier, }, - interrutptible: interruptible, - queuingBudget: queueingBudget, + queuingBudgetAllocator: queuingBudgetAllocator, } // Copy the wf labels before adding node specific labels. 
@@ -161,7 +165,13 @@ func newNodeExecContext(_ context.Context, store *storage.DataStore, execContext if tr != nil && tr.GetTaskID() != nil { nodeLabels[TaskNameLabel] = utils.SanitizeLabelValue(tr.GetTaskID().Name) } - nodeLabels[NodeInterruptibleLabel] = strconv.FormatBool(interruptible) + + schedulingParameters, err := queuingBudgetAllocator.GetNodeSchedulingParameters(ctx, node.GetID()) + if err != nil { + // TODO: return err + logger.Error(ctx, err) + } + nodeLabels[NodeInterruptibleLabel] = strconv.FormatBool(schedulingParameters.IsInterruptible) md.nodeLabels = nodeLabels return &nodeExecContext{ @@ -205,20 +215,6 @@ func (c *nodeExecutor) newNodeExecContextDefault(ctx context.Context, currentNod return nil } - interruptible := executionContext.IsInterruptible() - if n.IsInterruptible() != nil { - interruptible = *n.IsInterruptible() - } - - s := nl.GetNodeExecutionStatus(ctx, currentNodeID) - - // a node is not considered interruptible if the system failures have exceeded the configured threshold - if interruptible && s.GetSystemFailures() >= c.interruptibleFailureThreshold { - interruptible = false - c.metrics.InterruptedThresholdHit.Inc(ctx) - } - - var queueingBudget time.Duration return newNodeExecContext(ctx, c.store, executionContext, nl, n, s, ioutils.NewCachedInputReader( ctx, @@ -232,8 +228,6 @@ func (c *nodeExecutor) newNodeExecContextDefault(ctx context.Context, currentNod ), ), ), - interruptible, - queueingBudget, c.maxDatasetSizeBytes, &taskEventRecorder{TaskEventRecorder: c.taskRecorder}, tr, diff --git a/pkg/controller/workflow/executor.go b/pkg/controller/workflow/executor.go index 7bbd33a17..20f0d2adf 100644 --- a/pkg/controller/workflow/executor.go +++ b/pkg/controller/workflow/executor.go @@ -436,6 +436,74 @@ func NewExecutor(ctx context.Context, store *storage.DataStore, enQWorkflow v1al } // Interface for the Workflow p. This is the mutable portion for a Workflow -type QueueingBudgetAllocationStrategy interface { - GetQueueingBudget(n core.Node) time.Duration +type QueueBudgetHandler interface { + GetNodeSchedulingParameters(ctx context.Context, id v1alpha1.NodeID) (*NodeQueuingParameters, error) +} + +type NodeQueuingParameters struct { + IsInterruptible bool + MaxQueueTime time.Duration +} + +type defaultQueueBudgetHandler struct { + dag executors.DAGStructure + nl executors.NodeLookup +} + +func (in *defaultQueueBudgetHandler) GetNodeSchedulingParameters(ctx context.Context, id v1alpha1.NodeID) (*NodeQueuingParameters, error) { + + if id == v1alpha1.StartNodeID { + return nil, nil + } + + upstreamNodes, err := in.dag.ToNode(id) + if err != nil { + return nil, err + } + + // TODO init with wf budget or default value + remainingWaitTime := int64(0) + for _, upstreamNodeID := range upstreamNodes { + upstreamNodeStatus := in.nl.GetNodeExecutionStatus(ctx, upstreamNodeID) + + if upstreamNodeStatus.GetPhase() == v1alpha1.NodePhaseSkipped { + // TODO handle skipped parent case: if parent doesn't have queue budget info then get it from its parent. 
+ continue + } + + budget := int64(0) // TODO use default instead of 0 + delay := int64(0) + if upstreamNodeStatus.GetMaxQueueTime() != nil && *upstreamNodeStatus.GetMaxQueueTime() > 0 { + budget = *upstreamNodeStatus.GetMaxQueueTime() + } + if upstreamNodeStatus.GetQueuingDelaySeconds() != nil { + delay = *upstreamNodeStatus.GetQueuingDelaySeconds() + } + + if remainingWaitTime > (budget - delay) { + remainingWaitTime = budget - delay + } + } + + // TODO: fix this + //interruptible := executionContext.IsInterruptible() + //if n.IsInterruptible() != nil { + // interruptible = *n.IsInterruptible() + //} + // + //s := nl.GetNodeExecutionStatus(ctx, currentNodeID) + // + //// a node is not considered interruptible if the system failures have exceeded the configured threshold + //if interruptible && s.GetSystemFailures() >= c.interruptibleFailureThreshold { + // interruptible = false + // c.metrics.InterruptedThresholdHit.Inc(ctx) + //} + // + isInterruptible := false + + return &NodeQueuingParameters{IsInterruptible: isInterruptible, MaxQueueTime: time.Second * time.Duration(remainingWaitTime)}, nil +} + +func NewDefaultQueueBudgetHandler(dag executors.DAGStructure, nl executors.NodeLookup) QueueBudgetHandler { + return &defaultQueueBudgetHandler{dag: dag, nl: nl} } From c94a5dcd6668f91b86e9351d4d4563e5cb941c2d Mon Sep 17 00:00:00 2001 From: Surinder Singh Date: Tue, 28 Apr 2020 13:38:29 -0700 Subject: [PATCH 3/8] update --- pkg/controller/nodes/node_exec_context.go | 18 +++++++++--------- pkg/controller/workflow/executor.go | 4 ++-- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/controller/nodes/node_exec_context.go b/pkg/controller/nodes/node_exec_context.go index 30b045e45..3c9edda08 100644 --- a/pkg/controller/nodes/node_exec_context.go +++ b/pkg/controller/nodes/node_exec_context.go @@ -27,10 +27,10 @@ const NodeInterruptibleLabel = "interruptible" type nodeExecMetadata struct { v1alpha1.Meta - nodeExecID *core.NodeExecutionIdentifier - interruptible bool - nodeLabels map[string]string - queuingBudgetAllocator workflow.QueueBudgetHandler + nodeExecID *core.NodeExecutionIdentifier + interruptible bool + nodeLabels map[string]string + queueBudgetHandler workflow.QueueBudgetHandler } func (e nodeExecMetadata) GetNodeExecutionID() *core.NodeExecutionIdentifier { @@ -50,7 +50,7 @@ func (e nodeExecMetadata) IsInterruptible() bool { } func (e nodeExecMetadata) GetQueuingBudgetAllocator() workflow.QueueBudgetHandler { - return e.queuingBudgetAllocator + return e.queueBudgetHandler } func (e nodeExecMetadata) GetLabels() map[string]string { @@ -144,8 +144,8 @@ func (e nodeExecContext) MaxDatasetSizeBytes() int64 { func newNodeExecContext(ctx context.Context, store *storage.DataStore, execContext executors.ExecutionContext, nl executors.NodeLookup, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error, rawOutputPrefix storage.DataReference, outputShardSelector ioutils.ShardSelector) *nodeExecContext { - // TODO ssingh: dont initiliaze it here, instead pass this down from caller - queuingBudgetAllocator := workflow.NewDefaultQueueBudgetHandler(nil, nl) + // TODO ssingh: dont initialize it here, instead pass this down from caller + queueBudgetHandler := workflow.NewDefaultQueueBudgetHandler(nil, nl) md := nodeExecMetadata{ Meta: execContext, @@ -153,7 +153,7 @@ func newNodeExecContext(ctx context.Context, store 
*storage.DataStore, execConte NodeId: node.GetID(), ExecutionId: execContext.GetExecutionID().WorkflowExecutionIdentifier, }, - queuingBudgetAllocator: queuingBudgetAllocator, + queueBudgetHandler: queueBudgetHandler, } // Copy the wf labels before adding node specific labels. @@ -166,7 +166,7 @@ func newNodeExecContext(ctx context.Context, store *storage.DataStore, execConte nodeLabels[TaskNameLabel] = utils.SanitizeLabelValue(tr.GetTaskID().Name) } - schedulingParameters, err := queuingBudgetAllocator.GetNodeSchedulingParameters(ctx, node.GetID()) + schedulingParameters, err := queueBudgetHandler.GetNodeQueuingParameters(ctx, node.GetID()) if err != nil { // TODO: return err logger.Error(ctx, err) diff --git a/pkg/controller/workflow/executor.go b/pkg/controller/workflow/executor.go index 20f0d2adf..5b830e54c 100644 --- a/pkg/controller/workflow/executor.go +++ b/pkg/controller/workflow/executor.go @@ -437,7 +437,7 @@ func NewExecutor(ctx context.Context, store *storage.DataStore, enQWorkflow v1al // Interface for the Workflow p. This is the mutable portion for a Workflow type QueueBudgetHandler interface { - GetNodeSchedulingParameters(ctx context.Context, id v1alpha1.NodeID) (*NodeQueuingParameters, error) + GetNodeQueuingParameters(ctx context.Context, id v1alpha1.NodeID) (*NodeQueuingParameters, error) } type NodeQueuingParameters struct { @@ -450,7 +450,7 @@ type defaultQueueBudgetHandler struct { nl executors.NodeLookup } -func (in *defaultQueueBudgetHandler) GetNodeSchedulingParameters(ctx context.Context, id v1alpha1.NodeID) (*NodeQueuingParameters, error) { +func (in *defaultQueueBudgetHandler) GetNodeQueuingParameters(ctx context.Context, id v1alpha1.NodeID) (*NodeQueuingParameters, error) { if id == v1alpha1.StartNodeID { return nil, nil From ffe27a3bfeff8fe6e3752dd7933e4f3c146567d2 Mon Sep 17 00:00:00 2001 From: Surinder Singh Date: Tue, 5 May 2020 11:45:22 -0700 Subject: [PATCH 4/8] . 
---
 pkg/apis/flyteworkflow/v1alpha1/iface.go      |  4 +--
 .../flyteworkflow/v1alpha1/node_status.go     | 12 +++----
 pkg/controller/nodes/executor.go              |  2 --
 .../nodes/handler/node_exec_context.go        |  4 +--
 pkg/controller/nodes/node_exec_context.go     | 35 ++++++++++++-------
 pkg/controller/workflow/executor.go           | 32 ++++++++++-------
 6 files changed, 49 insertions(+), 40 deletions(-)

diff --git a/pkg/apis/flyteworkflow/v1alpha1/iface.go b/pkg/apis/flyteworkflow/v1alpha1/iface.go
index 1d7f33ea7..3014ff694 100644
--- a/pkg/apis/flyteworkflow/v1alpha1/iface.go
+++ b/pkg/apis/flyteworkflow/v1alpha1/iface.go
@@ -261,8 +261,8 @@ type ExecutableNodeStatus interface {
 	GetSystemFailures() uint32
 	GetWorkflowNodeStatus() ExecutableWorkflowNodeStatus
 	GetTaskNodeStatus() ExecutableTaskNodeStatus
-	GetMaxQueueTime() *int64
-	GetQueuingDelaySeconds() *int64
+
+	GetQueuingBudget() *metav1.Duration

 	IsCached() bool
 }
diff --git a/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/pkg/apis/flyteworkflow/v1alpha1/node_status.go
index a5aad451d..150298b4a 100644
--- a/pkg/apis/flyteworkflow/v1alpha1/node_status.go
+++ b/pkg/apis/flyteworkflow/v1alpha1/node_status.go
@@ -184,8 +184,8 @@ type NodeStatus struct {
 	// Not Persisted
 	DataReferenceConstructor storage.ReferenceConstructor `json:"-"`

-	QueuingBudgetSeconds *int64 `json:"queuingBudgetSeconds,omitempty"`
-	QueuingDelaySeconds  *int64 `json:"queuingDelaySeconds,omitempty"`
+	// queuing budget a node can consume across all its attempts
+	QueuingBudget *metav1.Duration `json:"queuingBudgetSeconds,omitempty"`
 }

 func (in *NodeStatus) IsDirty() bool {
@@ -423,12 +423,8 @@ func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason st
 	in.SetDirty()
 }

-func (in *NodeStatus) GetQueuingBudgetSeconds() *int64 {
-	return in.QueuingBudgetSeconds
-}
-
-func (in *NodeStatus) GetQueuingDelaySeconds() *int64 {
-	return in.QueuingDelaySeconds
+func (in *NodeStatus) GetQueuingBudget() *metav1.Duration {
+	return in.QueuingBudget
 }

 func (in *NodeStatus) GetStartedAt() *metav1.Time {
diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go
index cbc142dc5..47e5e3b99 100644
--- a/pkg/controller/nodes/executor.go
+++ b/pkg/controller/nodes/executor.go
@@ -328,8 +328,6 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx *node
 	// across execute which is used to emit metrics
 	lastAttemptStartTime := nodeStatus.GetLastAttemptStartedAt()

-	// pass the max-wait-time from here
-	// plugin that new interface: calculate remaining budget and invoke that budget-allocator
 	p, err := c.execute(ctx, h, nCtx, nodeStatus)
 	if err != nil {
 		logger.Errorf(ctx, "failed Execute for node.
Error: %s", err.Error()) diff --git a/pkg/controller/nodes/handler/node_exec_context.go b/pkg/controller/nodes/handler/node_exec_context.go index 4d84bf990..6523f96eb 100644 --- a/pkg/controller/nodes/handler/node_exec_context.go +++ b/pkg/controller/nodes/handler/node_exec_context.go @@ -39,9 +39,7 @@ type NodeExecutionMetadata interface { GetAnnotations() map[string]string GetK8sServiceAccount() string IsInterruptible() bool - //GetQueuingBudgetAllocator() workflow.QueuingBudgetAllocator - - GetQueuingBudget() time.Duration + GetMaxQueueTime() time.Duration } type NodeExecutionContext interface { diff --git a/pkg/controller/nodes/node_exec_context.go b/pkg/controller/nodes/node_exec_context.go index 3c9edda08..2e9e56360 100644 --- a/pkg/controller/nodes/node_exec_context.go +++ b/pkg/controller/nodes/node_exec_context.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strconv" + "time" "github.com/lyft/flyteidl/clients/go/events" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" @@ -27,10 +28,12 @@ const NodeInterruptibleLabel = "interruptible" type nodeExecMetadata struct { v1alpha1.Meta - nodeExecID *core.NodeExecutionIdentifier - interruptible bool - nodeLabels map[string]string - queueBudgetHandler workflow.QueueBudgetHandler + nodeExecID *core.NodeExecutionIdentifier + nodeLabels map[string]string + + // TODO ssingh merge these two parameters into QueuingParameters + interruptible bool + maxQueueTime time.Duration } func (e nodeExecMetadata) GetNodeExecutionID() *core.NodeExecutionIdentifier { @@ -49,8 +52,8 @@ func (e nodeExecMetadata) IsInterruptible() bool { return e.interruptible } -func (e nodeExecMetadata) GetQueuingBudgetAllocator() workflow.QueueBudgetHandler { - return e.queueBudgetHandler +func (e nodeExecMetadata) GetMaxQueueTime() time.Duration { + return e.maxQueueTime } func (e nodeExecMetadata) GetLabels() map[string]string { @@ -147,13 +150,23 @@ func newNodeExecContext(ctx context.Context, store *storage.DataStore, execConte // TODO ssingh: dont initialize it here, instead pass this down from caller queueBudgetHandler := workflow.NewDefaultQueueBudgetHandler(nil, nl) + // queueBudgetHandler doesn't need to worry about current-attempt-# or time spent in queued state in previous attempts, + // it deduces it from node-queued-at and last-attempt-started-at which includes the total time(execution + waittime) + // spent before this attempt. + param, err := queueBudgetHandler.GetNodeQueuingParameters(ctx, node.GetID()) + if err != nil { + // TODO: return err + logger.Error(ctx, err) + } + md := nodeExecMetadata{ Meta: execContext, nodeExecID: &core.NodeExecutionIdentifier{ NodeId: node.GetID(), ExecutionId: execContext.GetExecutionID().WorkflowExecutionIdentifier, }, - queueBudgetHandler: queueBudgetHandler, + interruptible: param.IsInterruptible, + maxQueueTime: param.MaxQueueTime, } // Copy the wf labels before adding node specific labels. 
@@ -166,12 +179,7 @@ func newNodeExecContext(ctx context.Context, store *storage.DataStore, execConte nodeLabels[TaskNameLabel] = utils.SanitizeLabelValue(tr.GetTaskID().Name) } - schedulingParameters, err := queueBudgetHandler.GetNodeQueuingParameters(ctx, node.GetID()) - if err != nil { - // TODO: return err - logger.Error(ctx, err) - } - nodeLabels[NodeInterruptibleLabel] = strconv.FormatBool(schedulingParameters.IsInterruptible) + nodeLabels[NodeInterruptibleLabel] = strconv.FormatBool(param.IsInterruptible) md.nodeLabels = nodeLabels return &nodeExecContext{ @@ -215,6 +223,7 @@ func (c *nodeExecutor) newNodeExecContextDefault(ctx context.Context, currentNod return nil } + s := nl.GetNodeExecutionStatus(ctx, currentNodeID) return newNodeExecContext(ctx, c.store, executionContext, nl, n, s, ioutils.NewCachedInputReader( ctx, diff --git a/pkg/controller/workflow/executor.go b/pkg/controller/workflow/executor.go index 5b830e54c..d6814b353 100644 --- a/pkg/controller/workflow/executor.go +++ b/pkg/controller/workflow/executor.go @@ -462,7 +462,7 @@ func (in *defaultQueueBudgetHandler) GetNodeQueuingParameters(ctx context.Contex } // TODO init with wf budget or default value - remainingWaitTime := int64(0) + nodeBudget := time.Second for _, upstreamNodeID := range upstreamNodes { upstreamNodeStatus := in.nl.GetNodeExecutionStatus(ctx, upstreamNodeID) @@ -471,17 +471,25 @@ func (in *defaultQueueBudgetHandler) GetNodeQueuingParameters(ctx context.Contex continue } - budget := int64(0) // TODO use default instead of 0 - delay := int64(0) - if upstreamNodeStatus.GetMaxQueueTime() != nil && *upstreamNodeStatus.GetMaxQueueTime() > 0 { - budget = *upstreamNodeStatus.GetMaxQueueTime() - } - if upstreamNodeStatus.GetQueuingDelaySeconds() != nil { - delay = *upstreamNodeStatus.GetQueuingDelaySeconds() - } + budget := time.Second // TODO assign + + // fix this + //if upstreamNodeStatus.GetMaxQueueTimeSeconds() != nil && *upstreamNodeStatus.GetMaxQueueTimeSeconds() > 0 { + // budget = *upstreamNodeStatus.GetMaxQueueTimeSeconds() + //} - if remainingWaitTime > (budget - delay) { - remainingWaitTime = budget - delay + if upstreamNodeStatus.GetQueuedAt() != nil { + queuedAt := upstreamNodeStatus.GetQueuedAt().Time + if upstreamNodeStatus.GetLastAttemptStartedAt() == nil { + // nothing used + } + lastAttemptStartedAt := upstreamNodeStatus.GetLastAttemptStartedAt().Time + queuingDelay := lastAttemptStartedAt.Sub(queuedAt) + parentRemainingBudget := budget - queuingDelay + + if nodeBudget > parentRemainingBudget { + nodeBudget = parentRemainingBudget + } } } @@ -501,7 +509,7 @@ func (in *defaultQueueBudgetHandler) GetNodeQueuingParameters(ctx context.Contex // isInterruptible := false - return &NodeQueuingParameters{IsInterruptible: isInterruptible, MaxQueueTime: time.Second * time.Duration(remainingWaitTime)}, nil + return &NodeQueuingParameters{IsInterruptible: isInterruptible, MaxQueueTime: time.Second * time.Duration(nodeBudget)}, nil } func NewDefaultQueueBudgetHandler(dag executors.DAGStructure, nl executors.NodeLookup) QueueBudgetHandler { From 7ec57886fabfcb99868e36c2b872a37f895a451b Mon Sep 17 00:00:00 2001 From: Surinder Singh Date: Tue, 5 May 2020 11:48:55 -0700 Subject: [PATCH 5/8] . 
---
 pkg/apis/flyteworkflow/v1alpha1/node_status.go | 2 +-
 pkg/controller/executors/node_lookup.go        | 2 --
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/pkg/apis/flyteworkflow/v1alpha1/node_status.go
index 150298b4a..634f43327 100644
--- a/pkg/apis/flyteworkflow/v1alpha1/node_status.go
+++ b/pkg/apis/flyteworkflow/v1alpha1/node_status.go
@@ -185,7 +185,7 @@ type NodeStatus struct {
 	DataReferenceConstructor storage.ReferenceConstructor `json:"-"`

 	// queuing budget a node can consume across all its attempts
-	QueuingBudget *metav1.Duration `json:"queuingBudgetSeconds,omitempty"`
+	QueuingBudget *metav1.Duration `json:"queuingBudget,omitempty"`
 }

 func (in *NodeStatus) IsDirty() bool {
diff --git a/pkg/controller/executors/node_lookup.go b/pkg/controller/executors/node_lookup.go
index e216a93bc..e5adee8c7 100644
--- a/pkg/controller/executors/node_lookup.go
+++ b/pkg/controller/executors/node_lookup.go
@@ -12,8 +12,6 @@ import (
 type NodeLookup interface {
 	GetNode(nodeID v1alpha1.NodeID) (v1alpha1.ExecutableNode, bool)
 	GetNodeExecutionStatus(ctx context.Context, id v1alpha1.NodeID) v1alpha1.ExecutableNodeStatus
-	//GetNodeSchedulingParameters(ctx context.Context, id v1alpha1.NodeID) v1alpha1.NodeSchedulingParameters
-
 }

 // Implements a de-generate case of NodeLookup, where only one Node is always looked up

From c7b7fdf2fa1b871c72e447900067a3efbd8e4c10 Mon Sep 17 00:00:00 2001
From: Miguel Toledo
Date: Wed, 6 May 2020 13:05:41 -0700
Subject: [PATCH 6/8] queueBudgetHandler plumbing

---
 pkg/controller/executors/node.go          |  3 ++-
 pkg/controller/nodes/executor.go          |  9 +++++----
 pkg/controller/nodes/node_exec_context.go |  9 +++------
 pkg/controller/workflow/executor.go       | 18 +++++++++++-------
 4 files changed, 21 insertions(+), 18 deletions(-)

diff --git a/pkg/controller/executors/node.go b/pkg/controller/executors/node.go
index f8177ae19..9a4c946bf 100644
--- a/pkg/controller/executors/node.go
+++ b/pkg/controller/executors/node.go
@@ -3,6 +3,7 @@ package executors
 import (
 	"context"
 	"fmt"
+	"github.com/lyft/flytepropeller/pkg/controller/workflow"

 	"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"

@@ -70,7 +71,7 @@ type Node interface {
 	// - 1. It finds a blocking node (not ready, or running)
 	// - 2. A node fails and hence the workflow will fail
 	// - 3. The final/end node has completed and the workflow should be stopped
-	RecursiveNodeHandler(ctx context.Context, execContext ExecutionContext, dag DAGStructure, nl NodeLookup, currentNode v1alpha1.ExecutableNode) (NodeStatus, error)
+	RecursiveNodeHandler(ctx context.Context, execContext ExecutionContext, dag DAGStructure, nl NodeLookup, queueBudgetHandler workflow.QueueBudgetHandler, currentNode v1alpha1.ExecutableNode) (NodeStatus, error)

 	// This aborts the given node.
If the given node is complete then it recursively finds the running nodes and aborts them AbortHandler(ctx context.Context, execContext ExecutionContext, dag DAGStructure, nl NodeLookup, currentNode v1alpha1.ExecutableNode, reason string) error diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go index 47e5e3b99..c5b57086a 100644 --- a/pkg/controller/nodes/executor.go +++ b/pkg/controller/nodes/executor.go @@ -23,6 +23,7 @@ import ( "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" "github.com/lyft/flytepropeller/pkg/controller/config" + "github.com/lyft/flytepropeller/pkg/controller/workflow" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -490,7 +491,7 @@ func (c *nodeExecutor) handleNode(ctx context.Context, dag executors.DAGStructur // The space search for the next node to execute is implemented like a DFS algorithm. handleDownstream visits all the nodes downstream from // the currentNode. Visit a node is the RecursiveNodeHandler. A visit may be partial, complete or may result in a failure. -func (c *nodeExecutor) handleDownstream(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure, nl executors.NodeLookup, currentNode v1alpha1.ExecutableNode) (executors.NodeStatus, error) { +func (c *nodeExecutor) handleDownstream(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure, nl executors.NodeLookup, queueingBudgetHandler workflow.QueueBudgetHandler, currentNode v1alpha1.ExecutableNode) (executors.NodeStatus, error) { logger.Debugf(ctx, "Handling downstream Nodes") // This node is success. Handle all downstream nodes downstreamNodes, err := dag.FromNode(currentNode.GetID()) @@ -512,7 +513,7 @@ func (c *nodeExecutor) handleDownstream(ctx context.Context, execContext executo if !ok { return executors.NodeStatusFailed(errors.Errorf(errors.BadSpecificationError, currentNode.GetID(), "Unable to find Downstream Node [%v]", downstreamNodeName)), nil } - state, err := c.RecursiveNodeHandler(ctx, execContext, dag, nl, downstreamNode) + state, err := c.RecursiveNodeHandler(ctx, execContext, dag, nl, queueingBudgetHandler, downstreamNode) if err != nil { return executors.NodeStatusUndefined, err } @@ -569,7 +570,7 @@ func (c *nodeExecutor) SetInputsForStartNode(ctx context.Context, execContext ex return executors.NodeStatusComplete, nil } -func (c *nodeExecutor) RecursiveNodeHandler(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure, nl executors.NodeLookup, currentNode v1alpha1.ExecutableNode) (executors.NodeStatus, error) { +func (c *nodeExecutor) RecursiveNodeHandler(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure, nl executors.NodeLookup, queuingBudgetHandler workflow.QueueBudgetHandler, currentNode v1alpha1.ExecutableNode) (executors.NodeStatus, error) { currentNodeCtx := contextutils.WithNodeID(ctx, currentNode.GetID()) nodeStatus := nl.GetNodeExecutionStatus(ctx, currentNode.GetID()) @@ -612,7 +613,7 @@ func (c *nodeExecutor) RecursiveNodeHandler(ctx context.Context, execContext exe // Currently we treat either Skip or Success the same way. In this approach only one node will be skipped // at a time. 
As we iterate down, further nodes will be skipped case v1alpha1.NodePhaseSucceeded, v1alpha1.NodePhaseSkipped: - return c.handleDownstream(ctx, execContext, dag, nl, currentNode) + return c.handleDownstream(ctx, execContext, dag, nl, queueingBudgetHandler, currentNode) case v1alpha1.NodePhaseFailed: logger.Debugf(currentNodeCtx, "Node Failed") return executors.NodeStatusFailed(errors.Errorf(errors.RuntimeExecutionError, currentNode.GetID(), "Node Failed.")), nil diff --git a/pkg/controller/nodes/node_exec_context.go b/pkg/controller/nodes/node_exec_context.go index 2e9e56360..b2c3732d5 100644 --- a/pkg/controller/nodes/node_exec_context.go +++ b/pkg/controller/nodes/node_exec_context.go @@ -15,7 +15,6 @@ import ( "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils" - "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/lyft/flytepropeller/pkg/controller/executors" "github.com/lyft/flytepropeller/pkg/controller/nodes/handler" @@ -145,10 +144,7 @@ func (e nodeExecContext) MaxDatasetSizeBytes() int64 { return e.maxDatasetSizeBytes } -func newNodeExecContext(ctx context.Context, store *storage.DataStore, execContext executors.ExecutionContext, nl executors.NodeLookup, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error, rawOutputPrefix storage.DataReference, outputShardSelector ioutils.ShardSelector) *nodeExecContext { - - // TODO ssingh: dont initialize it here, instead pass this down from caller - queueBudgetHandler := workflow.NewDefaultQueueBudgetHandler(nil, nl) +func newNodeExecContext(ctx context.Context, store *storage.DataStore, execContext executors.ExecutionContext, nl executors.NodeLookup, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error, rawOutputPrefix storage.DataReference, outputShardSelector ioutils.ShardSelector, queueBudgetHandler workflow.QueueBudgetHandler) *nodeExecContext { // queueBudgetHandler doesn't need to worry about current-attempt-# or time spent in queued state in previous attempts, // it deduces it from node-queued-at and last-attempt-started-at which includes the total time(execution + waittime) @@ -200,7 +196,7 @@ func newNodeExecContext(ctx context.Context, store *storage.DataStore, execConte } } -func (c *nodeExecutor) newNodeExecContextDefault(ctx context.Context, currentNodeID v1alpha1.NodeID, executionContext executors.ExecutionContext, nl executors.NodeLookup) (*nodeExecContext, error) { +func (c *nodeExecutor) newNodeExecContextDefault(ctx context.Context, currentNodeID v1alpha1.NodeID, executionContext executors.ExecutionContext, nl executors.NodeLookup, queueBudgetHandler workflow.QueueBudgetHandler) (*nodeExecContext, error) { n, ok := nl.GetNode(currentNodeID) if !ok { return nil, fmt.Errorf("failed to find node with ID [%s] in execution [%s]", currentNodeID, executionContext.GetID()) @@ -246,5 +242,6 @@ func (c *nodeExecutor) newNodeExecContextDefault(ctx context.Context, currentNod // https://github.com/lyft/flyte/issues/211 c.defaultDataSandbox, c.shardSelector, + queueBudgetHandler, ), nil } diff --git a/pkg/controller/workflow/executor.go b/pkg/controller/workflow/executor.go index d6814b353..7045b9cb1 100644 --- 
a/pkg/controller/workflow/executor.go +++ b/pkg/controller/workflow/executor.go @@ -119,7 +119,9 @@ func (c *workflowExecutor) handleRunningWorkflow(ctx context.Context, w *v1alpha if startNode == nil { return StatusFailed(errors.Errorf(errors.IllegalStateError, w.GetID(), "StartNode not found in running workflow?")), nil } - state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, w, w, w, startNode) + + queueingBudgetHandler := NewDefaultQueueBudgetHandler(w, w, w.QueuingBudgetSeconds) + state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, w, w, w, queueingBudgetHandler, startNode) if err != nil { return StatusRunning, err } @@ -146,9 +148,10 @@ func (c *workflowExecutor) handleFailingWorkflow(ctx context.Context, w *v1alpha logger.Errorf(ctx, "Failed to propagate Abort for workflow:%v. Error: %v", w.ExecutionID.WorkflowExecutionIdentifier, err) } + queueingBudgetHandler := NewDefaultQueueBudgetHandler(w, w, w.QueuingBudgetSeconds) errorNode := w.GetOnFailureNode() if errorNode != nil { - state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, w, w, w, errorNode) + state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, w, w, w, queueingBudgetHandler, errorNode) if err != nil { return StatusFailing(nil), err } @@ -446,8 +449,9 @@ type NodeQueuingParameters struct { } type defaultQueueBudgetHandler struct { - dag executors.DAGStructure - nl executors.NodeLookup + dag executors.DAGStructure + nl executors.NodeLookup + wfBudget time.Duration } func (in *defaultQueueBudgetHandler) GetNodeQueuingParameters(ctx context.Context, id v1alpha1.NodeID) (*NodeQueuingParameters, error) { @@ -462,7 +466,7 @@ func (in *defaultQueueBudgetHandler) GetNodeQueuingParameters(ctx context.Contex } // TODO init with wf budget or default value - nodeBudget := time.Second + nodeBudget := in.wfBudget for _, upstreamNodeID := range upstreamNodes { upstreamNodeStatus := in.nl.GetNodeExecutionStatus(ctx, upstreamNodeID) @@ -512,6 +516,6 @@ func (in *defaultQueueBudgetHandler) GetNodeQueuingParameters(ctx context.Contex return &NodeQueuingParameters{IsInterruptible: isInterruptible, MaxQueueTime: time.Second * time.Duration(nodeBudget)}, nil } -func NewDefaultQueueBudgetHandler(dag executors.DAGStructure, nl executors.NodeLookup) QueueBudgetHandler { - return &defaultQueueBudgetHandler{dag: dag, nl: nl} +func NewDefaultQueueBudgetHandler(dag executors.DAGStructure, nl executors.NodeLookup, queueingBudget *int64) QueueBudgetHandler { + return &defaultQueueBudgetHandler{dag: dag, nl: nl, wfBudget: time.Duration(*queueingBudget)} } From 5bb51ad891feab36629edcc6dadedfec6791cfc3 Mon Sep 17 00:00:00 2001 From: Surinder Singh Date: Wed, 6 May 2020 21:59:57 -0700 Subject: [PATCH 7/8] . 
---
 pkg/controller/executors/node.go              |  3 ++-
 .../executors/queue_budget_handler.go         | 90 +++++++++++++++++++
 pkg/controller/nodes/branch/handler.go        |  4 +-
 pkg/controller/nodes/dynamic/handler.go       |  3 +-
 pkg/controller/nodes/executor.go              | 13 ++-
 pkg/controller/nodes/node_exec_context.go     | 33 ++++---
 .../nodes/subworkflow/subworkflow.go          |  7 +-
 pkg/controller/workflow/executor.go           | 86 +-----------------
 8 files changed, 128 insertions(+), 111 deletions(-)
 create mode 100644 pkg/controller/executors/queue_budget_handler.go

diff --git a/pkg/controller/executors/node.go b/pkg/controller/executors/node.go
index 9a4c946bf..3f8c09e91 100644
--- a/pkg/controller/executors/node.go
+++ b/pkg/controller/executors/node.go
@@ -3,7 +3,6 @@ package executors
 import (
 	"context"
 	"fmt"
-	"github.com/lyft/flytepropeller/pkg/controller/workflow"

 	"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"

@@ -71,7 +70,7 @@ type Node interface {
 	// - 1. It finds a blocking node (not ready, or running)
 	// - 2. A node fails and hence the workflow will fail
 	// - 3. The final/end node has completed and the workflow should be stopped
-	RecursiveNodeHandler(ctx context.Context, execContext ExecutionContext, dag DAGStructure, nl NodeLookup, queueBudgetHandler workflow.QueueBudgetHandler, currentNode v1alpha1.ExecutableNode) (NodeStatus, error)
+	RecursiveNodeHandler(ctx context.Context, execContext ExecutionContext, dag DAGStructure, nl NodeLookup, queuingBudgetHandler QueuingBudgetHandler, currentNode v1alpha1.ExecutableNode) (NodeStatus, error)

 	// This aborts the given node. If the given node is complete then it recursively finds the running nodes and aborts them
 	AbortHandler(ctx context.Context, execContext ExecutionContext, dag DAGStructure, nl NodeLookup, currentNode v1alpha1.ExecutableNode, reason string) error
diff --git a/pkg/controller/executors/queue_budget_handler.go b/pkg/controller/executors/queue_budget_handler.go
new file mode 100644
index 000000000..a91e3524f
--- /dev/null
+++ b/pkg/controller/executors/queue_budget_handler.go
@@ -0,0 +1,90 @@
+package executors
+
+import (
+	"context"
+	"time"
+
+	"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
+)
+
+// QueuingBudgetHandler computes a node's queuing parameters: whether it is interruptible and how much queueing budget remains for it
+type QueuingBudgetHandler interface {
+	GetNodeQueuingParameters(ctx context.Context, id v1alpha1.NodeID) (*NodeQueuingParameters, error)
+}
+
+type NodeQueuingParameters struct {
+	IsInterruptible bool
+	MaxQueueTime    time.Duration
+}
+
+type defaultQueuingBudgetHandler struct {
+	dag      DAGStructure
+	nl       NodeLookup
+	wfBudget time.Duration
+}
+
+func (in *defaultQueuingBudgetHandler) GetNodeQueuingParameters(ctx context.Context, id v1alpha1.NodeID) (*NodeQueuingParameters, error) {
+
+	if id == v1alpha1.StartNodeID {
+		return nil, nil
+	}
+
+	upstreamNodes, err := in.dag.ToNode(id)
+	if err != nil {
+		return nil, err
+	}
+
+	// start from the workflow-level budget and tighten it per upstream node
+	nodeBudget := in.wfBudget
+	for _, upstreamNodeID := range upstreamNodes {
+		upstreamNodeStatus := in.nl.GetNodeExecutionStatus(ctx, upstreamNodeID)
+
+		if upstreamNodeStatus.GetPhase() == v1alpha1.NodePhaseSkipped {
+			// TODO handle skipped parent case: if parent doesn't have queue budget info then get it from its parent.
+			continue
+		}
+
+		budget := time.Second // TODO assign
+
+		// fix this
+		//if upstreamNodeStatus.GetMaxQueueTimeSeconds() != nil && *upstreamNodeStatus.GetMaxQueueTimeSeconds() > 0 {
+		//	budget = *upstreamNodeStatus.GetMaxQueueTimeSeconds()
+		//}
+
+		if upstreamNodeStatus.GetQueuedAt() != nil {
+			queuedAt := upstreamNodeStatus.GetQueuedAt().Time
+			// if no attempt has started yet, the parent has consumed none of its budget
+			queuingDelay := time.Duration(0)
+			if upstreamNodeStatus.GetLastAttemptStartedAt() != nil {
+				queuingDelay = upstreamNodeStatus.GetLastAttemptStartedAt().Time.Sub(queuedAt)
+			}
+			parentRemainingBudget := budget - queuingDelay
+
+			if nodeBudget > parentRemainingBudget {
+				nodeBudget = parentRemainingBudget
+			}
+		}
+	}
+
+	// TODO: fix this
+	//interruptible := executionContext.IsInterruptible()
+	//if n.IsInterruptible() != nil {
+	//	interruptible = *n.IsInterruptible()
+	//}
+	//
+	//s := nl.GetNodeExecutionStatus(ctx, currentNodeID)
+	//
+	//// a node is not considered interruptible if the system failures have exceeded the configured threshold
+	//if interruptible && s.GetSystemFailures() >= c.interruptibleFailureThreshold {
+	//	interruptible = false
+	//	c.metrics.InterruptedThresholdHit.Inc(ctx)
+	//}
+	//
+	isInterruptible := false
+
+	// nodeBudget is already a time.Duration; multiplying by time.Second again would inflate the units
+	return &NodeQueuingParameters{IsInterruptible: isInterruptible, MaxQueueTime: nodeBudget}, nil
+}
+
+func NewDefaultQueuingBudgetHandler(dag DAGStructure, nl NodeLookup, queueingBudget *int64) QueuingBudgetHandler {
+	// treat queueingBudget as seconds and guard against a nil pointer
+	wfBudget := time.Duration(0)
+	if queueingBudget != nil {
+		wfBudget = time.Duration(*queueingBudget) * time.Second
+	}
+	return &defaultQueuingBudgetHandler{dag: dag, nl: nl, wfBudget: wfBudget}
+}
diff --git a/pkg/controller/nodes/branch/handler.go b/pkg/controller/nodes/branch/handler.go
index e4b409f52..88301fbf5 100644
--- a/pkg/controller/nodes/branch/handler.go
+++ b/pkg/controller/nodes/branch/handler.go
@@ -109,7 +109,9 @@ func (b *branchHandler) recurseDownstream(ctx context.Context, nCtx handler.Node
 	// There is no DAGStructure for the branch nodes, the branch taken node is the leaf node. The node itself may be arbitrarily complex, but in that case the node should reference a subworkflow etc
 	// The parent of the BranchTaken Node is the actual Branch Node and all the data is just forwarded from the Branch to the executed node.
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), nCtx.NodeID()) - downstreamStatus, err := b.nodeExecutor.RecursiveNodeHandler(ctx, nCtx.ExecutionContext(), dag, nCtx.ContextualNodeLookup(), branchTakenNode) + + // TODO: pass queuingBudgetHandler + downstreamStatus, err := b.nodeExecutor.RecursiveNodeHandler(ctx, nCtx.ExecutionContext(), dag, nCtx.ContextualNodeLookup(), nil, branchTakenNode) if err != nil { return handler.UnknownTransition, err } diff --git a/pkg/controller/nodes/dynamic/handler.go b/pkg/controller/nodes/dynamic/handler.go index 1c4ceae81..1ae76d8f2 100644 --- a/pkg/controller/nodes/dynamic/handler.go +++ b/pkg/controller/nodes/dynamic/handler.go @@ -436,7 +436,8 @@ func (d dynamicNodeTaskNodeHandler) getLaunchPlanInterfaces(ctx context.Context, func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context, execContext executors.ExecutionContext, dynamicWorkflow v1alpha1.ExecutableWorkflow, nl executors.NodeLookup, nCtx handler.NodeExecutionContext, prevState handler.DynamicNodeState) (handler.Transition, handler.DynamicNodeState, error) { - state, err := d.nodeExecutor.RecursiveNodeHandler(ctx, execContext, dynamicWorkflow, nl, dynamicWorkflow.StartNode()) + // TODO: pass queuingBudgetHandler + state, err := d.nodeExecutor.RecursiveNodeHandler(ctx, execContext, dynamicWorkflow, nl, nil, dynamicWorkflow.StartNode()) if err != nil { return handler.UnknownTransition, prevState, err } diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go index c5b57086a..516ac707c 100644 --- a/pkg/controller/nodes/executor.go +++ b/pkg/controller/nodes/executor.go @@ -23,7 +23,6 @@ import ( "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" "github.com/lyft/flytepropeller/pkg/controller/config" - "github.com/lyft/flytepropeller/pkg/controller/workflow" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -491,7 +490,7 @@ func (c *nodeExecutor) handleNode(ctx context.Context, dag executors.DAGStructur // The space search for the next node to execute is implemented like a DFS algorithm. handleDownstream visits all the nodes downstream from // the currentNode. Visit a node is the RecursiveNodeHandler. A visit may be partial, complete or may result in a failure. -func (c *nodeExecutor) handleDownstream(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure, nl executors.NodeLookup, queueingBudgetHandler workflow.QueueBudgetHandler, currentNode v1alpha1.ExecutableNode) (executors.NodeStatus, error) { +func (c *nodeExecutor) handleDownstream(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure, nl executors.NodeLookup, queueingBudgetHandler executors.QueuingBudgetHandler, currentNode v1alpha1.ExecutableNode) (executors.NodeStatus, error) { logger.Debugf(ctx, "Handling downstream Nodes") // This node is success. 
Handle all downstream nodes downstreamNodes, err := dag.FromNode(currentNode.GetID()) @@ -570,7 +569,7 @@ func (c *nodeExecutor) SetInputsForStartNode(ctx context.Context, execContext ex return executors.NodeStatusComplete, nil } -func (c *nodeExecutor) RecursiveNodeHandler(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure, nl executors.NodeLookup, queuingBudgetHandler workflow.QueueBudgetHandler, currentNode v1alpha1.ExecutableNode) (executors.NodeStatus, error) { +func (c *nodeExecutor) RecursiveNodeHandler(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure, nl executors.NodeLookup, queuingBudgetHandler executors.QueuingBudgetHandler, currentNode v1alpha1.ExecutableNode) (executors.NodeStatus, error) { currentNodeCtx := contextutils.WithNodeID(ctx, currentNode.GetID()) nodeStatus := nl.GetNodeExecutionStatus(ctx, currentNode.GetID()) @@ -595,7 +594,7 @@ func (c *nodeExecutor) RecursiveNodeHandler(ctx context.Context, execContext exe return executors.NodeStatusRunning, nil } - nCtx, err := c.newNodeExecContextDefault(ctx, currentNode.GetID(), execContext, nl) + nCtx, err := c.newNodeExecContextDefault(ctx, currentNode.GetID(), execContext, nl, queuingBudgetHandler) if err != nil { // NodeExecution creation failure is a permanent fail / system error. // Should a system failure always return an err? @@ -613,7 +612,7 @@ func (c *nodeExecutor) RecursiveNodeHandler(ctx context.Context, execContext exe // Currently we treat either Skip or Success the same way. In this approach only one node will be skipped // at a time. As we iterate down, further nodes will be skipped case v1alpha1.NodePhaseSucceeded, v1alpha1.NodePhaseSkipped: - return c.handleDownstream(ctx, execContext, dag, nl, queueingBudgetHandler, currentNode) + return c.handleDownstream(ctx, execContext, dag, nl, queuingBudgetHandler, currentNode) case v1alpha1.NodePhaseFailed: logger.Debugf(currentNodeCtx, "Node Failed") return executors.NodeStatusFailed(errors.Errorf(errors.RuntimeExecutionError, currentNode.GetID(), "Node Failed.")), nil @@ -637,7 +636,7 @@ func (c *nodeExecutor) FinalizeHandler(ctx context.Context, execContext executor return err } - nCtx, err := c.newNodeExecContextDefault(ctx, currentNode.GetID(), execContext, nl) + nCtx, err := c.newNodeExecContextDefault(ctx, currentNode.GetID(), execContext, nl, nil) if err != nil { return err } @@ -690,7 +689,7 @@ func (c *nodeExecutor) AbortHandler(ctx context.Context, execContext executors.E return err } - nCtx, err := c.newNodeExecContextDefault(ctx, currentNode.GetID(), execContext, nl) + nCtx, err := c.newNodeExecContextDefault(ctx, currentNode.GetID(), execContext, nl, nil) if err != nil { return err } diff --git a/pkg/controller/nodes/node_exec_context.go b/pkg/controller/nodes/node_exec_context.go index b2c3732d5..9a5547ff9 100644 --- a/pkg/controller/nodes/node_exec_context.go +++ b/pkg/controller/nodes/node_exec_context.go @@ -8,7 +8,6 @@ import ( "github.com/lyft/flyteidl/clients/go/events" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" - "github.com/lyft/flytepropeller/pkg/controller/workflow" "github.com/lyft/flytestdlib/logger" "github.com/lyft/flytestdlib/storage" "k8s.io/apimachinery/pkg/types" @@ -144,15 +143,21 @@ func (e nodeExecContext) MaxDatasetSizeBytes() int64 { return e.maxDatasetSizeBytes } -func newNodeExecContext(ctx context.Context, store *storage.DataStore, execContext executors.ExecutionContext, nl executors.NodeLookup, node v1alpha1.ExecutableNode, nodeStatus 
v1alpha1.ExecutableNodeStatus, inputs io.InputReader, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error, rawOutputPrefix storage.DataReference, outputShardSelector ioutils.ShardSelector, queueBudgetHandler workflow.QueueBudgetHandler) *nodeExecContext {
+func newNodeExecContext(ctx context.Context, store *storage.DataStore, execContext executors.ExecutionContext, nl executors.NodeLookup, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error, rawOutputPrefix storage.DataReference, outputShardSelector ioutils.ShardSelector, queuingBudgetHandler executors.QueuingBudgetHandler) *nodeExecContext {
- // queueBudgetHandler doesn't need to worry about current-attempt-# or time spent in queued state in previous attempts,
- // it deduces it from node-queued-at and last-attempt-started-at which includes the total time(execution + waittime)
- // spent before this attempt.
- param, err := queueBudgetHandler.GetNodeQueuingParameters(ctx, node.GetID())
- if err != nil {
- // TODO: return err
- logger.Error(ctx, err)
+ // queuingBudgetHandler doesn't need to worry about current-attempt-# or time spent in queued state in previous attempts,
+ // as it only cares about the overall time between when the node was first queued and the start of this attempt.
+
+ isInterruptible := false
+ maxQueueTime := time.Duration(0)
+ if queuingBudgetHandler != nil {
+ param, err := queuingBudgetHandler.GetNodeQueuingParameters(ctx, node.GetID())
+ if err != nil {
+ // TODO: return err
+ logger.Error(ctx, err)
+ } else if param != nil { // param is nil for the start node; guard before dereferencing
+ isInterruptible = param.IsInterruptible
+ maxQueueTime = param.MaxQueueTime
+ }
 }

 md := nodeExecMetadata{
@@ -161,8 +166,8 @@ func newNodeExecContext(ctx context.Context, store *storage.DataStore, execConte
 NodeId: node.GetID(),
 ExecutionId: execContext.GetExecutionID().WorkflowExecutionIdentifier,
 },
- interruptible: param.IsInterruptible,
- maxQueueTime: param.MaxQueueTime,
+ interruptible: isInterruptible,
+ maxQueueTime: maxQueueTime,
 }

 // Copy the wf labels before adding node specific labels.
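The deduction described in the comment above can be made concrete. Below is a minimal sketch, not part of the patch, using the v1alpha1.ExecutableNodeStatus accessors that already appear in this series; the helper name remainingBudget is hypothetical:

package example

import (
	"time"

	"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)

// remainingBudget returns how much of a node's queuing budget is left for the
// current attempt. GetLastAttemptStartedAt already folds in the wait time of
// every earlier attempt, so a single subtraction covers all retries.
func remainingBudget(status v1alpha1.ExecutableNodeStatus, budget time.Duration) time.Duration {
	if status.GetQueuedAt() == nil || status.GetLastAttemptStartedAt() == nil {
		// never queued or never started: nothing has been consumed yet
		return budget
	}
	consumed := status.GetLastAttemptStartedAt().Time.Sub(status.GetQueuedAt().Time)
	return budget - consumed
}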
@@ -175,7 +180,7 @@ func newNodeExecContext(ctx context.Context, store *storage.DataStore, execConte nodeLabels[TaskNameLabel] = utils.SanitizeLabelValue(tr.GetTaskID().Name) } - nodeLabels[NodeInterruptibleLabel] = strconv.FormatBool(param.IsInterruptible) + nodeLabels[NodeInterruptibleLabel] = strconv.FormatBool(isInterruptible) md.nodeLabels = nodeLabels return &nodeExecContext{ @@ -196,7 +201,7 @@ func newNodeExecContext(ctx context.Context, store *storage.DataStore, execConte } } -func (c *nodeExecutor) newNodeExecContextDefault(ctx context.Context, currentNodeID v1alpha1.NodeID, executionContext executors.ExecutionContext, nl executors.NodeLookup, queueBudgetHandler workflow.QueueBudgetHandler) (*nodeExecContext, error) { +func (c *nodeExecutor) newNodeExecContextDefault(ctx context.Context, currentNodeID v1alpha1.NodeID, executionContext executors.ExecutionContext, nl executors.NodeLookup, queuingBudgetHandler executors.QueuingBudgetHandler) (*nodeExecContext, error) { n, ok := nl.GetNode(currentNodeID) if !ok { return nil, fmt.Errorf("failed to find node with ID [%s] in execution [%s]", currentNodeID, executionContext.GetID()) @@ -242,6 +247,6 @@ func (c *nodeExecutor) newNodeExecContextDefault(ctx context.Context, currentNod // https://github.com/lyft/flyte/issues/211 c.defaultDataSandbox, c.shardSelector, - queueBudgetHandler, + queuingBudgetHandler, ), nil } diff --git a/pkg/controller/nodes/subworkflow/subworkflow.go b/pkg/controller/nodes/subworkflow/subworkflow.go index 082425d91..a9bfe8e76 100644 --- a/pkg/controller/nodes/subworkflow/subworkflow.go +++ b/pkg/controller/nodes/subworkflow/subworkflow.go @@ -54,7 +54,8 @@ func (s *subworkflowHandler) startAndHandleSubWorkflow(ctx context.Context, nCtx // Calls the recursive node executor to handle the SubWorkflow and translates the results after the success func (s *subworkflowHandler) handleSubWorkflow(ctx context.Context, nCtx handler.NodeExecutionContext, subworkflow v1alpha1.ExecutableSubWorkflow, nl executors.NodeLookup) (handler.Transition, error) { - state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, nCtx.ExecutionContext(), subworkflow, nl, subworkflow.StartNode()) + // TODO: pass queuingBudgetHandler + state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, nCtx.ExecutionContext(), subworkflow, nl, nil, subworkflow.StartNode()) if err != nil { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoUndefined), err } @@ -116,7 +117,9 @@ func (s *subworkflowHandler) handleSubWorkflow(ctx context.Context, nCtx handler // https://github.com/lyft/flyte/issues/265 func (s *subworkflowHandler) HandleFailureNodeOfSubWorkflow(ctx context.Context, nCtx handler.NodeExecutionContext, subworkflow v1alpha1.ExecutableSubWorkflow, nl executors.NodeLookup) (handler.Transition, error) { if subworkflow.GetOnFailureNode() != nil { - state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, nCtx.ExecutionContext(), subworkflow, nl, subworkflow.GetOnFailureNode()) + + // TODO: pass queuingBudgetHandler + state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, nCtx.ExecutionContext(), subworkflow, nl, nil, subworkflow.GetOnFailureNode()) if err != nil { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoUndefined), err } diff --git a/pkg/controller/workflow/executor.go b/pkg/controller/workflow/executor.go index 7045b9cb1..95a447eba 100644 --- a/pkg/controller/workflow/executor.go +++ b/pkg/controller/workflow/executor.go @@ -120,7 +120,7 @@ func (c *workflowExecutor) handleRunningWorkflow(ctx 
context.Context, w *v1alpha return StatusFailed(errors.Errorf(errors.IllegalStateError, w.GetID(), "StartNode not found in running workflow?")), nil } - queueingBudgetHandler := NewDefaultQueueBudgetHandler(w, w, w.QueuingBudgetSeconds) + queueingBudgetHandler := executors.NewDefaultQueuingBudgetHandler(w, w, w.QueuingBudgetSeconds) state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, w, w, w, queueingBudgetHandler, startNode) if err != nil { return StatusRunning, err @@ -148,7 +148,7 @@ func (c *workflowExecutor) handleFailingWorkflow(ctx context.Context, w *v1alpha logger.Errorf(ctx, "Failed to propagate Abort for workflow:%v. Error: %v", w.ExecutionID.WorkflowExecutionIdentifier, err) } - queueingBudgetHandler := NewDefaultQueueBudgetHandler(w, w, w.QueuingBudgetSeconds) + queueingBudgetHandler := executors.NewDefaultQueuingBudgetHandler(w, w, w.QueuingBudgetSeconds) errorNode := w.GetOnFailureNode() if errorNode != nil { state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, w, w, w, queueingBudgetHandler, errorNode) @@ -437,85 +437,3 @@ func NewExecutor(ctx context.Context, store *storage.DataStore, enQWorkflow v1al }, }, nil } - -// Interface for the Workflow p. This is the mutable portion for a Workflow -type QueueBudgetHandler interface { - GetNodeQueuingParameters(ctx context.Context, id v1alpha1.NodeID) (*NodeQueuingParameters, error) -} - -type NodeQueuingParameters struct { - IsInterruptible bool - MaxQueueTime time.Duration -} - -type defaultQueueBudgetHandler struct { - dag executors.DAGStructure - nl executors.NodeLookup - wfBudget time.Duration -} - -func (in *defaultQueueBudgetHandler) GetNodeQueuingParameters(ctx context.Context, id v1alpha1.NodeID) (*NodeQueuingParameters, error) { - - if id == v1alpha1.StartNodeID { - return nil, nil - } - - upstreamNodes, err := in.dag.ToNode(id) - if err != nil { - return nil, err - } - - // TODO init with wf budget or default value - nodeBudget := in.wfBudget - for _, upstreamNodeID := range upstreamNodes { - upstreamNodeStatus := in.nl.GetNodeExecutionStatus(ctx, upstreamNodeID) - - if upstreamNodeStatus.GetPhase() == v1alpha1.NodePhaseSkipped { - // TODO handle skipped parent case: if parent doesn't have queue budget info then get it from its parent. 
- continue
- }
-
- budget := time.Second // TODO assign
-
- // fix this
- //if upstreamNodeStatus.GetMaxQueueTimeSeconds() != nil && *upstreamNodeStatus.GetMaxQueueTimeSeconds() > 0 {
- // budget = *upstreamNodeStatus.GetMaxQueueTimeSeconds()
- //}
-
- if upstreamNodeStatus.GetQueuedAt() != nil {
- queuedAt := upstreamNodeStatus.GetQueuedAt().Time
- if upstreamNodeStatus.GetLastAttemptStartedAt() == nil {
- // nothing used
- }
- lastAttemptStartedAt := upstreamNodeStatus.GetLastAttemptStartedAt().Time
- queuingDelay := lastAttemptStartedAt.Sub(queuedAt)
- parentRemainingBudget := budget - queuingDelay
-
- if nodeBudget > parentRemainingBudget {
- nodeBudget = parentRemainingBudget
- }
- }
- }
-
- // TODO: fix this
- //interruptible := executionContext.IsInterruptible()
- //if n.IsInterruptible() != nil {
- // interruptible = *n.IsInterruptible()
- //}
- //
- //s := nl.GetNodeExecutionStatus(ctx, currentNodeID)
- //
- //// a node is not considered interruptible if the system failures have exceeded the configured threshold
- //if interruptible && s.GetSystemFailures() >= c.interruptibleFailureThreshold {
- // interruptible = false
- // c.metrics.InterruptedThresholdHit.Inc(ctx)
- //}
- //
- isInterruptible := false
-
- return &NodeQueuingParameters{IsInterruptible: isInterruptible, MaxQueueTime: time.Second * time.Duration(nodeBudget)}, nil
-}
-
-func NewDefaultQueueBudgetHandler(dag executors.DAGStructure, nl executors.NodeLookup, queueingBudget *int64) QueueBudgetHandler {
- return &defaultQueueBudgetHandler{dag: dag, nl: nl, wfBudget: time.Duration(*queueingBudget)}
-}

From ec7a5eee620bfc5910bc4242539f03f7458c88e0 Mon Sep 17 00:00:00 2001
From: Miguel Toledo
Date: Thu, 14 May 2020 09:09:43 -0700
Subject: [PATCH 8/8] pass queuing budget handler

---
 .../executors/queue_budget_handler.go | 36 +++++++++----------
 pkg/controller/nodes/branch/handler.go | 13 ++++---
 pkg/controller/nodes/dynamic/handler.go | 10 +++---
 pkg/controller/nodes/mocks/output_resolver.go | 20 ++++++-----
 .../nodes/subworkflow/subworkflow.go | 13 ++++---
 5 files changed, 51 insertions(+), 41 deletions(-)

diff --git a/pkg/controller/executors/queue_budget_handler.go b/pkg/controller/executors/queue_budget_handler.go
index a91e3524f..f3d0f4199 100644
--- a/pkg/controller/executors/queue_budget_handler.go
+++ b/pkg/controller/executors/queue_budget_handler.go
@@ -34,7 +34,6 @@ func (in *defaultQueuingBudgetHandler) GetNodeQueuingParameters(ctx context.Cont
 return nil, err
 }

- // TODO init with wf budget or default value
 nodeBudget := in.wfBudget
 for _, upstreamNodeID := range upstreamNodes {
 upstreamNodeStatus := in.nl.GetNodeExecutionStatus(ctx, upstreamNodeID)
@@ -44,12 +43,10 @@ func (in *defaultQueuingBudgetHandler) GetNodeQueuingParameters(ctx context.Cont
 continue
 }

- budget := time.Second // TODO assign
-
- // fix this
- //if upstreamNodeStatus.GetMaxQueueTimeSeconds() != nil && *upstreamNodeStatus.GetMaxQueueTimeSeconds() > 0 {
- // budget = *upstreamNodeStatus.GetMaxQueueTimeSeconds()
- //}
+ var budget time.Duration
+ if upstreamNodeStatus.GetQueuingBudget() != nil && upstreamNodeStatus.GetQueuingBudget().Duration > 0 {
+ budget = upstreamNodeStatus.GetQueuingBudget().Duration
+ }

 if upstreamNodeStatus.GetQueuedAt() != nil {
 queuedAt := upstreamNodeStatus.GetQueuedAt().Time
@@ -66,25 +63,28 @@
 }
 }

- // TODO: fix this
- //interruptible := executionContext.IsInterruptible()
- //if n.IsInterruptible() != nil {
- // interruptible = *n.IsInterruptible()
- //}
- //
- //s := nl.GetNodeExecutionStatus(ctx, currentNodeID)
- //
+ currNode, exists := in.nl.GetNode(id)
+ if !exists {
+ // err from ToNode above is nil by this point, so construct a real error (assumes fmt is imported in this file)
+ return nil, fmt.Errorf("failed to find node with ID [%s]", id)
+ }
+ var interruptible bool
+ if currNode.IsInterruptible() != nil {
+ interruptible = *currNode.IsInterruptible()
+ }
+ // TODO: Where to get config value from?
 //// a node is not considered interruptible if the system failures have exceeded the configured threshold
- //if interruptible && s.GetSystemFailures() >= c.interruptibleFailureThreshold {
+ // currNodeStatus := in.nl.GetNodeExecutionStatus(ctx, id)
+ //if interruptible && currNodeStatus.GetSystemFailures() >= c.interruptibleFailureThreshold {
 // interruptible = false
 // c.metrics.InterruptedThresholdHit.Inc(ctx)
 //}
 //
- isInterruptible := false
- return &NodeQueuingParameters{IsInterruptible: isInterruptible, MaxQueueTime: time.Second * time.Duration(nodeBudget)}, nil
+ // nodeBudget is already a time.Duration, so no extra time.Second scaling is needed here
+ return &NodeQueuingParameters{IsInterruptible: interruptible, MaxQueueTime: nodeBudget}, nil
 }

+// instead of *int64 use duration?
 func NewDefaultQueuingBudgetHandler(dag DAGStructure, nl NodeLookup, queueingBudget *int64) QueuingBudgetHandler {
- return &defaultQueuingBudgetHandler{dag: dag, nl: nl, wfBudget: time.Duration(*queueingBudget)}
+ // guard against a nil budget (several call sites in this patch pass nil) and scale the configured seconds into a Duration
+ wfBudget := time.Duration(0)
+ if queueingBudget != nil {
+ wfBudget = time.Duration(*queueingBudget) * time.Second
+ }
+ return &defaultQueuingBudgetHandler{dag: dag, nl: nl, wfBudget: wfBudget}
 }
diff --git a/pkg/controller/nodes/branch/handler.go b/pkg/controller/nodes/branch/handler.go
index 88301fbf5..da4919635 100644
--- a/pkg/controller/nodes/branch/handler.go
+++ b/pkg/controller/nodes/branch/handler.go
@@ -32,6 +32,10 @@ func (b *branchHandler) Setup(ctx context.Context, setupContext handler.SetupCon
 }

 func (b *branchHandler) HandleBranchNode(ctx context.Context, branchNode v1alpha1.ExecutableBranchNode, nCtx handler.NodeExecutionContext, nl executors.NodeLookup) (handler.Transition, error) {
+ // Create Queue Budget Handler from nodeContext
+ // TODO(mtoledo): get dagStructure, ask about duration
+ queuingBudgetHandler := executors.NewDefaultQueuingBudgetHandler(nil, nl, nil)
+
 if nCtx.NodeStateReader().GetBranchNode().FinalizedNodeID == nil {
 nodeInputs, err := nCtx.InputReader().Get(ctx)
 if err != nil {
@@ -64,7 +68,7 @@
 logger.Debugf(ctx, "Recursively executing branchNode's chosen path")
 nodeStatus := nl.GetNodeExecutionStatus(ctx, nCtx.NodeID())
- return b.recurseDownstream(ctx, nCtx, nodeStatus, finalNode)
+ return b.recurseDownstream(ctx, nCtx, nodeStatus, finalNode, queuingBudgetHandler)
 }

 // If the branchNodestatus was already evaluated i.e, Node is in Running status
@@ -89,7 +93,7 @@

 // Recurse downstream
 nodeStatus := nl.GetNodeExecutionStatus(ctx, nCtx.NodeID())
- return b.recurseDownstream(ctx, nCtx, nodeStatus, branchTakenNode)
+ return b.recurseDownstream(ctx, nCtx, nodeStatus, branchTakenNode, queuingBudgetHandler)
 }

 func (b *branchHandler) Handle(ctx context.Context, nCtx handler.NodeExecutionCo
@@ -104,14 +108,13 @@
 return b.HandleBranchNode(ctx, branchNode, nCtx, nl)
 }

-func (b *branchHandler) recurseDownstream(ctx context.Context, nCtx handler.NodeExecutionContext, nodeStatus v1alpha1.ExecutableNodeStatus, branchTakenNode v1alpha1.ExecutableNode) (handler.Transition, error) {
+func (b *branchHandler) recurseDownstream(ctx context.Context, nCtx handler.NodeExecutionContext, nodeStatus
v1alpha1.ExecutableNodeStatus, branchTakenNode v1alpha1.ExecutableNode, queuingBudgetHandler executors.QueuingBudgetHandler) (handler.Transition, error) { // TODO we should replace the call to RecursiveNodeHandler with a call to SingleNode Handler. The inputs are also already known ahead of time // There is no DAGStructure for the branch nodes, the branch taken node is the leaf node. The node itself may be arbitrarily complex, but in that case the node should reference a subworkflow etc // The parent of the BranchTaken Node is the actual Branch Node and all the data is just forwarded from the Branch to the executed node. dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), nCtx.NodeID()) - // TODO: pass queuingBudgetHandler - downstreamStatus, err := b.nodeExecutor.RecursiveNodeHandler(ctx, nCtx.ExecutionContext(), dag, nCtx.ContextualNodeLookup(), nil, branchTakenNode) + downstreamStatus, err := b.nodeExecutor.RecursiveNodeHandler(ctx, nCtx.ExecutionContext(), dag, nCtx.ContextualNodeLookup(), queuingBudgetHandler, branchTakenNode) if err != nil { return handler.UnknownTransition, err } diff --git a/pkg/controller/nodes/dynamic/handler.go b/pkg/controller/nodes/dynamic/handler.go index 1ae76d8f2..28a6b021b 100644 --- a/pkg/controller/nodes/dynamic/handler.go +++ b/pkg/controller/nodes/dynamic/handler.go @@ -102,7 +102,10 @@ func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, n "DynamicWorkflowBuildFailed", err.Error(), nil)), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: err.Error()}, nil } - trns, newState, err := d.progressDynamicWorkflow(ctx, execContext, dynamicWF, nl, nCtx, prevState) + // get queuing budget + queuingBudgetHandler := executors.NewDefaultQueuingBudgetHandler(dynamicWF, nl, nil) + + trns, newState, err := d.progressDynamicWorkflow(ctx, execContext, dynamicWF, nl, nCtx, prevState, queuingBudgetHandler) if err != nil { return handler.UnknownTransition, prevState, err } @@ -434,10 +437,9 @@ func (d dynamicNodeTaskNodeHandler) getLaunchPlanInterfaces(ctx context.Context, } func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context, execContext executors.ExecutionContext, dynamicWorkflow v1alpha1.ExecutableWorkflow, nl executors.NodeLookup, - nCtx handler.NodeExecutionContext, prevState handler.DynamicNodeState) (handler.Transition, handler.DynamicNodeState, error) { + nCtx handler.NodeExecutionContext, prevState handler.DynamicNodeState, queuingBudgetHandler executors.QueuingBudgetHandler) (handler.Transition, handler.DynamicNodeState, error) { - // TODO: pass queuingBudgetHandler - state, err := d.nodeExecutor.RecursiveNodeHandler(ctx, execContext, dynamicWorkflow, nl, nil, dynamicWorkflow.StartNode()) + state, err := d.nodeExecutor.RecursiveNodeHandler(ctx, execContext, dynamicWorkflow, nl, queuingBudgetHandler, dynamicWorkflow.StartNode()) if err != nil { return handler.UnknownTransition, prevState, err } diff --git a/pkg/controller/nodes/mocks/output_resolver.go b/pkg/controller/nodes/mocks/output_resolver.go index 061e87687..405cb4f74 100644 --- a/pkg/controller/nodes/mocks/output_resolver.go +++ b/pkg/controller/nodes/mocks/output_resolver.go @@ -6,6 +6,8 @@ import ( context "context" core "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + executors "github.com/lyft/flytepropeller/pkg/controller/executors" + mock "github.com/stretchr/testify/mock" v1alpha1 "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" @@ -24,8 +26,8 @@ func (_m OutputResolver_ExtractOutput) 
Return(values *core.Literal, err error) * return &OutputResolver_ExtractOutput{Call: _m.Call.Return(values, err)} } -func (_m *OutputResolver) OnExtractOutput(ctx context.Context, w v1alpha1.BaseWorkflowWithStatus, n v1alpha1.ExecutableNode, bindToVar string) *OutputResolver_ExtractOutput { - c := _m.On("ExtractOutput", ctx, w, n, bindToVar) +func (_m *OutputResolver) OnExtractOutput(ctx context.Context, nl executors.NodeLookup, n v1alpha1.ExecutableNode, bindToVar string) *OutputResolver_ExtractOutput { + c := _m.On("ExtractOutput", ctx, nl, n, bindToVar) return &OutputResolver_ExtractOutput{Call: c} } @@ -34,13 +36,13 @@ func (_m *OutputResolver) OnExtractOutputMatch(matchers ...interface{}) *OutputR return &OutputResolver_ExtractOutput{Call: c} } -// ExtractOutput provides a mock function with given fields: ctx, w, n, bindToVar -func (_m *OutputResolver) ExtractOutput(ctx context.Context, w v1alpha1.BaseWorkflowWithStatus, n v1alpha1.ExecutableNode, bindToVar string) (*core.Literal, error) { - ret := _m.Called(ctx, w, n, bindToVar) +// ExtractOutput provides a mock function with given fields: ctx, nl, n, bindToVar +func (_m *OutputResolver) ExtractOutput(ctx context.Context, nl executors.NodeLookup, n v1alpha1.ExecutableNode, bindToVar string) (*core.Literal, error) { + ret := _m.Called(ctx, nl, n, bindToVar) var r0 *core.Literal - if rf, ok := ret.Get(0).(func(context.Context, v1alpha1.BaseWorkflowWithStatus, v1alpha1.ExecutableNode, string) *core.Literal); ok { - r0 = rf(ctx, w, n, bindToVar) + if rf, ok := ret.Get(0).(func(context.Context, executors.NodeLookup, v1alpha1.ExecutableNode, string) *core.Literal); ok { + r0 = rf(ctx, nl, n, bindToVar) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*core.Literal) @@ -48,8 +50,8 @@ func (_m *OutputResolver) ExtractOutput(ctx context.Context, w v1alpha1.BaseWork } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, v1alpha1.BaseWorkflowWithStatus, v1alpha1.ExecutableNode, string) error); ok { - r1 = rf(ctx, w, n, bindToVar) + if rf, ok := ret.Get(1).(func(context.Context, executors.NodeLookup, v1alpha1.ExecutableNode, string) error); ok { + r1 = rf(ctx, nl, n, bindToVar) } else { r1 = ret.Error(1) } diff --git a/pkg/controller/nodes/subworkflow/subworkflow.go b/pkg/controller/nodes/subworkflow/subworkflow.go index a9bfe8e76..32c7bff2e 100644 --- a/pkg/controller/nodes/subworkflow/subworkflow.go +++ b/pkg/controller/nodes/subworkflow/subworkflow.go @@ -48,14 +48,17 @@ func (s *subworkflowHandler) startAndHandleSubWorkflow(ctx context.Context, nCtx errorCode, _ := errors.GetErrorCode(startStatus.Err) return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(errorCode, startStatus.Err.Error(), nil)), nil } - return s.handleSubWorkflow(ctx, nCtx, subWorkflow, nl) + + // Get queuing budget + queuingBudgetHandler := executors.NewDefaultQueuingBudgetHandler(subWorkflow, nl, nil) + + return s.handleSubWorkflow(ctx, nCtx, subWorkflow, nl, queuingBudgetHandler) } // Calls the recursive node executor to handle the SubWorkflow and translates the results after the success -func (s *subworkflowHandler) handleSubWorkflow(ctx context.Context, nCtx handler.NodeExecutionContext, subworkflow v1alpha1.ExecutableSubWorkflow, nl executors.NodeLookup) (handler.Transition, error) { +func (s *subworkflowHandler) handleSubWorkflow(ctx context.Context, nCtx handler.NodeExecutionContext, subworkflow v1alpha1.ExecutableSubWorkflow, nl executors.NodeLookup, queuingBudgetHandler executors.QueuingBudgetHandler) (handler.Transition, 
error) { - // TODO: pass queuingBudgetHandler - state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, nCtx.ExecutionContext(), subworkflow, nl, nil, subworkflow.StartNode()) + state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, nCtx.ExecutionContext(), subworkflow, nl, queuingBudgetHandler, subworkflow.StartNode()) if err != nil { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoUndefined), err } @@ -118,7 +121,7 @@ func (s *subworkflowHandler) handleSubWorkflow(ctx context.Context, nCtx handler func (s *subworkflowHandler) HandleFailureNodeOfSubWorkflow(ctx context.Context, nCtx handler.NodeExecutionContext, subworkflow v1alpha1.ExecutableSubWorkflow, nl executors.NodeLookup) (handler.Transition, error) { if subworkflow.GetOnFailureNode() != nil { - // TODO: pass queuingBudgetHandler + // TODO: pass queuingBudgetHandler when this method is invoked from caller. state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, nCtx.ExecutionContext(), subworkflow, nl, nil, subworkflow.GetOnFailureNode()) if err != nil { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoUndefined), err
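End-to-end, the new wiring can be exercised roughly as follows — a sketch, not part of the patch, assuming the executors package as modified above; the 300-second budget and the node ID "n1" are hypothetical stand-ins:

package example

import (
	"context"

	"github.com/lyft/flytepropeller/pkg/controller/executors"
)

// queryQueuingParams builds the default handler and reads one node's queuing
// parameters; dag and nl are typically both satisfied by *v1alpha1.FlyteWorkflow,
// as in the workflow executor above.
func queryQueuingParams(ctx context.Context, dag executors.DAGStructure, nl executors.NodeLookup) (*executors.NodeQueuingParameters, error) {
	budgetSeconds := int64(300) // hypothetical workflow-level budget in seconds
	qbh := executors.NewDefaultQueuingBudgetHandler(dag, nl, &budgetSeconds)

	// GetNodeQueuingParameters returns (nil, nil) for the start node, so callers
	// must guard the result before reading IsInterruptible or MaxQueueTime.
	return qbh.GetNodeQueuingParameters(ctx, "n1")
}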