diff --git a/NAMESPACE b/NAMESPACE index 53154a0f..df1201ac 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -73,6 +73,7 @@ export(MultisessionFuture) export(UniprocessFuture) export(as.cluster) export(availableCores) +export(availableWorkers) export(backtrace) export(cluster) export(eager) diff --git a/NEWS b/NEWS index e14b74fc..0d5ef63b 100644 --- a/NEWS +++ b/NEWS @@ -2,6 +2,7 @@ Package: future =============== Version: 1.2.0-9000 [2017-01-05] +o Added availableWorkers(), cf. availableCores(). o Added support for controlling whether a future is resolved eagerly or lazily when creating the future, e.g. future(..., lazy = TRUE) futureAssign(..., lazy = TRUE), and x %<-% { ... } %lazy% TRUE. diff --git a/R/availableCores.R b/R/availableCores.R index 6db7be38..34c9fa8a 100644 --- a/R/availableCores.R +++ b/R/availableCores.R @@ -78,6 +78,10 @@ #' setting the number of workers when specifying the future strategy, #' e.g. \code{plan(multiprocess, workers=9)}. #' +#' @seealso +#' To get the number of available workers regardless of machine, +#' see \code{\link{availableWorkers}()}. +#' #' @export #' @keywords internal availableCores <- function(constraints=NULL, methods=getOption("future.availableCores.methods", c("system", "mc.cores+1", "_R_CHECK_LIMIT_CORES_", "PBS", "SGE", "Slurm")), na.rm=TRUE, default=c(current=1L), which=c("min", "max", "all")) { diff --git a/R/availableWorkers.R b/R/availableWorkers.R new file mode 100644 index 00000000..0ea89b48 --- /dev/null +++ b/R/availableWorkers.R @@ -0,0 +1,122 @@ +#' Get set of available workers +#' +#' @param methods A character vector specifying how to infer the number +#' of available cores. +#' +#' @param na.rm If TRUE, only non-missing settings are considered/returned. +#' +#' @param default The default set of workers. +#' +#' @param which A character specifying which set / sets to return. +#' If \code{"first"}, the first non-empty set found. +#' If \code{"min"}, the minimum value is returned. +#' If \code{"max"}, the maximum value is returned (be careful!) +#' If \code{"all"}, all values are returned.#' +#' +#' @return Return a character vector of workers, which typically consists +#' of names of machines / compute nodes, but may also be IP numbers. +#' +#' @details +#' The default set of workers for each method is +#' \code{rep("localhost", times = availableCores(method))}, which means +#' that each will at least use as many parallel workers on the current +#' machine that \code{\link{availableCores}()} allows for that method. +#' +#' In addition, the following settings ("methods") are also acknowledged: +#' \itemize{ +#' \item \code{"PBS"} - +#' Query Torque/PBS environment variable \env{PBS_NODEFILE}. +#' If this is set and specifies an existing file, then the set +#' of workers is read from that file, where one worker (node) +#' is given per line. +#' } +#' +#' @seealso +#' To get the number of available workers on the current machine, +#' see \code{\link{availableCores}()}. +#' +#' @importFrom utils file_test +#' @export +#' @keywords internal +availableWorkers <- function(methods=getOption("future.availableCores.methods", c("PBS", "SGE", "Slurm", "_R_CHECK_LIMIT_CORES_", "mc.cores+1", "system")), na.rm=TRUE, default="localhost", which=c("first", "min", "max", "all")) { + ## Local functions + getenv <- function(name) { + as.character(trim(Sys.getenv(name, NA_character_))) + } # getenv() + + getopt <- function(name) { + as.character(getOption(name, NA_character_)) + } # getopt() + + split <- function(s) { + x <- unlist(strsplit(s, split = "[,]", fixed = FALSE), use.names = FALSE) + x <- trim(x) + x <- x[nzchar(x)] + x + } + + which <- match.arg(which) + stopifnot(is.character(default), length(default) >= 1, !anyNA(default)) + + + ## Default is to use the current machine + ncores <- availableCores(methods = methods, na.rm = FALSE, which = "all") + workers <- lapply(ncores, FUN = function(n) { + if (length(n) == 0 || is.na(n)) n <- 0L + rep("localhost", times = n) + }) + + ## Acknowledge known HPC settings + for (method in methods) { + if (method == "PBS") { + pathname <- getenv("PBS_NODEFILE") + if (is.na(pathname)) next + if (!file_test("-f", pathname)) { + warning(sprintf("Environent variable %s was set but no such file %s exists", sQuote("PBS_NODEFILE"), sQuote(pathname))) + next + } + ## One node per line + w <- readLines(pathname) + } else { + ## Fall back to querying option and system environment variable + ## with the given name + w <- getopt(method) + if (is.na(w)) w <- getenv(method) + if (is.na(w)) next + w <- split(w) + } + + ## Drop missing or empty values? + if (na.rm) w <- w[!is.na(w)] + + workers[[method]] <- w + } + + nnodes <- unlist(lapply(workers, FUN = length), use.names = FALSE) + if (which == "first") { + idx <- which(is.finite(nnodes) & nnodes > 0L, useNames = FALSE)[1] + workers <- if (is.na(idx)) character(0L) else workers[[idx]] + } else if (which == "min") { + idx <- which.min(nnodes) + workers <- workers[[idx]] + } else if (which == "max") { + idx <- which.max(nnodes) + workers <- workers[[idx]] + } + + ## Fall back to default? + if (is.character(workers) && length(workers) == 0) workers <- default + + ## Sanity checks + min_count <- as.integer(na.rm) + if (is.list(workers)) { + lapply(workers, FUN = function(w) { + stopifnot(is.character(w), length(w) >= 0L, all(nchar(w) > 0)) + }) + } else { + stopifnot(is.character(workers), length(workers) >= min_count, all(nchar(workers) > 0)) + } + + workers +} # availableWorkers() + diff --git a/man/availableCores.Rd b/man/availableCores.Rd index 62f8e002..cd9c2c8f 100644 --- a/man/availableCores.Rd +++ b/man/availableCores.Rd @@ -27,7 +27,7 @@ settings are available.} \item{which}{A character specifying which settings to return. If \code{"min"}, the minimum value is returned. If \code{"max"}, the maximum value is returned (be careful!) -If \code{"all"}, all values are returned.} +If \code{"all"}, all values are returned.#'} } \value{ Return a positive (>=1) integer. @@ -91,5 +91,9 @@ Having said this, it is almost always better to do this by explicitly setting the number of workers when specifying the future strategy, e.g. \code{plan(multiprocess, workers=9)}. } +\seealso{ +To get the number of available workers regardless of machine, +see \code{\link{availableWorkers}()}. +} \keyword{internal} diff --git a/man/availableWorkers.Rd b/man/availableWorkers.Rd new file mode 100644 index 00000000..1379d478 --- /dev/null +++ b/man/availableWorkers.Rd @@ -0,0 +1,53 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/availableWorkers.R +\name{availableWorkers} +\alias{availableWorkers} +\title{Get set of available workers} +\usage{ +availableWorkers(methods = getOption("future.availableCores.methods", c("PBS", + "SGE", "Slurm", "_R_CHECK_LIMIT_CORES_", "mc.cores+1", "system")), + na.rm = TRUE, default = "localhost", which = c("first", "min", "max", + "all")) +} +\arguments{ +\item{methods}{A character vector specifying how to infer the number +of available cores.} + +\item{na.rm}{If TRUE, only non-missing settings are considered/returned.} + +\item{default}{The default set of workers.} + +\item{which}{A character specifying which set / sets to return. +If \code{"first"}, the first non-empty set found. +If \code{"min"}, the minimum value is returned. +If \code{"max"}, the maximum value is returned (be careful!) +If \code{"all"}, all values are returned.#'} +} +\value{ +Return a character vector of workers, which typically consists +of names of machines / compute nodes, but may also be IP numbers. +} +\description{ +Get set of available workers +} +\details{ +The default set of workers for each method is +\code{rep("localhost", times = availableCores(method))}, which means +that each will at least use as many parallel workers on the current +machine that \code{\link{availableCores}()} allows for that method. + +In addition, the following settings ("methods") are also acknowledged: +\itemize{ + \item \code{"PBS"} - + Query Torque/PBS environment variable \env{PBS_NODEFILE}. + If this is set and specifies an existing file, then the set + of workers is read from that file, where one worker (node) + is given per line. +} +} +\seealso{ +To get the number of available workers on the current machine, +see \code{\link{availableCores}()}. +} +\keyword{internal} + diff --git a/tests/availableWorkers.R b/tests/availableWorkers.R new file mode 100644 index 00000000..b2a636cf --- /dev/null +++ b/tests/availableWorkers.R @@ -0,0 +1,52 @@ +source("incl/start.R") + +message("*** availableWorkers() ...") + +## The default +w <- availableWorkers() +print(w) +stopifnot(is.character(w), length(w) >= 1) + +## Minimium of all known settings (default) +print(availableWorkers(which="min")) + +## Maximum of all known settings (should never be used) +print(availableWorkers(which="max")) + +## All known settings +print(availableWorkers(na.rm=FALSE, which="all")) + +## System settings +w <- availableWorkers(methods="system") +print(w) +stopifnot(is.character(w), length(w) >= 1) + +## Predefined ones for known cluster schedulers +print(availableWorkers(methods="PBS")) +print(availableWorkers(methods="SGE")) +print(availableWorkers(methods="Slurm")) + +## PBS settings +pathname <- tempfile() +workers0 <- c("n1", "n2", "n3", "n1", "n6") +writeLines(workers0, con = pathname) +Sys.setenv(PBS_NODEFILE = pathname) +workers <- availableWorkers(methods="PBS") +print(workers) +stopifnot(length(workers) == length(workers0), all(workers == workers0)) + + +## Any R options and system environment variable +print(availableWorkers(methods=c("width", "FOO_BAR_ENV"), + na.rm=FALSE, which="all")) + +## Exception handling +Sys.setenv("FOO_BAR_ENV"="0") +res <- try(availableWorkers(methods="FOO_BAR_ENV"), silent=TRUE) +stopifnot(inherits(res, "try-error")) + + +message("*** availableWorkers() ... DONE") + +source("incl/end.R") +