Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PERFORMANCE: Earlier signaling of error for resolve() and value() #622

Open
HenrikBengtsson opened this issue May 10, 2022 · 1 comment
Open
Labels
enhancement Frontend API Part of the Future API that users of futures rely on

Comments

@HenrikBengtsson
Copy link
Collaborator

HenrikBengtsson commented May 10, 2022

The below example shows how created, but not yet launched futures, continue to launch even after one produces an error:

library(future)
library(progressr)
progressr::handlers(global = TRUE)

plan(sequential)

my_fcn <- function(n) {
  p <- progressr::progressor(n)
  fs <- lapply(seq_len(n), FUN = function(ii) {
    future({
      if (ii %in% c(2L, 4L)) {
        p(sprintf("ii = %d <= FAIL", ii), class = "sticky") ## signal progress
        stop(sprintf("Boom! (ii = %d)", ii))
      }
      p(sprintf("ii = %d", ii), class = "sticky") ## signal progress
      Sys.sleep(1.0)
      ii
    })
  })
  value(fs)
}

y <- my_fcn(5)
#> ii = 1                                                                        
#> ii = 2 <= FAIL                                                                
#> ii = 3                                                                        
#> ii = 4 <= FAIL                                                                       
#> ii = 5                                                                        
#> Error in eval(quote({ : Boom! (ii = 2)

The question is, should the error have precedence over launching yet-to-be-launched futures? That probably depends on the use case, but at least there should be an option to do so. For example, in higher-level map-reduce functions it makes sense to do so. If so, the above function could exit early, i.e.

y <- my_fcn(5)
#> ii = 1                                                                        
#> ii = 2 <= FAIL                                                                
#> Error in eval(quote({ : Boom! (ii = 2)

Imagine what a difference this would make if we called my_fcn(1000).

Internally, value(fs) calls:

fs <- resolve(fs, result = TRUE, stdout = TRUE, signal = TRUE, force = TRUE)

which won't return until all futures have been resolved, but it won't signal errors (although it will signal all other type of conditions). That's done later by value(fs). So, if we want to have value(fs) to error early, and avoid launching the remaining futures, we need to add this feature to resolve() first.

Now, if we allow resolve() to signal errors, and thereby terminating early, we have to make sure any parallel futures that have already been launched are let to finish first, before the error is signaled. We do not want to have any stray futures remaining when we exit resolve() or value() (... or the calling future.apply, furrr, ... function)

Related to the above is the promise to relay stdout or conditions in order, regardless of future backend, i.e. parallel output should be the same as sequential output. This requires that all preceding futures are resolved, so their output can be relayed, before signaling an error. This is in most cases already covered by the previous consideration. However, if we use a randomize ordering of futures at the map-reduce level, this is not necessarily true, e.g. future_lapply(..., future.chunk.size = structure(1L, ordering = "random"). Unfortunately, this reordering is orchestrated by future.apply and not future, so resolved cannot possibly know the true, intended order. Luckily, resolve() for lists takes an optional argument idxs, which allows us to process the futures in the order given by idxs. We could make use of that at the map-reduce level to make sure all output is relayed and in order.

Care needs to be taken to make sure the order of relayed stdout and conditions are preserved if there's an error, just as we already do. However, if we allow an error to terminate the resolve process, we should probably be conservative and collect and relay

@HenrikBengtsson HenrikBengtsson added the Frontend API Part of the Future API that users of futures rely on label May 10, 2022
@HenrikBengtsson
Copy link
Collaborator Author

HenrikBengtsson commented May 10, 2022

Note that the proposed improvement only affects the higher-level map-reduce functions when each parallel worker processes more than one chunk, e.g. future.chunk.size = 1L of future.apply. In contrast, the default is to split up the iterations uniformly across all workers and have one chunk (=future) per worker, which means they will all run. Using the above example, the default corresponds to:

library(future.apply)
library(progressr)
progressr::handlers(global = TRUE)

plan(sequential)

my_fcn <- function(n) {
  p <- progressr::progressor(n)
  future_lapply(seq_len(n), FUN = function(ii) {
    if (ii %in% c(2L, 4L)) {
      p(sprintf("ii = %d <= FAIL", ii), class = "sticky") ## signal progress
      stop(sprintf("Boom! (ii = %d)", ii))
    }
    p(sprintf("ii = %d", ii), class = "sticky") ## signal progress
    Sys.sleep(1.0)
    ii
  })
}

y <- my_fcn(5)
#> ii = 1                                                                        
#> ii = 2 <= FAIL                                                                
#> Error in ...future.FUN(...future.X_jj, ...) : Boom! (ii = 2)

which signals the error without launching the remaining futures. In contrast, if we use future.chunk.size = 1L, we have the case illustrated in #622 (comment), e.g.

my_fcn <- function(n) {
  p <- progressr::progressor(n)
  future_lapply(seq_len(n), FUN = function(ii) {
    if (ii %in% c(2L, 4L)) {
      p(sprintf("ii = %d <= FAIL", ii), class = "sticky") ## signal progress
      stop(sprintf("Boom! (ii = %d)", ii))
    }
    p(sprintf("ii = %d", ii), class = "sticky") ## signal progress
    Sys.sleep(1.0)
    ii
  }, future.chunk.size = 1L)
}

y <- my_fcn(5)
#> ii = 1                                                                        
#> ii = 2 <= FAIL                                                                
#> ii = 3                                                                        
#> ii = 4 <= FAIL                                                                
#> ii = 5                                                                        
#> Error in ...future.FUN(...future.X_jj, ...) : Boom! (ii = 2)                  

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement Frontend API Part of the Future API that users of futures rely on
Projects
None yet
Development

No branches or pull requests

1 participant