Skip to content

Commit

Permalink
fix: get task variable_map (TODO: fix workflow)
Browse files Browse the repository at this point in the history
Signed-off-by: mao3267 <chenvincent610@gmail.com>
  • Loading branch information
mao3267 committed Jan 16, 2025
1 parent 07878e0 commit 0507fe4
Show file tree
Hide file tree
Showing 24 changed files with 321 additions and 123 deletions.
2 changes: 2 additions & 0 deletions flyteadmin/dataproxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ func (s Service) GetDataFromNodeExecution(ctx context.Context, nodeExecID *core.
var vm *core.VariableMap
if ioType == common.ArtifactTypeI {
lm = resp.GetFullInputs()
vm = resp.GetInputVariableMap()
} else if ioType == common.ArtifactTypeO {
lm = resp.GetFullOutputs()
vm = resp.GetOutputVariableMap()
Expand Down Expand Up @@ -423,6 +424,7 @@ func (s Service) GetDataFromTaskExecution(ctx context.Context, taskExecID *core.

if ioType == common.ArtifactTypeI {
lm = resp.GetFullInputs()
vm = resp.GetInputVariableMap()
} else if ioType == common.ArtifactTypeO {
lm = resp.GetFullOutputs()
vm = resp.GetOutputVariableMap()
Expand Down
8 changes: 4 additions & 4 deletions flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ require (
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/emicklei/go-restful/v3 v3.12.0 // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
Expand Down Expand Up @@ -147,7 +147,7 @@ require (
github.com/lestrrat-go/option v1.0.1 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mattn/go-isatty v0.0.18 // indirect
github.com/mattn/go-sqlite3 v2.0.3+incompatible // indirect
github.com/mattn/goveralls v0.0.6 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand All @@ -159,7 +159,7 @@ require (
github.com/ory/go-convenience v0.1.0 // indirect
github.com/ory/viper v1.7.5 // indirect
github.com/pborman/uuid v1.2.0 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.0-beta.8 // indirect
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
Expand All @@ -171,7 +171,7 @@ require (
github.com/sendgrid/rest v2.6.9+incompatible // indirect
github.com/shamaton/msgpack/v2 v2.2.2 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.11.0 // indirect
Expand Down
18 changes: 9 additions & 9 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP
github.com/elastic/go-sysinfo v1.1.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0=
github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU=
github.com/elazarl/goproxy v0.0.0-20181003060214-f58a169a71a5/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/emicklei/go-restful/v3 v3.12.0 h1:y2DdzBAURM29NFF94q6RaY4vjIH1rtwDapwQtU84iWk=
github.com/emicklei/go-restful/v3 v3.12.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -970,8 +970,8 @@ github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2y
github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.18 h1:DOKFKCQ7FNG2L1rbrmstDN4QVRdS89Nkh85u68Uwp98=
github.com/mattn/go-isatty v0.0.18/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
Expand Down Expand Up @@ -1083,8 +1083,8 @@ github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUr
github.com/pelletier/go-toml v1.6.0/go.mod h1:5N711Q9dKgbdkxHL+MEfF31hpT7l0S0s/t2kKREewys=
github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
github.com/pelletier/go-toml v1.8.0/go.mod h1:D6yutnOGMveHEPV7VQOuvI/gXY61bv+9bAOTRnLElKs=
github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM=
github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pelletier/go-toml/v2 v2.0.0-beta.8 h1:dy81yyLYJDwMTifq24Oi/IslOslRrDSb3jwDggjz3Z0=
github.com/pelletier/go-toml/v2 v2.0.0-beta.8/go.mod h1:r9LEWfGN8R5k0VXJ+0BkIe7MYkRdwZOjgMj2KwnJFUo=
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE=
Expand Down Expand Up @@ -1213,8 +1213,8 @@ github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B
github.com/spf13/afero v1.2.0/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/spf13/afero v1.3.2/go.mod h1:5KUK8ByomD5Ti5Artl0RtHeI5pTF7MIDuXL3yY520V4=
github.com/spf13/afero v1.8.2 h1:xehSyVa0YnHWsJ49JFljMpg1HX19V6NDZ1fkm1Xznbo=
github.com/spf13/afero v1.8.2/go.mod h1:CtAatgMJh6bJEIs48Ay/FOnkljP3WeGUG0MC1RfAqwo=
github.com/spf13/afero v1.9.2 h1:j49Hj62F0n+DaZ1dDCvhABaPNSGNkt32oRFxI33IEMw=
github.com/spf13/afero v1.9.2/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y=
github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
Expand Down Expand Up @@ -1650,10 +1650,10 @@ golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
Expand Down
79 changes: 69 additions & 10 deletions flyteadmin/pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,26 +538,84 @@ func (m *NodeExecutionManager) GetNodeExecutionData(

var outputs *core.LiteralMap
var outputURLBlob *admin.UrlBlob
var outputVariableMap *core.VariableMap
group.Go(func() error {
var err error
outputs, outputURLBlob, err = util.GetOutputs(groupCtx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
m.storageClient, nodeExecution.GetClosure())
return err
})

// TODO: Get the output variable map from the node execution model
// group.Go(func() error {
// var err error
// Get the output variable map from workflow model TypedInterface
var inputVariableMap, outputVariableMap *core.VariableMap
group.Go(func() error {
var err error

switch nodeExecution.GetClosure().GetTargetMetadata().(type) {
case *admin.NodeExecutionClosure_WorkflowNodeMetadata:
execID := nodeExecution.GetClosure().GetTargetMetadata().(*admin.NodeExecutionClosure_WorkflowNodeMetadata).WorkflowNodeMetadata.GetExecutionId()
workflowModel, err := m.db.WorkflowRepo().Get(groupCtx, repoInterfaces.Identifier{
Project: execID.GetProject(),
Domain: execID.GetDomain(),
Name: execID.GetName(),
})

if err != nil {
logger.Debugf(groupCtx, "Failed to get workflow model for node execution [%+v] with err %v", request.GetId(), err)
return err
}
workflow, err := transformers.FromWorkflowModel(workflowModel)
if err != nil {
logger.Debugf(groupCtx, "Failed to transform workflow model for node execution [%+v] with err %v", request.GetId(), err)
return err
}

// modelNode, err := m.db.NodeExecutionRepo().Get(groupCtx, repoInterfaces.NodeExecutionResource{
// NodeExecutionIdentifier: request.GetId(),
// })
inputVariableMap = workflow.GetClosure().GetCompiledWorkflow().GetPrimary().GetTemplate().GetInterface().GetInputs()
outputVariableMap = workflow.GetClosure().GetCompiledWorkflow().GetPrimary().GetTemplate().GetInterface().GetOutputs()

// node, err := transformers.FromNodeExecutionModel(modelNode, transformers.DefaultExecutionTransformerOptions)
case *admin.NodeExecutionClosure_TaskNodeMetadata:
execID := nodeExecution.GetId().GetExecutionId()
executionModel, err := m.db.ExecutionRepo().Get(groupCtx, repoInterfaces.Identifier{
Project: execID.GetProject(),
Domain: execID.GetDomain(),
Name: execID.GetName(),
})

if err != nil {
logger.Debugf(groupCtx, "Failed to get execution model for node execution [%+v] with err %v", request.GetId(), err)
return err
}

execution, err := transformers.FromExecutionModel(groupCtx, executionModel, transformers.DefaultExecutionTransformerOptions)

if err != nil {
logger.Debugf(groupCtx, "Failed to transform execution model for node execution [%+v] with err %v", request.GetId(), err)
return err
}

// return err
// })
taskModel, err := m.db.TaskRepo().Get(groupCtx, repoInterfaces.Identifier{
Project: execID.GetProject(),
Domain: execID.GetDomain(),
Name: execution.GetSpec().GetLaunchPlan().GetName(),
Version: execution.GetSpec().GetLaunchPlan().GetVersion(),
})

if err != nil {
logger.Debugf(groupCtx, "Failed to get task model for node execution [%+v] with err %v", request.GetId(), err)
return err
}

task, err := transformers.FromTaskModel(taskModel)

if err != nil {
logger.Debugf(groupCtx, "Failed to transform task model for node execution [%+v] with err %v", request.GetId(), err)
return err
}

inputVariableMap = task.GetClosure().GetCompiledTask().GetTemplate().GetInterface().GetInputs()
outputVariableMap = task.GetClosure().GetCompiledTask().GetTemplate().GetInterface().GetOutputs()
}
return err
})

err = group.Wait()
if err != nil {
Expand All @@ -570,6 +628,7 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
FullInputs: inputs,
FullOutputs: outputs,
FlyteUrls: common.FlyteURLsFromNodeExecutionID(request.GetId(), nodeExecution.GetClosure() != nil && nodeExecution.GetClosure().GetDeckUri() != ""),
InputVariableMap: inputVariableMap,
OutputVariableMap: outputVariableMap,
}

Expand Down
17 changes: 15 additions & 2 deletions flyteadmin/pkg/manager/impl/task_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func (m *TaskExecutionManager) GetTaskExecutionData(

var outputs *core.LiteralMap
var outputURLBlob *admin.UrlBlob
var outputVariableMap *core.VariableMap
var inputVariableMap, outputVariableMap *core.VariableMap
group.Go(func() error {
var err error
outputs, outputURLBlob, err = util.GetOutputs(groupCtx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
Expand All @@ -345,8 +345,20 @@ func (m *TaskExecutionManager) GetTaskExecutionData(
Version: taskExecution.GetId().GetTaskId().GetVersion(),
})

if err != nil {
logger.Debugf(groupCtx, "Failed to get task [%+v] with err %v", taskExecution.GetId().GetTaskId(), err)
return err
}

task, err := transformers.FromTaskModel(taskModel)
outputVariableMap = task.GetClosure().GetCompiledTask().GetTemplate().GetInterface().Outputs

if err != nil {
logger.Debugf(groupCtx, "Failed to transform task model [%+v] with err %v", taskModel, err)
return err
}

inputVariableMap = task.GetClosure().GetCompiledTask().GetTemplate().GetInterface().GetInputs()
outputVariableMap = task.GetClosure().GetCompiledTask().GetTemplate().GetInterface().GetOutputs()

return err
})
Expand All @@ -363,6 +375,7 @@ func (m *TaskExecutionManager) GetTaskExecutionData(
FullOutputs: outputs,
FlyteUrls: common.FlyteURLsFromTaskExecutionID(request.GetId(), false),
OutputVariableMap: outputVariableMap,
InputVariableMap: inputVariableMap,
}

m.metrics.TaskExecutionInputBytes.Observe(float64(response.GetInputs().GetBytes()))
Expand Down
15 changes: 15 additions & 0 deletions flyteidl/clients/go/admin/mocks/isGetDataResponse_VariableMap.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions flyteidl/clients/go/assets/admin.swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions flyteidl/gen/pb-es/flyteidl/admin/node_execution_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions flyteidl/gen/pb-es/flyteidl/admin/task_execution_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 0507fe4

Please sign in to comment.