Skip to content

Commit

Permalink
improve error message in the agent plugin (#6114)
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <pingsutw@apache.org>
  • Loading branch information
pingsutw authored Dec 18, 2024
1 parent 3c03cff commit 1f533ab
Showing 1 changed file with 12 additions and 11 deletions.
23 changes: 12 additions & 11 deletions flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ func (p *Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContext
webapi.Resource, error) {
taskTemplate, err := taskCtx.TaskReader().Read(ctx)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to read task template with error: %v", err)
}
inputs, err := taskCtx.InputReader().Get(ctx)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to read inputs with error: %v", err)
}

var argTemplate []string
Expand All @@ -98,7 +98,7 @@ func (p *Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContext
argTemplate = taskTemplate.GetContainer().GetArgs()
modifiedArgs, err := template.Render(ctx, taskTemplate.GetContainer().GetArgs(), templateParameters)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to render args with error: %v", err)
}
taskTemplate.GetContainer().Args = modifiedArgs
defer func() {
Expand Down Expand Up @@ -135,7 +135,7 @@ func (p *Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContext
request := &admin.CreateTaskRequest{Inputs: inputs, Template: taskTemplate, OutputPrefix: outputPrefix, TaskExecutionMetadata: &taskExecutionMetadata}
res, err := client.CreateTask(finalCtx, request)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to create task from agent with %v", err)
}

return ResourceMetaWrapper{
Expand All @@ -153,7 +153,8 @@ func (p *Plugin) ExecuteTaskSync(
) (webapi.ResourceMeta, webapi.Resource, error) {
stream, err := client.ExecuteTaskSync(ctx)
if err != nil {
return nil, nil, err
logger.Errorf(ctx, "failed to execute task from agent with %v", err)
return nil, nil, fmt.Errorf("failed to execute task from agent with %v", err)
}

headerProto := &admin.ExecuteTaskSyncRequest{
Expand Down Expand Up @@ -185,8 +186,8 @@ func (p *Plugin) ExecuteTaskSync(

in, err := stream.Recv()
if err != nil {
logger.Errorf(ctx, "failed to receive from server %s", err.Error())
return nil, nil, err
logger.Errorf(ctx, "failed to receive stream from server %s", err.Error())
return nil, nil, fmt.Errorf("failed to receive stream from server %w", err)
}
if in.GetHeader() == nil {
return nil, nil, fmt.Errorf("expected header in the response, but got none")
Expand All @@ -202,7 +203,7 @@ func (p *Plugin) ExecuteTaskSync(
LogLinks: resource.GetLogLinks(),
CustomInfo: resource.GetCustomInfo(),
AgentError: resource.GetAgentError(),
}, err
}, nil
}

func (p *Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest webapi.Resource, err error) {
Expand All @@ -223,7 +224,7 @@ func (p *Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest web
}
res, err := client.GetTask(finalCtx, request)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get task from agent with %v", err)
}

return ResourceWrapper{
Expand Down Expand Up @@ -256,7 +257,7 @@ func (p *Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error
ResourceMeta: metadata.AgentResourceMeta,
}
_, err = client.DeleteTask(finalCtx, request)
return err
return fmt.Errorf("failed to delete task from agent with %v", err)
}

func (p *Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase core.PhaseInfo, err error) {
Expand Down Expand Up @@ -307,7 +308,7 @@ func (p *Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phas
err = writeOutput(ctx, taskCtx, resource.Outputs)
if err != nil {
logger.Errorf(ctx, "failed to write output with err %s", err.Error())
return core.PhaseInfoUndefined, err
return core.PhaseInfoUndefined, fmt.Errorf("failed to write output with err %s", err.Error())
}
return core.PhaseInfoSuccess(taskInfo), nil
}
Expand Down

0 comments on commit 1f533ab

Please sign in to comment.