From 56300191fdda3567d5f3a57940de2ff64f3eba45 Mon Sep 17 00:00:00 2001 From: Henrik Bengtsson Date: Fri, 18 Sep 2020 14:30:14 -0700 Subject: [PATCH] availableWorkers() now supports LSF/OpenLava [#118] --- NEWS | 5 +++-- R/availableWorkers.R | 10 +++++++++- R/options.R | 2 +- man/availableWorkers.Rd | 5 ++++- man/future.options.Rd | 2 +- tests/availableCores.R | 11 +++++++++++ tests/availableWorkers.R | 14 +++++++++++++- 7 files changed, 42 insertions(+), 7 deletions(-) diff --git a/NEWS b/NEWS index 68883b58..8d422a9d 100644 --- a/NEWS +++ b/NEWS @@ -37,8 +37,9 @@ NEW FEATURES: --slurm-cpus-per-task=n' is specified and SLURM_JOB_NUM_NODES=1, then it falls back to using 'SLURM_CPUS_ON_NODE', e.g. when using '--ntasks=n'. - * Now availableCores() supports LSF/OpenLava. Specifically, environment - variable 'LSB_DJOB_NUMPROC' is acknowledged. + * Now availableCores() and availableWorkers() supports LSF/OpenLava. + Specifically, they acknowledge environment variable 'LSB_DJOB_NUMPROC' + and 'LSB_HOSTS', respectively. PERFORMANCE: diff --git a/R/availableWorkers.R b/R/availableWorkers.R index 32261860..2e9b889e 100644 --- a/R/availableWorkers.R +++ b/R/availableWorkers.R @@ -38,6 +38,8 @@ #' An example of a job submission that results in this is #' `qsub -pe mpi 8` (or `qsub -pe ompi 8`), which #' requests eight cores on a any number of machines. +#' \item `"LSF"` - +#' Query LSF/OpenLava environment variable \env{LSB_HOSTS}. #' \item `"custom"` - #' If option \option{future.availableWorkers.custom} is set and a function, #' then this function will be called (without arguments) and it's value @@ -52,7 +54,7 @@ #' @importFrom utils file_test #' @export #' @keywords internal -availableWorkers <- function(methods = getOption("future.availableWorkers.methods", c("mc.cores", "_R_CHECK_LIMIT_CORES_", "PBS", "SGE", "Slurm", "custom", "system", "fallback")), na.rm = TRUE, default = "localhost", which = c("auto", "min", "max", "all")) { +availableWorkers <- function(methods = getOption("future.availableWorkers.methods", c("mc.cores", "_R_CHECK_LIMIT_CORES_", "PBS", "SGE", "Slurm", "LSF", "custom", "system", "fallback")), na.rm = TRUE, default = "localhost", which = c("auto", "min", "max", "all")) { ## Local functions getenv <- function(name) { as.character(trim(Sys.getenv(name, NA_character_))) @@ -135,6 +137,12 @@ availableWorkers <- function(methods = getOption("future.availableWorkers.method ## TODO: Parse 'data' into a hostnames /HB 2020-09-18 ## ... next + } else if (method == "LSF") { + data <- getenv("LSB_HOSTS") + if (is.na(data)) next + w <- strsplit(data, split = " ", fixed = TRUE)[[1]] + w <- w[nzchar(w)] + if (length(w) == 0L) next } else if (method == "custom") { fcn <- getOption("future.availableWorkers.custom", NULL) if (!is.function(fcn)) next diff --git a/R/options.R b/R/options.R index 0e0bb1ae..63fdbc40 100644 --- a/R/options.R +++ b/R/options.R @@ -54,7 +54,7 @@ #' #' \item{\option{future.availableCores.system}:}{(integer) Number of "system" cores used instead of what is reported by \code{\link{availableCores}(which = "system")}. If not specified, this option is set according to system environment variable \env{R_FUTURE_AVAILABLECORES_SYSTEM} when the \pkg{future} package is _loaded_. This option allows you to effectively override what `parallel::detectCores()` reports the system has.} #' -#' \item{\option{future.availableWorkers.methods}:}{(character vector) Default lookup methods for [availableWorkers()]. (Default: `c("mc.cores", "_R_CHECK_LIMIT_CORES_", "PBS", "SGE", "Slurm", "custom", "system", "fallback")`)} +#' \item{\option{future.availableWorkers.methods}:}{(character vector) Default lookup methods for [availableWorkers()]. (Default: `c("mc.cores", "_R_CHECK_LIMIT_CORES_", "PBS", "SGE", "Slurm", "LSF", "custom", "system", "fallback")`)} #' #' \item{\option{future.availableWorkers.custom}:}{(function) If set and a function, then this function will be called (without arguments) by [availableWorkers()] where its value, coerced to a character vector, is interpreted as hostnames of available workers.} #' diff --git a/man/availableWorkers.Rd b/man/availableWorkers.Rd index 4270938a..2973df0e 100644 --- a/man/availableWorkers.Rd +++ b/man/availableWorkers.Rd @@ -6,7 +6,8 @@ \usage{ availableWorkers( methods = getOption("future.availableWorkers.methods", c("mc.cores", - "_R_CHECK_LIMIT_CORES_", "PBS", "SGE", "Slurm", "custom", "system", "fallback")), + "_R_CHECK_LIMIT_CORES_", "PBS", "SGE", "Slurm", "LSF", "custom", "system", + "fallback")), na.rm = TRUE, default = "localhost", which = c("auto", "min", "max", "all") @@ -55,6 +56,8 @@ Query Sun/Oracle Grid Engine (SGE) environment variable An example of a job submission that results in this is \verb{qsub -pe mpi 8} (or \verb{qsub -pe ompi 8}), which requests eight cores on a any number of machines. +\item \code{"LSF"} - +Query LSF/OpenLava environment variable \env{LSB_HOSTS}. \item \code{"custom"} - If option \option{future.availableWorkers.custom} is set and a function, then this function will be called (without arguments) and it's value diff --git a/man/future.options.Rd b/man/future.options.Rd index d9ec677f..89f83c73 100644 --- a/man/future.options.Rd +++ b/man/future.options.Rd @@ -94,7 +94,7 @@ If this option is not specified, environment variable \env{R_FUTURE_STARTUP_SCRI \item{\option{future.availableCores.system}:}{(integer) Number of "system" cores used instead of what is reported by \code{\link{availableCores}(which = "system")}. If not specified, this option is set according to system environment variable \env{R_FUTURE_AVAILABLECORES_SYSTEM} when the \pkg{future} package is \emph{loaded}. This option allows you to effectively override what \code{parallel::detectCores()} reports the system has.} -\item{\option{future.availableWorkers.methods}:}{(character vector) Default lookup methods for \code{\link[=availableWorkers]{availableWorkers()}}. (Default: \code{c("mc.cores", "_R_CHECK_LIMIT_CORES_", "PBS", "SGE", "Slurm", "custom", "system", "fallback")})} +\item{\option{future.availableWorkers.methods}:}{(character vector) Default lookup methods for \code{\link[=availableWorkers]{availableWorkers()}}. (Default: \code{c("mc.cores", "_R_CHECK_LIMIT_CORES_", "PBS", "SGE", "Slurm", "LSF", "custom", "system", "fallback")})} \item{\option{future.availableWorkers.custom}:}{(function) If set and a function, then this function will be called (without arguments) by \code{\link[=availableWorkers]{availableWorkers()}} where its value, coerced to a character vector, is interpreted as hostnames of available workers.} diff --git a/tests/availableCores.R b/tests/availableCores.R index c373c312..2d458966 100644 --- a/tests/availableCores.R +++ b/tests/availableCores.R @@ -30,6 +30,7 @@ stopifnot(length(n) == 1, is.numeric(n), is.finite(n), n >= 1) print(availableCores(methods = "PBS")) print(availableCores(methods = "SGE")) print(availableCores(methods = "Slurm")) +print(availableCores(methods = "LSF")) ## Any R options and system environment variable print(availableCores(methods = c("width", "FOO_BAR_ENV"), @@ -41,6 +42,16 @@ res <- try(availableCores(methods = "FOO_BAR_ENV"), silent = TRUE) stopifnot(inherits(res, "try-error")) +ncores0 <- 42L + +message("*** LSF ...") +Sys.setenv(LSB_DJOB_NUMPROC = as.character(ncores0)) +ncores <- availableCores(methods = "LSF") +print(ncores) +stopifnot(ncores == ncores0) +message("*** LSF ... done") + + message("*** Internal detectCores() ...") ## Option 'future.availableCores.system' diff --git a/tests/availableWorkers.R b/tests/availableWorkers.R index 5066c11f..9edd9fda 100644 --- a/tests/availableWorkers.R +++ b/tests/availableWorkers.R @@ -25,6 +25,7 @@ stopifnot(is.character(w), length(w) >= 1) print(availableWorkers(methods = "PBS")) print(availableWorkers(methods = "SGE")) print(availableWorkers(methods = "Slurm")) +print(availableWorkers(methods = "LSF")) @@ -39,6 +40,18 @@ data0 <- as.data.frame(table(workers0), stringsAsFactors = FALSE) colnames(data0) <- c("node", "count") data0 <- data0[order(data0$node, data0$count), ] + +message("*** LSF ...") + +Sys.setenv(LSB_HOSTS = paste(workers0, collapse = " ")) +workers <- availableWorkers(methods = "LSF") +print(workers) +stopifnot(length(workers) == length(workers0)) + +message("*** LSF ... done") + + + message("*** read_pbs_nodefile() ...") workers <- workers0 @@ -89,7 +102,6 @@ res <- tryCatch({ }, warning = identity) stopifnot(inherits(res, "warning")) - message("*** read_pbs_nodefile() ... DONE")