Skip to content

Commit

Permalink
Added availableWorkers(), cf. availableCores() [#118]
Browse files Browse the repository at this point in the history
  • Loading branch information
HenrikBengtsson committed Jan 5, 2017
1 parent 6782d2b commit 72f8884
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 1 deletion.
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export(MultisessionFuture)
export(UniprocessFuture)
export(as.cluster)
export(availableCores)
export(availableWorkers)
export(backtrace)
export(cluster)
export(eager)
Expand Down
1 change: 1 addition & 0 deletions NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ Package: future
===============

Version: 1.2.0-9000 [2017-01-05]
o Added availableWorkers(), cf. availableCores().
o Added support for controlling whether a future is resolved eagerly
or lazily when creating the future, e.g. future(..., lazy = TRUE)
futureAssign(..., lazy = TRUE), and x %<-% { ... } %lazy% TRUE.
Expand Down
4 changes: 4 additions & 0 deletions R/availableCores.R
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@
#' setting the number of workers when specifying the future strategy,
#' e.g. \code{plan(multiprocess, workers=9)}.
#'
#' @seealso
#' To get the number of available workers regardless of machine,
#' see \code{\link{availableWorkers}()}.
#'
#' @export
#' @keywords internal
availableCores <- function(constraints=NULL, methods=getOption("future.availableCores.methods", c("system", "mc.cores+1", "_R_CHECK_LIMIT_CORES_", "PBS", "SGE", "Slurm")), na.rm=TRUE, default=c(current=1L), which=c("min", "max", "all")) {
Expand Down
122 changes: 122 additions & 0 deletions R/availableWorkers.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#' Get set of available workers
#'
#' @param methods A character vector specifying how to infer the number
#' of available cores.
#'
#' @param na.rm If TRUE, only non-missing settings are considered/returned.
#'
#' @param default The default set of workers.
#'
#' @param which A character specifying which set / sets to return.
#' If \code{"first"}, the first non-empty set found.
#' If \code{"min"}, the minimum value is returned.
#' If \code{"max"}, the maximum value is returned (be careful!)
#' If \code{"all"}, all values are returned.#'
#'
#' @return Return a character vector of workers, which typically consists
#' of names of machines / compute nodes, but may also be IP numbers.
#'
#' @details
#' The default set of workers for each method is
#' \code{rep("localhost", times = availableCores(method))}, which means
#' that each will at least use as many parallel workers on the current
#' machine that \code{\link{availableCores}()} allows for that method.
#'
#' In addition, the following settings ("methods") are also acknowledged:
#' \itemize{
#' \item \code{"PBS"} -
#' Query Torque/PBS environment variable \env{PBS_NODEFILE}.
#' If this is set and specifies an existing file, then the set
#' of workers is read from that file, where one worker (node)
#' is given per line.
#' }
#'
#' @seealso
#' To get the number of available workers on the current machine,
#' see \code{\link{availableCores}()}.
#'
#' @importFrom utils file_test
#' @export
#' @keywords internal
availableWorkers <- function(methods=getOption("future.availableCores.methods", c("PBS", "SGE", "Slurm", "_R_CHECK_LIMIT_CORES_", "mc.cores+1", "system")), na.rm=TRUE, default="localhost", which=c("first", "min", "max", "all")) {
## Local functions
getenv <- function(name) {
as.character(trim(Sys.getenv(name, NA_character_)))
} # getenv()

getopt <- function(name) {
as.character(getOption(name, NA_character_))
} # getopt()

split <- function(s) {
x <- unlist(strsplit(s, split = "[,]", fixed = FALSE), use.names = FALSE)
x <- trim(x)
x <- x[nzchar(x)]
x
}

which <- match.arg(which)
stopifnot(is.character(default), length(default) >= 1, !anyNA(default))


## Default is to use the current machine
ncores <- availableCores(methods = methods, na.rm = FALSE, which = "all")
workers <- lapply(ncores, FUN = function(n) {
if (length(n) == 0 || is.na(n)) n <- 0L
rep("localhost", times = n)
})

## Acknowledge known HPC settings
for (method in methods) {
if (method == "PBS") {
pathname <- getenv("PBS_NODEFILE")
if (is.na(pathname)) next
if (!file_test("-f", pathname)) {
warning(sprintf("Environent variable %s was set but no such file %s exists", sQuote("PBS_NODEFILE"), sQuote(pathname)))
next
}
## One node per line
w <- readLines(pathname)
} else {
## Fall back to querying option and system environment variable
## with the given name
w <- getopt(method)
if (is.na(w)) w <- getenv(method)
if (is.na(w)) next
w <- split(w)
}

## Drop missing or empty values?
if (na.rm) w <- w[!is.na(w)]

workers[[method]] <- w
}

nnodes <- unlist(lapply(workers, FUN = length), use.names = FALSE)
if (which == "first") {
idx <- which(is.finite(nnodes) & nnodes > 0L, useNames = FALSE)[1]
workers <- if (is.na(idx)) character(0L) else workers[[idx]]
} else if (which == "min") {
idx <- which.min(nnodes)
workers <- workers[[idx]]
} else if (which == "max") {
idx <- which.max(nnodes)
workers <- workers[[idx]]
}

## Fall back to default?
if (is.character(workers) && length(workers) == 0) workers <- default

## Sanity checks
min_count <- as.integer(na.rm)
if (is.list(workers)) {
lapply(workers, FUN = function(w) {
stopifnot(is.character(w), length(w) >= 0L, all(nchar(w) > 0))
})
} else {
stopifnot(is.character(workers), length(workers) >= min_count, all(nchar(workers) > 0))
}

workers
} # availableWorkers()

6 changes: 5 additions & 1 deletion man/availableCores.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 53 additions & 0 deletions man/availableWorkers.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 52 additions & 0 deletions tests/availableWorkers.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
source("incl/start.R")

message("*** availableWorkers() ...")

## The default
w <- availableWorkers()
print(w)
stopifnot(is.character(w), length(w) >= 1)

## Minimium of all known settings (default)
print(availableWorkers(which="min"))

## Maximum of all known settings (should never be used)
print(availableWorkers(which="max"))

## All known settings
print(availableWorkers(na.rm=FALSE, which="all"))

## System settings
w <- availableWorkers(methods="system")
print(w)
stopifnot(is.character(w), length(w) >= 1)

## Predefined ones for known cluster schedulers
print(availableWorkers(methods="PBS"))
print(availableWorkers(methods="SGE"))
print(availableWorkers(methods="Slurm"))

## PBS settings
pathname <- tempfile()
workers0 <- c("n1", "n2", "n3", "n1", "n6")
writeLines(workers0, con = pathname)
Sys.setenv(PBS_NODEFILE = pathname)
workers <- availableWorkers(methods="PBS")
print(workers)
stopifnot(length(workers) == length(workers0), all(workers == workers0))


## Any R options and system environment variable
print(availableWorkers(methods=c("width", "FOO_BAR_ENV"),
na.rm=FALSE, which="all"))

## Exception handling
Sys.setenv("FOO_BAR_ENV"="0")
res <- try(availableWorkers(methods="FOO_BAR_ENV"), silent=TRUE)
stopifnot(inherits(res, "try-error"))


message("*** availableWorkers() ... DONE")

source("incl/end.R")

0 comments on commit 72f8884

Please sign in to comment.