[Draft] [flytekit] Polish - Map Tasks #6139

Open
2 tasks done
wild-endeavor opened this issue Jan 5, 2025 · 7 comments
Labels
backlogged (For internal use. Reserved for contributor team workflow.) · enhancement (New feature or request) · untriaged (This issue has not yet been looked at by the Maintainers)

Comments

@wild-endeavor
Contributor

wild-endeavor commented Jan 5, 2025

Map Task Polish

This is a series of tickets to improve the flytekit authoring experience. If any change cannot be made in a backwards-compatible way, split it out into a separate ticket.

Rename map task

Rename map_task to just map. This will shadow Python's built-in map, but that's acceptable because we now recommend that users import flytekit as fl, so the function is accessed as fl.map.
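A minimal sketch of why the clash is contained: the built-in map is only shadowed when the name is imported directly; accessed through a module alias, both coexist. The fl object below is a hypothetical stand-in built with types.ModuleType, not flytekit itself, and its placeholder body runs eagerly instead of building a map task.

```python
import builtins
import types

# Hypothetical stand-in for `import flytekit as fl`; not the real package.
fl = types.ModuleType("fl")


def _placeholder_map(fn, values):
    # Placeholder only: the proposed fl.map would build a map task, not run eagerly.
    return [fn(v) for v in values]


fl.map = _placeholder_map

# Attribute access through the alias leaves the built-in name untouched.
assert map is builtins.map
assert fl.map(str.upper, ["a", "b"]) == ["A", "B"]
assert list(map(str.upper, ["a", "b"])) == ["A", "B"]

# Under the proposal, a direct import such as `from flytekit import map`
# would rebind the name `map` in the importing module and hide the built-in.
```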

Failure Toleration

The failure toleration parameters for map_task are very powerful but too verbose. Let's mark min_successes and min_success_ratio as deprecated and add a new argument, tolerance, of type float | int.
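One way the single argument could be translated back onto the existing keywords, as a minimal sketch. It assumes an int means an absolute count (min_successes) and a float means a fraction (min_success_ratio), matching the later example in this thread where tolerance=0.5 replaces min_success_ratio=0.5. resolve_tolerance is a hypothetical helper, not part of flytekit.

```python
import warnings
from typing import Dict, Optional, Union


def resolve_tolerance(
    tolerance: Optional[Union[float, int]] = None,
    *,
    min_successes: Optional[int] = None,
    min_success_ratio: Optional[float] = None,
) -> Dict[str, Union[int, float]]:
    """Hypothetical: translate `tolerance` into the existing map_task keywords."""
    legacy = {
        name: value
        for name, value in (
            ("min_successes", min_successes),
            ("min_success_ratio", min_success_ratio),
        )
        if value is not None
    }
    if legacy:
        if tolerance is not None:
            raise ValueError("pass either `tolerance` or the deprecated arguments, not both")
        warnings.warn(
            f"deprecated argument(s) {', '.join(legacy)}; use `tolerance` instead",
            DeprecationWarning,
            stacklevel=2,
        )
        return legacy
    if tolerance is None:
        return {}
    # bool is a subclass of int, so reject it before the int check.
    if isinstance(tolerance, bool):
        raise TypeError("tolerance must be an int or a float, not a bool")
    if isinstance(tolerance, int):
        if tolerance < 0:
            raise ValueError("an int tolerance is a count and must be >= 0")
        return {"min_successes": tolerance}
    if isinstance(tolerance, float):
        if not 0.0 <= tolerance <= 1.0:
            raise ValueError("a float tolerance is a ratio and must be in [0, 1]")
        return {"min_success_ratio": tolerance}
    raise TypeError(f"unsupported tolerance type: {type(tolerance).__name__}")
```

With this reading, resolve_tolerance(0.5) gives {"min_success_ratio": 0.5} and resolve_tolerance(3) gives {"min_successes": 3}. The sketch also exposes a pitfall of a float | int argument: tolerance=1 means "at least one success", while tolerance=1.0 means "every element must succeed".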

Parallelism

Deprecate the max_parallelism argument of workflow and LaunchPlan (is there one?) and add a new one called concurrency to match map_task's argument of the same name.
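A rename like this is usually done with a deprecation shim that keeps accepting the old keyword for a while. The sketch below is a generic Python pattern, not flytekit code: renamed_kwarg and make_launch_plan are hypothetical names, and make_launch_plan only stands in for a workflow or LaunchPlan factory.

```python
import functools
import warnings
from typing import Any, Callable, Dict, Optional


def renamed_kwarg(old: str, new: str) -> Callable:
    """Decorator: accept keyword `old` as a deprecated alias for `new`."""

    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            if old in kwargs:
                if new in kwargs:
                    raise TypeError(f"pass only one of {old!r} and {new!r}")
                warnings.warn(
                    f"{old!r} is deprecated; use {new!r} instead",
                    DeprecationWarning,
                    stacklevel=2,
                )
                # Forward the old value under the new name.
                kwargs[new] = kwargs.pop(old)
            return fn(*args, **kwargs)

        return wrapper

    return decorator


@renamed_kwarg("max_parallelism", "concurrency")
def make_launch_plan(name: str, concurrency: Optional[int] = None) -> Dict[str, Any]:
    # Hypothetical stand-in for a workflow/LaunchPlan factory; not flytekit's API.
    return {"name": name, "concurrency": concurrency}
```

Existing calls with max_parallelism=... keep working but emit a DeprecationWarning, while new code uses concurrency=...; passing both is rejected so the two can never disagree.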

Are you sure this issue hasn't been raised already?

  • Yes

Have you read the Code of Conduct?

  • Yes
wild-endeavor added the enhancement and untriaged labels on Jan 5, 2025
wild-endeavor changed the title from "[Draft] [Core feature]" to "[Draft] [flytekit] Polish - Map Tasks" on Jan 5, 2025
wild-endeavor added the backlogged label on Jan 5, 2025
@granthamtaylor

Can we also consider ways to include fixed arguments? Currently, defining partial functions seems unnecessarily verbose.

Consider the following map_task:

from functools import partial

import flytekit as fl

@fl.task
def my_task(greeting: str, name: str):
    print(f"{greeting}, {name}")

@fl.workflow
def my_workflow(greeting: str = "hello", names: list[str] = ["bob", "bill"]):
    fl.map_task(partial(my_task, greeting=greeting), min_success_ratio=0.5)(name=names)

I see three strategies here:

  1. Leverage the type hints to determine which arguments need to be fixed, and which are dynamic.
@fl.workflow
def my_new_workflow(greeting: str="hello", names: list[str]=["bob", "bill"]):

    fl.map(my_task, greeting=greeting, name=names, tolerance=0.5)

    # rename `map_task` to `apply`
    # tolerance is of type `float|int`
    # greeting and name are both **kwargs, and are either used as dynamic or static inputs depending on the type hints used at registration time
  2. map could instead be a class (class map: ...) with methods such as __init__, fix, and run.
@fl.workflow
def my_new_workflow(greeting: str="hello", names: list[str]=["bob", "bill"]):

    fl.map(my_task, tolerance=0.5).fix(greeting=greeting).run(name=names)
  3. Use a context manager.
@fl.workflow
def my_new_workflow(greeting: str="hello", names: list[str]=["bob", "bill"]):

    with fl.map(my_task, tolerance=0.5) as mapper:
        mapper(greeting=greeting, name=names)
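Option 2 can be prototyped in plain Python to see how the chained calls would read. This is a local sketch under stated assumptions: Mapper is a hypothetical name, it calls an ordinary function eagerly rather than building a Flyte map task, and tolerance is only stored, not enforced.

```python
from typing import Any, Callable, Dict, List, Optional


class Mapper:
    """Hypothetical local model of option 2: map(fn).fix(...).run(...)."""

    def __init__(self, fn: Callable[..., Any], tolerance: Optional[float] = None):
        self.fn = fn
        self.tolerance = tolerance  # stored only; failure handling is not modelled
        self.fixed: Dict[str, Any] = {}

    def fix(self, **kwargs: Any) -> "Mapper":
        # Record inputs that every mapped call receives unchanged.
        self.fixed.update(kwargs)
        return self  # returning self is what makes the calls chainable

    def run(self, **mapped: List[Any]) -> List[Any]:
        # Every mapped input must be a list, and all lists must have the same length.
        lengths = {len(values) for values in mapped.values()}
        if len(lengths) != 1:
            raise ValueError("run() needs at least one mapped input, all of equal length")
        (n,) = lengths
        return [
            self.fn(**self.fixed, **{name: values[i] for name, values in mapped.items()})
            for i in range(n)
        ]
```

Because fix mutates the instance and returns it, a reused Mapper carries its fixed inputs forward; an immutable variant would return a fresh instance from fix instead.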

The downside of option 1 is that it reserves some argument names for map itself (tolerance, concurrency), which means the mapped task cannot have inputs with those names.
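If option 1 were adopted, the collision could at least be detected early by inspecting the task signature. A minimal sketch: RESERVED_MAP_KWARGS and reserved_collisions are hypothetical names, and the reserved set simply lists the two keywords mentioned above.

```python
import inspect
from typing import Callable, FrozenSet, Set

# Hypothetical: keywords a kwargs-style fl.map(...) would reserve for itself.
RESERVED_MAP_KWARGS: FrozenSet[str] = frozenset({"tolerance", "concurrency"})


def reserved_collisions(fn: Callable) -> Set[str]:
    """Return the task parameter names that clash with map's own keywords."""
    return set(inspect.signature(fn).parameters) & RESERVED_MAP_KWARGS


def score(tolerance: float, name: str) -> str:
    # Example task whose input name collides with a reserved keyword.
    return f"{name}: {tolerance}"


collisions = reserved_collisions(score)
if collisions:
    print(f"cannot map {score.__name__}: reserved input names {sorted(collisions)}")
```

A check like this turns a confusing runtime mix-up into a clear registration-time error, but it does not remove the restriction itself.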

Options 2 and 3 are a little strange.

@wild-endeavor
Contributor Author

@cosmicBboy, you mentioned maybe calling it max_concurrency? @granthamtaylor, what do you think?

Also, @granthamtaylor, can we move your proposal to a separate issue? (I can copy it over.) I wanted to keep these tickets limited to changes with near-unanimous support.

@cosmicBboy
Contributor

cosmicBboy commented Jan 9, 2025

Hmm, I don't think any of the alternative invocation syntaxes make map easier for DS users to learn, except maybe for (1).

(1) Leverage the type hints to determine which arguments need to be fixed, and which are dynamic

I think we'll find many annoying edge cases here, e.g. support for mapping over two lists of elements of matching length.

(2) feels really DSL-y. That's not a bad thing, but I might as well learn the more generalizable Pythonic syntax for this.

(3) seems like an inappropriate use of context managers.

A slight variation to (1) that might be clearer and less error-prone:

@fl.task
def my_task(greeting: str, name: str):
    print(f"{greeting}, {name}")

@fl.workflow
def my_new_workflow(greeting: str="hello", names: list[str]=["bob", "bill"]):
    fl.map(my_task, partial_kwargs={"greeting": greeting}, tolerance=0.5)(names)

    # or
    fl.map(my_task, partial={"greeting": greeting}, tolerance=0.5)(names)
    
    # or
    fl.map(my_task, fixed_kwargs={"greeting": greeting}, tolerance=0.5)(names)
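The positional call (names) in these variants implies that map must work out which task parameter receives the list. A minimal local sketch, assuming it is the single parameter not covered by the fixed dictionary; map_fixed is a hypothetical name, it runs eagerly, and my_task here returns its string instead of printing it.

```python
import inspect
from typing import Any, Callable, Dict, List, Optional


def map_fixed(
    fn: Callable[..., Any], fixed_kwargs: Optional[Dict[str, Any]] = None
) -> Callable[[List[Any]], List[Any]]:
    """Hypothetical local model of fl.map(fn, fixed_kwargs={...})(values)."""
    fixed = dict(fixed_kwargs or {})
    # The single parameter not covered by fixed_kwargs receives the mapped values.
    free = [name for name in inspect.signature(fn).parameters if name not in fixed]
    if len(free) != 1:
        raise TypeError(f"expected exactly one unfixed parameter, found {free}")
    (mapped_name,) = free

    def run(values: List[Any]) -> List[Any]:
        return [fn(**fixed, **{mapped_name: value}) for value in values]

    return run


def my_task(greeting: str, name: str) -> str:
    # Returns instead of printing so the result can be inspected.
    return f"{greeting}, {name}"


print(map_fixed(my_task, {"greeting": "hello"})(["bob", "bill"]))
# ['hello, bob', 'hello, bill']
```

The sketch also shows where a positional call becomes ambiguous: with two unfixed parameters there is no way to tell which one to map over, so it raises rather than guessing.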

@granthamtaylor

I think we'll find many annoying edge cases here, e.g. support for mapping over two lists of elements of matching length.

I don't see how this is an edge case. The typing should dictate the behavior, not the lengths of inputs.

What I meant by #1 is that if a task has an argument of type T and we pass in an object of type T, then we know for a fact that this input should be fixed, whereas if the input is of type List[T], then it should be dynamic (mapped over).

This will all be known at registration time. Perhaps all of these inputs could be passed either to the call of the parameterized map, as in flytekit.map(my_task)(my_value=my_values), or via a dictionary-like input, as in flytekit.map(my_task, kwargs={"my_value": my_values}).

Both approaches are a little lacking, and both rely on static typing. However, there is arguably already precedent for that with map: Optional[T] outputs define the expected behavior of failed nodes.

@granthamtaylor

Deprecate the max_parallelism argument of workflow and LaunchPlan (is there one?)

There is one for LaunchPlans, but not for workflows. This has also been a common request: I would like to be able to define it for individual workflows without having to wrap them in a LaunchPlan.

Last I checked, the max_parallelism parameter was broken for LaunchPlans, but that was about three months ago, so it might be fixed now.

@cosmicBboy
Contributor

cosmicBboy commented Jan 13, 2025

I don't see how this is an edge case. The typing should dictate the behavior, not the lengths of inputs.

A pretty straightforward case is:

@fl.task
def my_task(
    x: int,  # 👈 map over this
    y: list[int],  # 👈 a finite set of values that I don't want to map over
    z: str,
):
    ...

Under proposal (1) in this comment, the workflow would be:

@fl.workflow
def wf(x_list: list[int], y: list[int], z: str):
    fl.map(my_task, x=x_list, y=y, z=z)

We would have to automagically do the following work under the hood:

  • notice that x is declared as an int in the task definition, but a list[int] is provided in the map call fl.map(my_task, x=x_list)
  • infer from that mismatch that x is the input being mapped over
  • notice that the types passed for y and z match the task function's declared types, so treat those as fixed inputs
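The inference listed above, which follows the T versus List[T] rule from proposal (1), can be prototyped with the standard typing helpers to gauge how much magic is really involved. A minimal sketch, assuming the caller already knows the static type of each supplied input (at registration time these would come from the workflow's inputs); split_inputs is a hypothetical name and nothing here is flytekit API.

```python
import typing
from typing import Any, Callable, Dict, List, Tuple


def split_inputs(fn: Callable, supplied: Dict[str, Any]) -> Tuple[List[str], List[str]]:
    """Hypothetical: classify each supplied input as mapped or fixed from types alone.

    A supplied type equal to the declared type T means fixed; a supplied
    list[T] means mapped; anything else is rejected.
    """
    declared = typing.get_type_hints(fn)
    mapped: List[str] = []
    fixed: List[str] = []
    for name, supplied_type in supplied.items():
        expected = declared[name]
        if supplied_type == expected:
            fixed.append(name)
        elif typing.get_origin(supplied_type) is list and typing.get_args(supplied_type) == (expected,):
            mapped.append(name)
        else:
            raise TypeError(f"{name}: cannot bind {supplied_type} to a parameter of type {expected}")
    return mapped, fixed


def my_task(x: int, y: list[int], z: str) -> None:
    ...


print(split_inputs(my_task, {"x": list[int], "y": list[int], "z": str}))
# (['x'], ['y', 'z'])
```

Under this rule y stays fixed because list[int] matches its declared type exactly; passing a list[list[int]] for y would instead map over it. The sketch compares types by equality only, so aliases such as typing.List[int], Optional, unions, and subclasses would all need extra handling.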

This is possible, but should we do this? It feels a little too magical for me, but curious what others think.

@granthamtaylor

It feels a little too magical for me, but curious what others think.

I kind of like that it is so magical. It would also maintain backwards compatibility (one could still just use partial).

However, I can see that it is perhaps too magical. Open to feedback!

Projects
Status: Backlog
Development

No branches or pull requests

3 participants