Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

[WIP] Queueing Budget #124

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/imdario/mergo v0.3.8 // indirect
github.com/jmespath/go-jmespath v0.3.0 // indirect
github.com/lyft/datacatalog v0.2.1
github.com/lyft/flyteidl v0.17.24
github.com/lyft/flyteidl v0.17.27
github.com/lyft/flyteplugins v0.3.21
github.com/lyft/flytestdlib v0.3.3
github.com/magiconair/properties v1.8.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,8 @@ github.com/lyft/flyteidl v0.17.9 h1:JXT9PovHqS9V3YN74x9zWT0kvIEL48c2uNoujF1KMes=
github.com/lyft/flyteidl v0.17.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.24 h1:N5mmk2/0062VjbIeUXLHWVZwkxGW20RdZtshaea2nL0=
github.com/lyft/flyteidl v0.17.24/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.27 h1:0EdSHauzdPEYmubYib/XC6fLb+srzP4yDRN1P9o4W/I=
github.com/lyft/flyteidl v0.17.27/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.3.21 h1:0PaQ5CZkUY07cNiBPcxdL1Pm26A0QRwoFw1VT6ly8tU=
github.com/lyft/flyteplugins v0.3.21/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ type ExecutableNodeStatus interface {
GetWorkflowNodeStatus() ExecutableWorkflowNodeStatus
GetTaskNodeStatus() ExecutableTaskNodeStatus

GetQueuingBudget() *metav1.Duration

IsCached() bool
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ type NodeStatus struct {

// Not Persisted
DataReferenceConstructor storage.ReferenceConstructor `json:"-"`

// queuing budget a node can consume across at its attempts
QueuingBudget *metav1.Duration `json:"queuingBudget,omitempty"`
}

func (in *NodeStatus) IsDirty() bool {
Expand Down Expand Up @@ -420,6 +423,10 @@ func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason st
in.SetDirty()
}

func (in *NodeStatus) GetQueuingBudget() *metav1.Duration {
return in.QueuingBudget
}

func (in *NodeStatus) GetStartedAt() *metav1.Time {
return in.StartedAt
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ type FlyteWorkflow struct {

// non-Serialized fields
DataReferenceConstructor storage.ReferenceConstructor `json:"-"`

// Description
// +optional
QueuingBudgetSeconds *int64
}

type NodeDefaults struct {
Expand Down
17 changes: 12 additions & 5 deletions pkg/compiler/transformers/k8s/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li
interruptible = wf.GetMetadataDefaults().GetInterruptible()
}

var queuingBudgetSeconds *int64
if wf.GetMetadata() != nil && wf.GetMetadata().GetQueuingBudget() != nil {
budgetSeconds := wf.GetMetadata().GetQueuingBudget().GetSeconds()
queuingBudgetSeconds = &budgetSeconds
}

obj := &v1alpha1.FlyteWorkflow{
TypeMeta: v1.TypeMeta{
Kind: v1alpha1.FlyteWorkflowKind,
Expand All @@ -173,11 +179,12 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li
Namespace: namespace,
Labels: map[string]string{},
},
Inputs: &v1alpha1.Inputs{LiteralMap: inputs},
WorkflowSpec: primarySpec,
SubWorkflows: subwfs,
Tasks: buildTasks(tasks, errs.NewScope()),
NodeDefaults: v1alpha1.NodeDefaults{Interruptible: interruptible},
Inputs: &v1alpha1.Inputs{LiteralMap: inputs},
WorkflowSpec: primarySpec,
SubWorkflows: subwfs,
Tasks: buildTasks(tasks, errs.NewScope()),
NodeDefaults: v1alpha1.NodeDefaults{Interruptible: interruptible},
QueuingBudgetSeconds: queuingBudgetSeconds,
}

var err error
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/executors/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type Node interface {
// - 1. It finds a blocking node (not ready, or running)
// - 2. A node fails and hence the workflow will fail
// - 3. The final/end node has completed and the workflow should be stopped
RecursiveNodeHandler(ctx context.Context, execContext ExecutionContext, dag DAGStructure, nl NodeLookup, currentNode v1alpha1.ExecutableNode) (NodeStatus, error)
RecursiveNodeHandler(ctx context.Context, execContext ExecutionContext, dag DAGStructure, nl NodeLookup, queuingBudgetHandler QueuingBudgetHandler, currentNode v1alpha1.ExecutableNode) (NodeStatus, error)

// This aborts the given node. If the given node is complete then it recursively finds the running nodes and aborts them
AbortHandler(ctx context.Context, execContext ExecutionContext, dag DAGStructure, nl NodeLookup, currentNode v1alpha1.ExecutableNode, reason string) error
Expand Down
90 changes: 90 additions & 0 deletions pkg/controller/executors/queue_budget_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package executors

import (
"context"
"time"

"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)

// Interface for the Workflow p. This is the mutable portion for a Workflow
type QueuingBudgetHandler interface {
GetNodeQueuingParameters(ctx context.Context, id v1alpha1.NodeID) (*NodeQueuingParameters, error)
}

type NodeQueuingParameters struct {
IsInterruptible bool
MaxQueueTime time.Duration
}

type defaultQueuingBudgetHandler struct {
dag DAGStructure
nl NodeLookup
wfBudget time.Duration
}

func (in *defaultQueuingBudgetHandler) GetNodeQueuingParameters(ctx context.Context, id v1alpha1.NodeID) (*NodeQueuingParameters, error) {

if id == v1alpha1.StartNodeID {
return nil, nil
}

upstreamNodes, err := in.dag.ToNode(id)
if err != nil {
return nil, err
}

nodeBudget := in.wfBudget
for _, upstreamNodeID := range upstreamNodes {
upstreamNodeStatus := in.nl.GetNodeExecutionStatus(ctx, upstreamNodeID)

if upstreamNodeStatus.GetPhase() == v1alpha1.NodePhaseSkipped {
// TODO handle skipped parent case: if parent doesn't have queue budget info then get it from its parent.
continue
}

var budget time.Duration
if upstreamNodeStatus.GetQueuingBudget() != nil && *&upstreamNodeStatus.GetQueuingBudget().Duration > 0 {
budget = *&upstreamNodeStatus.GetQueuingBudget().Duration
}

if upstreamNodeStatus.GetQueuedAt() != nil {
queuedAt := upstreamNodeStatus.GetQueuedAt().Time
if upstreamNodeStatus.GetLastAttemptStartedAt() == nil {
// nothing used
}
lastAttemptStartedAt := upstreamNodeStatus.GetLastAttemptStartedAt().Time
queuingDelay := lastAttemptStartedAt.Sub(queuedAt)
parentRemainingBudget := budget - queuingDelay

if nodeBudget > parentRemainingBudget {
nodeBudget = parentRemainingBudget
}
}
}

currNode, exists := in.nl.GetNode(id)
if !exists {
// mtoledo: what should be the error here
return nil, err
}
var interruptible bool
if currNode.IsInterruptible() != nil {
interruptible = *currNode.IsInterruptible()
}
// TODO: Where to get config value from?
//// a node is not considered interruptible if the system failures have exceeded the configured threshold
// currNodeStatus := in.nl.GetNodeExecutionStatus(ctx, id)
//if interruptible && currNodeStatus.GetSystemFailures() >= c.interruptibleFailureThreshold {
// interruptible = false
// c.metrics.InterruptedThresholdHit.Inc(ctx)
//}
//

return &NodeQueuingParameters{IsInterruptible: interruptible, MaxQueueTime: time.Second * time.Duration(nodeBudget)}, nil
}

// instead of *int64 use duration?
func NewDefaultQueuingBudgetHandler(dag DAGStructure, nl NodeLookup, queueingBudget *int64) QueuingBudgetHandler {
return &defaultQueuingBudgetHandler{dag: dag, nl: nl, wfBudget: time.Duration(*queueingBudget)}
}
13 changes: 9 additions & 4 deletions pkg/controller/nodes/branch/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func (b *branchHandler) Setup(ctx context.Context, setupContext handler.SetupCon
}

func (b *branchHandler) HandleBranchNode(ctx context.Context, branchNode v1alpha1.ExecutableBranchNode, nCtx handler.NodeExecutionContext, nl executors.NodeLookup) (handler.Transition, error) {
// Create Queue Budget Handler from nodeContext
// TODO(mtoledo): get dagStructure, ask about duration
queuingBudgetHandler := executors.NewDefaultQueuingBudgetHandler(nil, nl, nil)

if nCtx.NodeStateReader().GetBranchNode().FinalizedNodeID == nil {
nodeInputs, err := nCtx.InputReader().Get(ctx)
if err != nil {
Expand Down Expand Up @@ -64,7 +68,7 @@ func (b *branchHandler) HandleBranchNode(ctx context.Context, branchNode v1alpha

logger.Debugf(ctx, "Recursively executing branchNode's chosen path")
nodeStatus := nl.GetNodeExecutionStatus(ctx, nCtx.NodeID())
return b.recurseDownstream(ctx, nCtx, nodeStatus, finalNode)
return b.recurseDownstream(ctx, nCtx, nodeStatus, finalNode, queuingBudgetHandler)
}

// If the branchNodestatus was already evaluated i.e, Node is in Running status
Expand All @@ -89,7 +93,7 @@ func (b *branchHandler) HandleBranchNode(ctx context.Context, branchNode v1alpha

// Recurse downstream
nodeStatus := nl.GetNodeExecutionStatus(ctx, nCtx.NodeID())
return b.recurseDownstream(ctx, nCtx, nodeStatus, branchTakenNode)
return b.recurseDownstream(ctx, nCtx, nodeStatus, branchTakenNode, queuingBudgetHandler)
}

func (b *branchHandler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext) (handler.Transition, error) {
Expand All @@ -104,12 +108,13 @@ func (b *branchHandler) Handle(ctx context.Context, nCtx handler.NodeExecutionCo
return b.HandleBranchNode(ctx, branchNode, nCtx, nl)
}

func (b *branchHandler) recurseDownstream(ctx context.Context, nCtx handler.NodeExecutionContext, nodeStatus v1alpha1.ExecutableNodeStatus, branchTakenNode v1alpha1.ExecutableNode) (handler.Transition, error) {
func (b *branchHandler) recurseDownstream(ctx context.Context, nCtx handler.NodeExecutionContext, nodeStatus v1alpha1.ExecutableNodeStatus, branchTakenNode v1alpha1.ExecutableNode, queuingBudgetHandler executors.QueuingBudgetHandler) (handler.Transition, error) {
// TODO we should replace the call to RecursiveNodeHandler with a call to SingleNode Handler. The inputs are also already known ahead of time
// There is no DAGStructure for the branch nodes, the branch taken node is the leaf node. The node itself may be arbitrarily complex, but in that case the node should reference a subworkflow etc
// The parent of the BranchTaken Node is the actual Branch Node and all the data is just forwarded from the Branch to the executed node.
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), nCtx.NodeID())
downstreamStatus, err := b.nodeExecutor.RecursiveNodeHandler(ctx, nCtx.ExecutionContext(), dag, nCtx.ContextualNodeLookup(), branchTakenNode)

downstreamStatus, err := b.nodeExecutor.RecursiveNodeHandler(ctx, nCtx.ExecutionContext(), dag, nCtx.ContextualNodeLookup(), queuingBudgetHandler, branchTakenNode)
if err != nil {
return handler.UnknownTransition, err
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/controller/nodes/dynamic/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, n
"DynamicWorkflowBuildFailed", err.Error(), nil)), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: err.Error()}, nil
}

trns, newState, err := d.progressDynamicWorkflow(ctx, execContext, dynamicWF, nl, nCtx, prevState)
// get queuing budget
queuingBudgetHandler := executors.NewDefaultQueuingBudgetHandler(dynamicWF, nl, nil)

trns, newState, err := d.progressDynamicWorkflow(ctx, execContext, dynamicWF, nl, nCtx, prevState, queuingBudgetHandler)
if err != nil {
return handler.UnknownTransition, prevState, err
}
Expand Down Expand Up @@ -434,9 +437,9 @@ func (d dynamicNodeTaskNodeHandler) getLaunchPlanInterfaces(ctx context.Context,
}

func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context, execContext executors.ExecutionContext, dynamicWorkflow v1alpha1.ExecutableWorkflow, nl executors.NodeLookup,
nCtx handler.NodeExecutionContext, prevState handler.DynamicNodeState) (handler.Transition, handler.DynamicNodeState, error) {
nCtx handler.NodeExecutionContext, prevState handler.DynamicNodeState, queuingBudgetHandler executors.QueuingBudgetHandler) (handler.Transition, handler.DynamicNodeState, error) {

state, err := d.nodeExecutor.RecursiveNodeHandler(ctx, execContext, dynamicWorkflow, nl, dynamicWorkflow.StartNode())
state, err := d.nodeExecutor.RecursiveNodeHandler(ctx, execContext, dynamicWorkflow, nl, queuingBudgetHandler, dynamicWorkflow.StartNode())
if err != nil {
return handler.UnknownTransition, prevState, err
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ func (c *nodeExecutor) handleNode(ctx context.Context, dag executors.DAGStructur

// The space search for the next node to execute is implemented like a DFS algorithm. handleDownstream visits all the nodes downstream from
// the currentNode. Visit a node is the RecursiveNodeHandler. A visit may be partial, complete or may result in a failure.
func (c *nodeExecutor) handleDownstream(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure, nl executors.NodeLookup, currentNode v1alpha1.ExecutableNode) (executors.NodeStatus, error) {
func (c *nodeExecutor) handleDownstream(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure, nl executors.NodeLookup, queueingBudgetHandler executors.QueuingBudgetHandler, currentNode v1alpha1.ExecutableNode) (executors.NodeStatus, error) {
logger.Debugf(ctx, "Handling downstream Nodes")
// This node is success. Handle all downstream nodes
downstreamNodes, err := dag.FromNode(currentNode.GetID())
Expand All @@ -512,7 +512,7 @@ func (c *nodeExecutor) handleDownstream(ctx context.Context, execContext executo
if !ok {
return executors.NodeStatusFailed(errors.Errorf(errors.BadSpecificationError, currentNode.GetID(), "Unable to find Downstream Node [%v]", downstreamNodeName)), nil
}
state, err := c.RecursiveNodeHandler(ctx, execContext, dag, nl, downstreamNode)
state, err := c.RecursiveNodeHandler(ctx, execContext, dag, nl, queueingBudgetHandler, downstreamNode)
if err != nil {
return executors.NodeStatusUndefined, err
}
Expand Down Expand Up @@ -569,7 +569,7 @@ func (c *nodeExecutor) SetInputsForStartNode(ctx context.Context, execContext ex
return executors.NodeStatusComplete, nil
}

func (c *nodeExecutor) RecursiveNodeHandler(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure, nl executors.NodeLookup, currentNode v1alpha1.ExecutableNode) (executors.NodeStatus, error) {
func (c *nodeExecutor) RecursiveNodeHandler(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure, nl executors.NodeLookup, queuingBudgetHandler executors.QueuingBudgetHandler, currentNode v1alpha1.ExecutableNode) (executors.NodeStatus, error) {
currentNodeCtx := contextutils.WithNodeID(ctx, currentNode.GetID())
nodeStatus := nl.GetNodeExecutionStatus(ctx, currentNode.GetID())

Expand All @@ -594,7 +594,7 @@ func (c *nodeExecutor) RecursiveNodeHandler(ctx context.Context, execContext exe
return executors.NodeStatusRunning, nil
}

nCtx, err := c.newNodeExecContextDefault(ctx, currentNode.GetID(), execContext, nl)
nCtx, err := c.newNodeExecContextDefault(ctx, currentNode.GetID(), execContext, nl, queuingBudgetHandler)
if err != nil {
// NodeExecution creation failure is a permanent fail / system error.
// Should a system failure always return an err?
Expand All @@ -612,7 +612,7 @@ func (c *nodeExecutor) RecursiveNodeHandler(ctx context.Context, execContext exe
// Currently we treat either Skip or Success the same way. In this approach only one node will be skipped
// at a time. As we iterate down, further nodes will be skipped
case v1alpha1.NodePhaseSucceeded, v1alpha1.NodePhaseSkipped:
return c.handleDownstream(ctx, execContext, dag, nl, currentNode)
return c.handleDownstream(ctx, execContext, dag, nl, queuingBudgetHandler, currentNode)
case v1alpha1.NodePhaseFailed:
logger.Debugf(currentNodeCtx, "Node Failed")
return executors.NodeStatusFailed(errors.Errorf(errors.RuntimeExecutionError, currentNode.GetID(), "Node Failed.")), nil
Expand All @@ -636,7 +636,7 @@ func (c *nodeExecutor) FinalizeHandler(ctx context.Context, execContext executor
return err
}

nCtx, err := c.newNodeExecContextDefault(ctx, currentNode.GetID(), execContext, nl)
nCtx, err := c.newNodeExecContextDefault(ctx, currentNode.GetID(), execContext, nl, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -689,7 +689,7 @@ func (c *nodeExecutor) AbortHandler(ctx context.Context, execContext executors.E
return err
}

nCtx, err := c.newNodeExecContextDefault(ctx, currentNode.GetID(), execContext, nl)
nCtx, err := c.newNodeExecContextDefault(ctx, currentNode.GetID(), execContext, nl, nil)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/nodes/handler/node_exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package handler

import (
"context"
"time"

"github.com/lyft/flyteidl/clients/go/events"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
Expand Down Expand Up @@ -38,6 +39,7 @@ type NodeExecutionMetadata interface {
GetAnnotations() map[string]string
GetK8sServiceAccount() string
IsInterruptible() bool
GetMaxQueueTime() time.Duration
}

type NodeExecutionContext interface {
Expand Down
20 changes: 11 additions & 9 deletions pkg/controller/nodes/mocks/output_resolver.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading