diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy deleted file mode 100644 index df4f02f..0000000 --- a/plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Copyright 2023-, Stellenbosch University, South Africa - * Copyright 2024, Evaluacion y Desarrollo de Negocios, Spain - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package nextflow.nomad - -import groovy.transform.CompileStatic -import groovy.util.logging.Slf4j -import nextflow.nomad.config.AffinitySpec -import nextflow.nomad.config.ConstraintSpec -import nextflow.nomad.config.VolumeSpec - -/** - * Nomad Config - * - * @author Jorge Aguilera - */ - -@Slf4j -@CompileStatic -class NomadConfig { - final static protected API_VERSION = "v1" - - final NomadClientOpts clientOpts - final NomadJobOpts jobOpts - final NomadDebug debug - - NomadConfig(Map nomadConfigMap) { - clientOpts = new NomadClientOpts((nomadConfigMap?.client ?: Collections.emptyMap()) as Map) - jobOpts = new NomadJobOpts((nomadConfigMap?.jobs ?: Collections.emptyMap()) as Map) - debug = new NomadDebug((nomadConfigMap?.debug ?: Collections.emptyMap()) as Map) - } - - class NomadClientOpts{ - final String address - final String token - - NomadClientOpts(Map nomadClientOpts){ - def tmp = (nomadClientOpts.address?.toString() ?: "http://127.0.0.1:4646") - if( !tmp.endsWith("/")) - tmp +="/" - this.address = tmp + API_VERSION - token = nomadClientOpts.token ?: null - } - } - - class NomadJobOpts{ - final boolean deleteOnCompletion - final List datacenters - final String region - final String namespace - final String dockerVolume - final VolumeSpec[] volumeSpec - final AffinitySpec affinitySpec - final ConstraintSpec constraintSpec - - NomadJobOpts(Map nomadJobOpts){ - deleteOnCompletion = nomadJobOpts.containsKey("deleteOnCompletion") ? - nomadJobOpts.deleteOnCompletion : false - if( nomadJobOpts.containsKey("datacenters") ) { - datacenters = ((nomadJobOpts.datacenters instanceof List ? - nomadJobOpts.datacenters : nomadJobOpts.datacenters.toString().split(",")) - as List).findAll{it.size()}.unique() - }else{ - datacenters = [] - } - region = nomadJobOpts.region ?: null - namespace = nomadJobOpts.namespace ?: null - dockerVolume = nomadJobOpts.dockerVolume ?: null - if( dockerVolume ){ - log.info "dockerVolume config will be deprecated, use volume type:'docker' name:'name' instead" - } - - this.volumeSpec = parseVolumes(nomadJobOpts) - this.affinitySpec = parseAffinity(nomadJobOpts) - this.constraintSpec = parseConstraint(nomadJobOpts) - } - - VolumeSpec[] parseVolumes(Map nomadJobOpts){ - List ret = [] - if( nomadJobOpts.volume && nomadJobOpts.volume instanceof Closure){ - def volumeSpec = new VolumeSpec() - def closure = (nomadJobOpts.volume as Closure) - def clone = closure.rehydrate(volumeSpec, closure.owner, closure.thisObject) - clone.resolveStrategy = Closure.DELEGATE_FIRST - clone() - volumeSpec.workDir(true) - ret.add volumeSpec - } - - if( nomadJobOpts.volumes && nomadJobOpts.volumes instanceof List){ - nomadJobOpts.volumes.each{ closure -> - if( closure instanceof Closure){ - def volumeSpec = new VolumeSpec() - def clone = closure.rehydrate(volumeSpec, closure.owner, closure.thisObject) - clone.resolveStrategy = Closure.DELEGATE_FIRST - clone() - ret.add volumeSpec - } - } - } - - if( ret.size() && !ret.find{ it.workDir } ){ - ret.first().workDir(true) - } - - ret*.validate() - - if( ret.findAll{ it.workDir}.size() > 1 ){ - throw new IllegalArgumentException("No more than a workdir volume allowed") - } - - return ret as VolumeSpec[] - } - - AffinitySpec parseAffinity(Map nomadJobOpts) { - if (nomadJobOpts.affinity && nomadJobOpts.affinity instanceof Closure) { - def affinitySpec = new AffinitySpec() - def closure = (nomadJobOpts.affinity as Closure) - def clone = closure.rehydrate(affinitySpec, closure.owner, closure.thisObject) - clone.resolveStrategy = Closure.DELEGATE_FIRST - clone() - affinitySpec.validate() - affinitySpec - } else { - null - } - } - - ConstraintSpec parseConstraint(Map nomadJobOpts){ - if (nomadJobOpts.constraint && nomadJobOpts.constraint instanceof Closure) { - def constraintSpec = new ConstraintSpec() - def closure = (nomadJobOpts.constraint as Closure) - def clone = closure.rehydrate(constraintSpec, closure.owner, closure.thisObject) - clone.resolveStrategy = Closure.DELEGATE_FIRST - clone() - constraintSpec.validate() - constraintSpec - } else { - null - } - } - } - - static class NomadDebug { - - @Delegate - Map target - - NomadDebug(Map debug) { - this.target = debug ?: Collections.emptyMap() - } - - boolean getJson() { Boolean.valueOf( target.json as String ) } - } -} diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/config/AffinitySpec.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/config/AffinitySpec.groovy index 0acee10..7d8277e 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/config/AffinitySpec.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/config/AffinitySpec.groovy @@ -1,5 +1,26 @@ -package nextflow.nomad.config +/* + * Copyright 2023-, Stellenbosch University, South Africa + * Copyright 2024, Evaluacion y Desarrollo de Negocios, Spain + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nextflow.nomad.config +/** + * Nomad Job Affinity Spec + * + * @author Jorge Aguilera + */ class AffinitySpec{ private String attribute diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/config/ConstraintSpec.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/config/ConstraintSpec.groovy index 905b956..266bfd2 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/config/ConstraintSpec.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/config/ConstraintSpec.groovy @@ -1,4 +1,26 @@ +/* + * Copyright 2023-, Stellenbosch University, South Africa + * Copyright 2024, Evaluacion y Desarrollo de Negocios, Spain + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package nextflow.nomad.config +/** + * Nomad Job Constraint Spec + * + * @author Jorge Aguilera + */ class ConstraintSpec { diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/config/NomadClientOpts.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/config/NomadClientOpts.groovy new file mode 100644 index 0000000..a990c45 --- /dev/null +++ b/plugins/nf-nomad/src/main/nextflow/nomad/config/NomadClientOpts.groovy @@ -0,0 +1,56 @@ +/* + * Copyright 2023-, Stellenbosch University, South Africa + * Copyright 2024, Evaluacion y Desarrollo de Negocios, Spain + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nextflow.nomad.config + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j + + +/** + * Nomad Config + * + * @author Jorge Aguilera + * @author Abhinav Sharma + */ + +@Slf4j +@CompileStatic +class NomadClientOpts{ + + final static protected API_VERSION = "v1" + + private Map sysEnv + + final String address + final String token + + NomadClientOpts(Map nomadClientOpts, Map env=null){ + assert nomadClientOpts!=null + + sysEnv = env==null ? new HashMap(System.getenv()) : env + + def tmp = (nomadClientOpts.address?.toString() ?: sysEnv.get('NOMAD_ADDR')) + + if( !tmp.endsWith("/")) + tmp +="/" + this.address = tmp + API_VERSION + this.token = nomadClientOpts.token ?: sysEnv.get('NOMAD_TOKEN') + + //TODO: Add mTLS properties and env vars + // https://developer.hashicorp.com/nomad/docs/commands#mtls-environment-variables + } +} \ No newline at end of file diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/config/NomadConfig.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/config/NomadConfig.groovy new file mode 100644 index 0000000..e3a1736 --- /dev/null +++ b/plugins/nf-nomad/src/main/nextflow/nomad/config/NomadConfig.groovy @@ -0,0 +1,62 @@ +/* + * Copyright 2023-, Stellenbosch University, South Africa + * Copyright 2024, Evaluacion y Desarrollo de Negocios, Spain + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nextflow.nomad.config + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j + + +/** + * Nomad Config + * + * @author Jorge Aguilera + * @author Abhinav Sharma + */ + +@Slf4j +@CompileStatic +class NomadConfig { + + private NomadClientOpts clientOpts + private NomadJobOpts jobOpts + private NomadDebug debug + + NomadConfig(Map nomadConfigMap) { + this.clientOpts = new NomadClientOpts((nomadConfigMap?.client ?: Collections.emptyMap()) as Map) + this.jobOpts = new NomadJobOpts((nomadConfigMap?.jobs ?: Collections.emptyMap()) as Map) + this.debug = new NomadDebug((nomadConfigMap?.debug ?: Collections.emptyMap()) as Map) + } + + + NomadClientOpts clientOpts() { clientOpts } + + NomadJobOpts jobOpts() { jobOpts } + + NomadDebug debug() { debug } + + static class NomadDebug { + + @Delegate + Map target + + NomadDebug(Map debug) { + this.target = debug ?: Collections.emptyMap() + } + + boolean getJson() { Boolean.valueOf( target.json as String ) } + } +} diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/config/NomadJobOpts.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/config/NomadJobOpts.groovy new file mode 100644 index 0000000..4c918bb --- /dev/null +++ b/plugins/nf-nomad/src/main/nextflow/nomad/config/NomadJobOpts.groovy @@ -0,0 +1,136 @@ +/* + * Copyright 2023-, Stellenbosch University, South Africa + * Copyright 2024, Evaluacion y Desarrollo de Negocios, Spain + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.nomad.config + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j + + +/** + * Nomad JobOpts + * + * @author Jorge Aguilera + * @author Abhinav Sharma + */ +@Slf4j +@CompileStatic +class NomadJobOpts{ + private Map sysEnv + + boolean deleteOnCompletion + List datacenters + String region + String namespace + String dockerVolume + VolumeSpec[] volumeSpec + AffinitySpec affinitySpec + ConstraintSpec constraintSpec + + NomadJobOpts(Map nomadJobOpts, Map env=null){ + assert nomadJobOpts!=null + + sysEnv = env==null ? new HashMap(System.getenv()) : env + + deleteOnCompletion = nomadJobOpts.containsKey("deleteOnCompletion") ? + nomadJobOpts.deleteOnCompletion : false + if( nomadJobOpts.containsKey("datacenters") ) { + datacenters = ((nomadJobOpts.datacenters instanceof List ? + nomadJobOpts.datacenters : nomadJobOpts.datacenters.toString().split(",")) + as List).findAll{it.size()}.unique() + }else{ + datacenters = List.of(sysEnv.get('NOMAD_DC')) + } + + region = nomadJobOpts.region ?: sysEnv.get('NOMAD_REGION') + namespace = nomadJobOpts.namespace ?: sysEnv.get('NOMAD_NAMESPACE') + + dockerVolume = nomadJobOpts.dockerVolume ?: null + if( dockerVolume ){ + log.info "dockerVolume config will be deprecated, use volume type:'docker' name:'name' instead" + } + + this.volumeSpec = parseVolumes(nomadJobOpts) + this.affinitySpec = parseAffinity(nomadJobOpts) + this.constraintSpec = parseConstraint(nomadJobOpts) + } + + VolumeSpec[] parseVolumes(Map nomadJobOpts){ + List ret = [] + if( nomadJobOpts.volume && nomadJobOpts.volume instanceof Closure){ + def volumeSpec = new VolumeSpec() + def closure = (nomadJobOpts.volume as Closure) + def clone = closure.rehydrate(volumeSpec, closure.owner, closure.thisObject) + clone.resolveStrategy = Closure.DELEGATE_FIRST + clone() + volumeSpec.workDir(true) + ret.add volumeSpec + } + + if( nomadJobOpts.volumes && nomadJobOpts.volumes instanceof List){ + nomadJobOpts.volumes.each{ closure -> + if( closure instanceof Closure){ + def volumeSpec = new VolumeSpec() + def clone = closure.rehydrate(volumeSpec, closure.owner, closure.thisObject) + clone.resolveStrategy = Closure.DELEGATE_FIRST + clone() + ret.add volumeSpec + } + } + } + + if( ret.size() && !ret.find{ it.workDir } ){ + ret.first().workDir(true) + } + + ret*.validate() + + if( ret.findAll{ it.workDir}.size() > 1 ){ + throw new IllegalArgumentException("No more than a workdir volume allowed") + } + + return ret as VolumeSpec[] + } + + AffinitySpec parseAffinity(Map nomadJobOpts) { + if (nomadJobOpts.affinity && nomadJobOpts.affinity instanceof Closure) { + def affinitySpec = new AffinitySpec() + def closure = (nomadJobOpts.affinity as Closure) + def clone = closure.rehydrate(affinitySpec, closure.owner, closure.thisObject) + clone.resolveStrategy = Closure.DELEGATE_FIRST + clone() + affinitySpec.validate() + affinitySpec + } else { + null + } + } + + ConstraintSpec parseConstraint(Map nomadJobOpts){ + if (nomadJobOpts.constraint && nomadJobOpts.constraint instanceof Closure) { + def constraintSpec = new ConstraintSpec() + def closure = (nomadJobOpts.constraint as Closure) + def clone = closure.rehydrate(constraintSpec, closure.owner, closure.thisObject) + clone.resolveStrategy = Closure.DELEGATE_FIRST + clone() + constraintSpec.validate() + constraintSpec + } else { + null + } + } +} \ No newline at end of file diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/config/VolumeSpec.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/config/VolumeSpec.groovy index 6f55ced..bf8ff5c 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/config/VolumeSpec.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/config/VolumeSpec.groovy @@ -1,5 +1,26 @@ -package nextflow.nomad.config +/* + * Copyright 2023-, Stellenbosch University, South Africa + * Copyright 2024, Evaluacion y Desarrollo de Negocios, Spain + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nextflow.nomad.config +/** + * Nomad Volume Spec + * + * @author Jorge Aguilera + */ class VolumeSpec { final static public String VOLUME_DOCKER_TYPE = "docker" diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadExecutor.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadExecutor.groovy index d4e59bb..96a6b3b 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadExecutor.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadExecutor.groovy @@ -20,7 +20,7 @@ import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import nextflow.executor.Executor import nextflow.fusion.FusionHelper -import nextflow.nomad.NomadConfig +import nextflow.nomad.config.NomadConfig import nextflow.processor.TaskHandler import nextflow.processor.TaskMonitor import nextflow.processor.TaskPollingMonitor diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy index d69452d..b47b19e 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy @@ -22,11 +22,11 @@ import groovy.util.logging.Slf4j import io.nomadproject.client.ApiClient import io.nomadproject.client.api.JobsApi import io.nomadproject.client.model.* -import nextflow.nomad.NomadConfig +import nextflow.nomad.config.NomadConfig import nextflow.nomad.config.VolumeSpec import nextflow.processor.TaskRun import nextflow.util.MemoryUnit -import org.yaml.snakeyaml.Yaml +import nextflow.exception.ProcessSubmitException import java.nio.file.Path @@ -40,26 +40,27 @@ import java.nio.file.Path @CompileStatic class NomadService implements Closeable{ - private final NomadConfig config + NomadConfig config - private final JobsApi jobsApi + JobsApi jobsApi NomadService(NomadConfig config) { this.config = config + //TODO: Accommodate these connection level options in clientOpts() final CONNECTION_TIMEOUT_MILLISECONDS = 60000 final READ_TIMEOUT_MILLISECONDS = 60000 final WRITE_TIMEOUT_MILLISECONDS = 60000 ApiClient apiClient = new ApiClient( connectTimeout: CONNECTION_TIMEOUT_MILLISECONDS, readTimeout: READ_TIMEOUT_MILLISECONDS, writeTimeout: WRITE_TIMEOUT_MILLISECONDS) - apiClient.basePath = config.clientOpts.address - log.debug "[NOMAD] Client Address: ${config.clientOpts.address}" + apiClient.basePath = config.clientOpts().address + log.debug "[NOMAD] Client Address: ${config.clientOpts().address}" - if( config.clientOpts.token ){ - log.debug "[NOMAD] Client Token: ${config.clientOpts.token?.take(5)}.." - apiClient.apiKey = config.clientOpts.token + if( config.clientOpts().token ){ + log.debug "[NOMAD] Client Token: ${config.clientOpts().token?.take(5)}.." + apiClient.apiKey = config.clientOpts().token } - this.jobsApi = new JobsApi(apiClient); + this.jobsApi = new JobsApi(apiClient) } protected Resources getResources(TaskRun task) { @@ -86,15 +87,15 @@ class NomadService implements Closeable{ job.ID = id job.name = task.name job.type = "batch" - job.datacenters = this.config.jobOpts.datacenters - job.namespace = this.config.jobOpts.namespace + job.datacenters = this.config.jobOpts().datacenters + job.namespace = this.config.jobOpts().namespace job.taskGroups = [createTaskGroup(task, args, env)] assignDatacenters(task, job) - JobRegisterRequest jobRegisterRequest = new JobRegisterRequest(); - jobRegisterRequest.setJob(job); + JobRegisterRequest jobRegisterRequest = new JobRegisterRequest() + jobRegisterRequest.setJob(job) if( saveJsonPath ) try { saveJsonPath.text = job.toString() @@ -103,11 +104,18 @@ class NomadService implements Closeable{ log.debug "WARN: unable to save request json -- cause: ${e.message ?: e}" } - JobRegisterResponse jobRegisterResponse = jobsApi.registerJob(jobRegisterRequest, config.jobOpts.region, config.jobOpts.namespace, null, null) - jobRegisterResponse.evalID + + try { + JobRegisterResponse jobRegisterResponse = jobsApi.registerJob(jobRegisterRequest, config.jobOpts().region, config.jobOpts().namespace, null, null) + jobRegisterResponse.evalID + } catch (Throwable e) { + throw new ProcessSubmitException("[NOMAD] Failed to submit ${job.name} -- Cause: ${e.message ?: e}", e) + } + } TaskGroup createTaskGroup(TaskRun taskRun, List args, Mapenv){ + //NOTE: Force a single-allocation with no-retries per nomad job definition final TASK_RESCHEDULE_ATTEMPTS = 0 final TASK_RESTART_ATTEMPTS = 0 @@ -123,9 +131,9 @@ class NomadService implements Closeable{ ) - if( config.jobOpts.volumeSpec ) { + if( config.jobOpts().volumeSpec ) { taskGroup.volumes = [:] - config.jobOpts.volumeSpec.eachWithIndex { volumeSpec , idx-> + config.jobOpts().volumeSpec.eachWithIndex { volumeSpec , idx-> if (volumeSpec && volumeSpec.type == VolumeSpec.VOLUME_CSI_TYPE) { taskGroup.volumes["vol_${idx}".toString()] = new VolumeRequest( type: volumeSpec.type, @@ -177,19 +185,19 @@ class NomadService implements Closeable{ } protected Task volumes(TaskRun task, Task taskDef, String workingDir){ - if( config.jobOpts.dockerVolume){ + if( config.jobOpts().dockerVolume){ String destinationDir = workingDir.split(File.separator).dropRight(2).join(File.separator) taskDef.config.mount = [ type : "volume", target : destinationDir, - source : config.jobOpts.dockerVolume, + source : config.jobOpts().dockerVolume, readonly : false ] } - if( config.jobOpts.volumeSpec){ + if( config.jobOpts().volumeSpec){ taskDef.volumeMounts = [] - config.jobOpts.volumeSpec.eachWithIndex { volumeSpec, idx -> + config.jobOpts().volumeSpec.eachWithIndex { volumeSpec, idx -> String destinationDir = volumeSpec.workDir ? workingDir.split(File.separator).dropRight(2).join(File.separator) : volumeSpec.path taskDef.volumeMounts.add new VolumeMount( @@ -203,19 +211,19 @@ class NomadService implements Closeable{ } protected Task affinity(TaskRun task, Task taskDef) { - if (config.jobOpts.affinitySpec) { + if (config.jobOpts().affinitySpec) { def affinity = new Affinity() - if (config.jobOpts.affinitySpec.attribute) { - affinity.ltarget(config.jobOpts.affinitySpec.attribute) + if (config.jobOpts().affinitySpec.attribute) { + affinity.ltarget(config.jobOpts().affinitySpec.attribute) } - affinity.operand(config.jobOpts.affinitySpec.operator ?: "=") + affinity.operand(config.jobOpts().affinitySpec.operator ?: "=") - if (config.jobOpts.affinitySpec.value) { - affinity.rtarget(config.jobOpts.affinitySpec.value) + if (config.jobOpts().affinitySpec.value) { + affinity.rtarget(config.jobOpts().affinitySpec.value) } - if (config.jobOpts.affinitySpec.weight != null) { - affinity.weight(config.jobOpts.affinitySpec.weight) + if (config.jobOpts().affinitySpec.weight != null) { + affinity.weight(config.jobOpts().affinitySpec.weight) } taskDef.affinities([affinity]) } @@ -223,16 +231,16 @@ class NomadService implements Closeable{ } protected Task constrains(TaskRun task, Task taskDef){ - if( config.jobOpts.constraintSpec ){ + if( config.jobOpts().constraintSpec ){ def constraint = new Constraint() - if(config.jobOpts.constraintSpec.attribute){ - constraint.ltarget(config.jobOpts.constraintSpec.attribute) + if(config.jobOpts().constraintSpec.attribute){ + constraint.ltarget(config.jobOpts().constraintSpec.attribute) } - constraint.operand(config.jobOpts.constraintSpec.operator ?: "=") + constraint.operand(config.jobOpts().constraintSpec.operator ?: "=") - if(config.jobOpts.constraintSpec.value){ - constraint.rtarget(config.jobOpts.constraintSpec.value) + if(config.jobOpts().constraintSpec.value){ + constraint.rtarget(config.jobOpts().constraintSpec.value) } taskDef.constraints([constraint]) } @@ -258,8 +266,8 @@ class NomadService implements Closeable{ job } - String state(String jobId){ - List allocations = jobsApi.getJobAllocations(jobId, config.jobOpts.region, config.jobOpts.namespace, null, null, null, null, null, null, null, null) + String getJobState(String jobId){ + List allocations = jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null, null) AllocationListStub last = allocations?.sort{ it.modifyIndex }?.last() @@ -271,12 +279,14 @@ class NomadService implements Closeable{ boolean checkIfRunning(String jobId){ - Job job = jobsApi.getJob(jobId, config.jobOpts.region, config.jobOpts.namespace, null, null, null, null, null, null, null) + Job job = jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null) + log.debug "[NOMAD] checkIfRunning jobID=$job.ID; status=$job.status" job.status == "running" } - boolean checkIfCompleted(String jobId){ - Job job = jobsApi.getJob(jobId, config.jobOpts.region, config.jobOpts.namespace, null, null, null, null, null, null, null) + boolean checkIfDead(String jobId){ + Job job = jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null) + log.debug "[NOMAD] checkIfDead jobID=$job.ID; status=$job.status" job.status == "dead" } @@ -289,6 +299,13 @@ class NomadService implements Closeable{ } protected void purgeJob(String jobId, boolean purge){ - jobsApi.deleteJob(jobId,config.jobOpts.region, config.jobOpts.namespace,null,null,purge, true) + log.debug "[NOMAD] purgeJob with jobId=${jobId}" + jobsApi.deleteJob(jobId,config.jobOpts().region, config.jobOpts().namespace,null,null,purge, true) + } + + String getClientOfJob(String jobId) { + List allocations = jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null, null) + AllocationListStub jobAllocation = allocations.first() + return jobAllocation.nodeName } } diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadTaskHandler.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadTaskHandler.groovy index 31176ae..a0a7298 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadTaskHandler.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadTaskHandler.groovy @@ -16,19 +16,21 @@ */ package nextflow.nomad.executor -import groovy.transform.CompileDynamic + import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import nextflow.exception.ProcessSubmitException import nextflow.exception.ProcessUnrecoverableException import nextflow.executor.BashWrapperBuilder import nextflow.fusion.FusionAwareTask -import nextflow.nomad.NomadConfig +import nextflow.nomad.config.NomadConfig import nextflow.nomad.NomadHelper import nextflow.processor.TaskHandler import nextflow.processor.TaskRun import nextflow.processor.TaskStatus +import nextflow.trace.TraceRecord import nextflow.util.Escape +import nextflow.SysEnv import java.nio.file.Path @@ -47,6 +49,8 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask { private String jobName + private String clientName = null + private String state private long timestamp @@ -68,32 +72,35 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask { @Override boolean checkIfRunning() { + if(isActive()) { + determineClientNode() + } nomadService.checkIfRunning(this.jobName) } @Override boolean checkIfCompleted() { - if (!nomadService.checkIfCompleted(this.jobName)) { + if (!nomadService.checkIfDead(this.jobName)) { return false } state = taskState0(this.jobName) - final isDone = [ + final isFinished = [ "complete", "failed", "dead", "lost"].contains(state) - log.debug "[NOMAD] Task status $task.name taskId=${this.jobName}; state=$state completed=$isDone" + log.debug "[NOMAD] checkIfCompleted task.name=$task.name; state=$state completed=$isFinished" - if (isDone) { + if (isFinished) { // finalize the task task.exitStatus = readExitFile() task.stdout = outputFile task.stderr = errorFile this.status = TaskStatus.COMPLETED - if (state == "Failed" || state == "Lost" || state == "Unknown") + if (state == "failed" || state == "lost" || state == "unknown") task.error = new ProcessUnrecoverableException() if (shouldDelete()) { @@ -137,7 +144,7 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask { } protected Path debugPath() { - boolean debug = config.debug?.getJson() + boolean debug = config.debug()?.getJson() return debug ? task.workDir.resolve('.job.json') : null } @@ -164,6 +171,11 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask { if (fusionEnabled()) { ret += fusionLauncher().fusionEnv() } + + //Add debug env variable + if( SysEnv.containsKey('NXF_DEBUG') ) + ret.put('NXF_DEBUG', SysEnv.get('NXF_DEBUG') ) + return ret } @@ -171,8 +183,10 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask { final now = System.currentTimeMillis() final delta = now - timestamp; if (!status || delta >= 1_000) { - def newState = nomadService.state(jobName) - log.debug "[NOMAD] Task: $taskName state=$state newState=$newState" + + def newState = nomadService.getJobState(jobName) + log.debug "[NOMAD] Check jobState: jobName=$jobName currentState=$state newState=$newState" + if (newState) { state = newState timestamp = now @@ -192,6 +206,26 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask { } private Boolean shouldDelete() { - config.jobOpts.deleteOnCompletion + config.jobOpts().deleteOnCompletion } + + + + private void determineClientNode(){ + try { + if ( !clientName ) + clientName = nomadService.getClientOfJob( jobName ) + log.debug "[NOMAD] determineClientNode: jobName:$jobName; clientName:$clientName" + } catch ( Exception e ){ + log.warn ("[NOMAD] Unable to get the client name of job $jobName -- see the log file for details", e) + } + } + + TraceRecord getTraceRecord() { + final result = super.getTraceRecord() + result.put('native_id', jobName) + result.put( 'hostname', clientName ) + return result + } + } diff --git a/plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy b/plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy index 28090cc..916527d 100644 --- a/plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy +++ b/plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy @@ -17,13 +17,16 @@ package nextflow.nomad +import nextflow.nomad.config.NomadConfig import nextflow.nomad.config.VolumeSpec import spock.lang.Specification +import spock.lang.Unroll /** * Unit test for Nomad Config * * @author : Jorge Aguilera + * @author : Abhinav Sharma */ class NomadConfigSpec extends Specification { @@ -36,12 +39,18 @@ class NomadConfigSpec extends Specification { config.clientOpts } - void "should use localhost as default address"() { - given: - def config = new NomadConfig([:]) + @Unroll + void "should derive the default address"() { expect: - config.clientOpts.address == "http://127.0.0.1:4646/v1" + new NomadConfig([ + client:[address: ADDRESS] + ]).clientOpts.address == EXPECTED + + where: + ADDRESS | EXPECTED + null | "${System.getenv('NOMAD_ADDR')}/v1" + "http://nomad" | "http://nomad/v1" } void "should use address if provided"() { @@ -74,14 +83,14 @@ class NomadConfigSpec extends Specification { config.clientOpts.token == "theToken" } - void "should use an empty list if no datacenters is provided"() { + void "should use the NOMAD_DC variable if no datacenters are provided"() { given: def config = new NomadConfig([ jobs: [:] ]) expect: - !config.jobOpts.datacenters.size() + config.jobOpts.datacenters } void "should use datacenters #dc with size #size if provided"() { diff --git a/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy b/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy index 7c2ac7d..ad58f5b 100644 --- a/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy +++ b/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy @@ -19,7 +19,7 @@ package nextflow.nomad.executor import groovy.json.JsonOutput import groovy.json.JsonSlurper import nextflow.executor.Executor -import nextflow.nomad.NomadConfig +import nextflow.nomad.config.NomadConfig import nextflow.processor.TaskBean import nextflow.processor.TaskConfig import nextflow.processor.TaskProcessor @@ -65,7 +65,7 @@ class NomadServiceSpec extends Specification{ mockWebServer.enqueue(new MockResponse() .addHeader("Content-Type", "application/json")); - def state = service.state("theId") + def state = service.getJobState("theId") def recordedRequest = mockWebServer.takeRequest(); then: @@ -214,7 +214,7 @@ class NomadServiceSpec extends Specification{ """) .addHeader("Content-Type", "application/json")); - state = service.state("theId") + state = service.getJobState("theId") recordedRequest = mockWebServer.takeRequest(); then: @@ -281,7 +281,6 @@ class NomadServiceSpec extends Specification{ body.Job body.Job.ID == id body.Job.Name == name - body.Job.Datacenters == [] body.Job.Type == "batch" body.Job.TaskGroups.size() == 1 body.Job.TaskGroups[0].Name == "group" @@ -357,7 +356,6 @@ class NomadServiceSpec extends Specification{ body.Job body.Job.ID == id body.Job.Name == name - body.Job.Datacenters == [] body.Job.Type == "batch" body.Job.TaskGroups.size() == 1 body.Job.TaskGroups[0].Name == "group" diff --git a/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadTaskHandlerSpec.groovy b/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadTaskHandlerSpec.groovy index 35e1725..42d8411 100644 --- a/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadTaskHandlerSpec.groovy +++ b/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadTaskHandlerSpec.groovy @@ -19,7 +19,7 @@ package nextflow.nomad.executor import nextflow.exception.ProcessSubmitException import nextflow.executor.Executor -import nextflow.nomad.NomadConfig +import nextflow.nomad.config.NomadConfig import nextflow.processor.* import spock.lang.Specification