diff --git a/Project.toml b/Project.toml index a5033c2..cf215d1 100644 --- a/Project.toml +++ b/Project.toml @@ -6,12 +6,14 @@ version = "0.4.8" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f" +SlurmClusterManager = "c82cd089-7bf7-41d7-976b-6b5d413cbe0a" Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" [compat] Distributed = "< 0.0.1, 1" Logging = "< 0.0.1, 1" Pkg = "< 0.0.1, 1" +SlurmClusterManager = "0.1.3" Sockets = "< 0.0.1, 1" julia = "1.2" diff --git a/src/ClusterManagers.jl b/src/ClusterManagers.jl index 43b4860..b3912a8 100755 --- a/src/ClusterManagers.jl +++ b/src/ClusterManagers.jl @@ -4,16 +4,23 @@ using Distributed using Sockets using Pkg +import SlurmClusterManager + export launch, manage, kill, init_worker, connect import Distributed: launch, manage, kill, init_worker, connect +# Bring some other names into scope, just for convenience: +using Distributed: addprocs + + worker_cookie() = begin Distributed.init_multi(); cluster_cookie() end worker_arg() = `--worker=$(worker_cookie())` - # PBS doesn't have the same semantics as SGE wrt to file accumulate, # a different solution will have to be found include("qsub.jl") + +include("auto_detect.jl") include("scyld.jl") include("condor.jl") include("slurm.jl") diff --git a/src/auto_detect.jl b/src/auto_detect.jl new file mode 100644 index 0000000..da20912 --- /dev/null +++ b/src/auto_detect.jl @@ -0,0 +1,103 @@ +function addprocs_autodetect_current_scheduler(; kwargs...) + sched = _autodetect_is_slurm() + if sched == :slurm + return addprocs(SlurmClusterManager.SlurmManager(); kwargs...) + elseif sched == :sge + np = _sge_get_number_of_tasks() + return addprocs_sge(np; kwargs...) + elseif sched == :pbs + np = _torque_get_numtasks() + return addprocs_pbs(np; kwargs...) + end + error("Unable to auto-detect cluster scheduler: $(sched)") +end + +function autodetect_current_scheduler() + if _autodetect_is_slurm() + return :slurm + elseif _autodetect_is_sge() + return :sge + elseif _autodetect_is_pbs() + return :pbs + end + return nothing +end + +##### Slurm: + +function _autodetect_is_slurm() + has_SLURM_JOB_ID = _has_env_nonempty("SLURM_JOB_ID") + has_SLURM_JOBID = _has_env_nonempty("SLURM_JOBID") + res = has_SLURM_JOB_ID || has_SLURM_JOBID + return res +end + +##### SGE (Sun Grid Engine): + +function _autodetect_is_sge() + # https://docs.oracle.com/cd/E19957-01/820-0699/chp4-21/index.html + has_SGE_O_HOST = _has_env_nonempty("SGE_O_HOST") + return has_SGE_O_HOST + + # Important note: + # The "job ID" environment variable in SGE is just named `JOB_ID`. + # This is obviously too vague, because the variable name is not specific to SGE. + # Therefore, we can't use that variable for our SGE auto-detection. +end + +function _sge_get_numtasks() + msg = "Because this is Sun Grid Engine (SGE), ClusterManagers.jl is not able " * + "to correctly auto-detect the number of tasks. " * + "Therefore, ClusterManagers.jl will instead use the value of the " * + "NHOSTS environment variable: $(np)" + @warn msg + + # https://docs.oracle.com/cd/E19957-01/820-0699/chp4-21/index.html + name = "NHOSTS" + value_int = _getenv_parse_int(name) + return value_int +end + +##### PBS and Torque: + +function _autodetect_is_pbs() + # https://docs.adaptivecomputing.com/torque/2-5-12/help.htm#topics/2-jobs/exportedBatchEnvVar.htm + has_PBS_JOBID = _has_env_nonempty("PBS_JOBID") + return has_PBS_JOBID +end + +function _torque_get_numtasks() + # https://docs.adaptivecomputing.com/torque/2-5-12/help.htm#topics/2-jobs/exportedBatchEnvVar.htm + name = "PBS_TASKNUM" + value_int = _getenv_parse_int(name) + return value_int + + @info "Using auto-detected num_tasks: $(np)" +end + +##### General utility functions: + +function _getenv_parse_int(name::AbstractString) + if !haskey(ENV, name) + msg = "Environment variable is not defined: $(name)" + error(msg) + end + original_value = ENV[name] + if isempty(original_value) + msg = "Environment variable is defined, but is empty: $(name)" + error(msg) + end + stripped_value_str = strip(original_value) + if isempty(stripped_value) + msg = "Environment variable is defined, but contains only whitespace: $(name)" + error(msg) + end + value_int = tryparse(Int, stripped_value_str) + if !(value_int isa Int) + msg = + "Environment variable \"$(name)\" is defined, " * + "but its value \"$(stripped_value_str)\" could not be parsed as an integer." + error(msg) + end + return value_int +end diff --git a/src/qsub.jl b/src/qsub.jl index 12f714c..e9feadd 100644 --- a/src/qsub.jl +++ b/src/qsub.jl @@ -41,7 +41,7 @@ function launch(manager::Union{PBSManager, SGEManager, QRSHManager}, for i in 1:np config = WorkerConfig() config.io, io_proc = stream_proc[i] - config.userdata = Dict{Symbol, Any}(:task => i, + config.userdata = Dict{Symbol, Any}(:task => i, :process => io_proc) push!(instances_arr, config) notify(c)