diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy index 0716b1f..0e9edff 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy @@ -60,7 +60,7 @@ class NomadConfig { final String region final String namespace final String dockerVolume - final VolumeSpec volumeSpec + final VolumeSpec[] volumeSpec final AffinitySpec affinitySpec final ConstraintSpec constraintSpec @@ -81,23 +81,46 @@ class NomadConfig { log.info "dockerVolume config will be deprecated, use volume type:'docker' name:'name' instead" } - this.volumeSpec = parseVolume(nomadJobOpts) + this.volumeSpec = parseVolumes(nomadJobOpts) this.affinitySpec = parseAffinity(nomadJobOpts) this.constraintSpec = parseConstraint(nomadJobOpts) } - VolumeSpec parseVolume(Map nomadJobOpts){ + VolumeSpec[] parseVolumes(Map nomadJobOpts){ + List ret = [] if( nomadJobOpts.volume && nomadJobOpts.volume instanceof Closure){ def volumeSpec = new VolumeSpec() def closure = (nomadJobOpts.volume as Closure) def clone = closure.rehydrate(volumeSpec, closure.owner, closure.thisObject) clone.resolveStrategy = Closure.DELEGATE_FIRST clone() - volumeSpec.validate() - volumeSpec - }else{ - null + volumeSpec.workDir(true) + ret.add volumeSpec + } + + if( nomadJobOpts.volumes && nomadJobOpts.volumes instanceof List){ + nomadJobOpts.volumes.each{ closure -> + if( closure instanceof Closure){ + def volumeSpec = new VolumeSpec() + def clone = closure.rehydrate(volumeSpec, closure.owner, closure.thisObject) + clone.resolveStrategy = Closure.DELEGATE_FIRST + clone() + ret.add volumeSpec + } + } + } + + if( ret.size() && !ret.find{ it.workDir } ){ + ret.first().workDir(true) } + + ret*.validate() + + if( ret.findAll{ it.workDir}.size() > 1 ){ + throw new IllegalArgumentException("No more than a workdir volume allowed") + } + + return ret as VolumeSpec[] } AffinitySpec parseAffinity(Map nomadJobOpts) { diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/config/VolumeSpec.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/config/VolumeSpec.groovy index 480ea7b..6f55ced 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/config/VolumeSpec.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/config/VolumeSpec.groovy @@ -1,7 +1,5 @@ package nextflow.nomad.config -import nextflow.nomad.NomadConfig - class VolumeSpec { final static public String VOLUME_DOCKER_TYPE = "docker" @@ -14,6 +12,8 @@ class VolumeSpec { private String type private String name + private String path + private boolean workDir = false String getType() { return type @@ -23,6 +23,14 @@ class VolumeSpec { return name } + boolean getWorkDir() { + return workDir + } + + String getPath() { + return path + } + VolumeSpec type(String type){ this.type = type this @@ -33,12 +41,25 @@ class VolumeSpec { this } + VolumeSpec workDir(boolean b){ + this.workDir = b + this + } + + VolumeSpec path(String path){ + this.path = path + this + } + void validate(){ if( !VOLUME_TYPES.contains(type) ) { throw new IllegalArgumentException("Volume type $type is not supported") } if( !this.name ){ - throw new IllegalArgumentException("Volume name is required") + throw new IllegalArgumentException("Volume name is required (type $type)") + } + if( !this.workDir && !this.path ){ + throw new IllegalArgumentException("Volume path is required in secondary volumes") } } } diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy index 0061488..9029506 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy @@ -122,24 +122,26 @@ class NomadService implements Closeable{ ) - if( config.jobOpts.volumeSpec && config.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_CSI_TYPE){ + if( config.jobOpts.volumeSpec ) { taskGroup.volumes = [:] - taskGroup.volumes[config.jobOpts.volumeSpec.name]= new VolumeRequest( - type: config.jobOpts.volumeSpec.type, - source: config.jobOpts.volumeSpec.name, - attachmentMode: "file-system", - accessMode: "multi-node-multi-writer" - ) - } - - if( config.jobOpts.volumeSpec && config.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_HOST_TYPE){ - taskGroup.volumes = [:] - taskGroup.volumes[config.jobOpts.volumeSpec.name]= new VolumeRequest( - type: config.jobOpts.volumeSpec.type, - source: config.jobOpts.volumeSpec.name, - ) + config.jobOpts.volumeSpec.eachWithIndex { volumeSpec , idx-> + if (volumeSpec && volumeSpec.type == VolumeSpec.VOLUME_CSI_TYPE) { + taskGroup.volumes["vol_${idx}".toString()] = new VolumeRequest( + type: volumeSpec.type, + source: volumeSpec.name, + attachmentMode: "file-system", + accessMode: "multi-node-multi-writer" + ) + } + + if (volumeSpec && volumeSpec.type == VolumeSpec.VOLUME_HOST_TYPE) { + taskGroup.volumes["vol_${idx}".toString()] = new VolumeRequest( + type: volumeSpec.type, + source: volumeSpec.name, + ) + } + } } - return taskGroup } @@ -176,11 +178,15 @@ class NomadService implements Closeable{ } if( config.jobOpts.volumeSpec){ - String destinationDir = workingDir.split(File.separator).dropRight(2).join(File.separator) - taskDef.volumeMounts = [ new VolumeMount( - destination: destinationDir, - volume: config.jobOpts.volumeSpec.name - )] + taskDef.volumeMounts = [] + config.jobOpts.volumeSpec.eachWithIndex { volumeSpec, idx -> + String destinationDir = volumeSpec.workDir ? + workingDir.split(File.separator).dropRight(2).join(File.separator) : volumeSpec.path + taskDef.volumeMounts.add new VolumeMount( + destination: destinationDir, + volume: "vol_${idx}".toString() + ) + } } if( config.jobOpts.affinitySpec ){ diff --git a/plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy b/plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy index 81fcf03..28090cc 100644 --- a/plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy +++ b/plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy @@ -131,8 +131,8 @@ class NomadConfigSpec extends Specification { then: config.jobOpts.volumeSpec - config.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_DOCKER_TYPE - config.jobOpts.volumeSpec.name == "test" + config.jobOpts.volumeSpec[0].type == VolumeSpec.VOLUME_DOCKER_TYPE + config.jobOpts.volumeSpec[0].name == "test" when: def config2 = new NomadConfig([ @@ -141,8 +141,8 @@ class NomadConfigSpec extends Specification { then: config2.jobOpts.volumeSpec - config2.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_CSI_TYPE - config2.jobOpts.volumeSpec.name == "test" + config2.jobOpts.volumeSpec[0].type == VolumeSpec.VOLUME_CSI_TYPE + config2.jobOpts.volumeSpec[0].name == "test" when: def config3 = new NomadConfig([ @@ -151,8 +151,8 @@ class NomadConfigSpec extends Specification { then: config3.jobOpts.volumeSpec - config3.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_HOST_TYPE - config3.jobOpts.volumeSpec.name == "test" + config3.jobOpts.volumeSpec[0].type == VolumeSpec.VOLUME_HOST_TYPE + config3.jobOpts.volumeSpec[0].name == "test" when: new NomadConfig([ @@ -198,4 +198,74 @@ class NomadConfigSpec extends Specification { config.jobOpts.constraintSpec.getOperator() == '>' config.jobOpts.constraintSpec.getValue() == '3' } + + void "should instantiate multiple volumes spec if specified"() { + when: + def config = new NomadConfig([ + jobs: [ + volumes : [ + { type "docker" name "test" } + ] + ] + ]) + + then: + config.jobOpts.volumeSpec + config.jobOpts.volumeSpec[0].type == VolumeSpec.VOLUME_DOCKER_TYPE + config.jobOpts.volumeSpec[0].name == "test" + config.jobOpts.volumeSpec[0].workDir + + when: + new NomadConfig([ + jobs: [ + volumes : [ + { type "csi" name "test" }, + { type "docker" name "test" }, + ] + ] + ]) + + then: + thrown(IllegalArgumentException) + + when: + def config2 = new NomadConfig([ + jobs: [ + volumes : [ + { type "csi" name "test" }, + { type "docker" name "test" path '/data' }, + ] + ] + ]) + + then: + config2.jobOpts.volumeSpec.size()==2 + config2.jobOpts.volumeSpec[0].type == VolumeSpec.VOLUME_CSI_TYPE + config2.jobOpts.volumeSpec[0].name == "test" + config2.jobOpts.volumeSpec[1].type == VolumeSpec.VOLUME_DOCKER_TYPE + config2.jobOpts.volumeSpec[1].name == "test" + + config.jobOpts.volumeSpec[0].workDir + config.jobOpts.volumeSpec.findAll{ it.workDir}.size() == 1 + + when: + def config3 = new NomadConfig([ + jobs: [ + volumes : [ + { type "csi" name "test" path '/data'}, + { type "docker" name "test" path '/data'}, + ], + volume : { type "host" name "test" }, + ] + ]) + + then: + config3.jobOpts.volumeSpec.size()==3 + config3.jobOpts.volumeSpec[0].type == VolumeSpec.VOLUME_HOST_TYPE + config3.jobOpts.volumeSpec[1].type == VolumeSpec.VOLUME_CSI_TYPE + config3.jobOpts.volumeSpec[2].type == VolumeSpec.VOLUME_DOCKER_TYPE + + config.jobOpts.volumeSpec[0].workDir + config.jobOpts.volumeSpec.findAll{ it.workDir}.size() == 1 + } } diff --git a/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy b/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy index 12b7896..902235e 100644 --- a/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy +++ b/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy @@ -372,9 +372,9 @@ class NomadServiceSpec extends Specification{ body.Job.TaskGroups[0].Tasks[0].Config.args == args.drop(1) body.Job.TaskGroups[0].Volumes.size() == 1 - body.Job.TaskGroups[0].Volumes['test'] == [AccessMode:"multi-node-multi-writer", AttachmentMode:"file-system", Source:"test", Type:"csi"] + body.Job.TaskGroups[0].Volumes['vol_0'] == [AccessMode:"multi-node-multi-writer", AttachmentMode:"file-system", Source:"test", Type:"csi"] body.Job.TaskGroups[0].Tasks[0].VolumeMounts.size() == 1 - body.Job.TaskGroups[0].Tasks[0].VolumeMounts[0] == [Destination:"/a", Volume:"test"] + body.Job.TaskGroups[0].Tasks[0].VolumeMounts[0] == [Destination:"/a", Volume:"vol_0"] } diff --git a/validation/multiple-volumes/2-volumes.config b/validation/multiple-volumes/2-volumes.config new file mode 100644 index 0000000..b622801 --- /dev/null +++ b/validation/multiple-volumes/2-volumes.config @@ -0,0 +1,23 @@ +plugins { + id 'nf-nomad@latest' +} + +process { + executor = "nomad" +} + +nomad { + + client { + address = "http://localhost:4646" + } + + jobs { + deleteOnCompletion = false + volumes = [ + { type "host" name "scratchdir" }, + { type "host" name "scratchdir" path "/var/data" }, // can mount same volume in different path + ] + } + +} diff --git a/validation/multiple-volumes/3-volumes.config b/validation/multiple-volumes/3-volumes.config new file mode 100644 index 0000000..d409ce6 --- /dev/null +++ b/validation/multiple-volumes/3-volumes.config @@ -0,0 +1,26 @@ +plugins { + id 'nf-nomad@latest' +} + +process { + executor = "nomad" +} + +nomad { + + client { + address = "http://localhost:4646" + } + + jobs { + deleteOnCompletion = false + + volume = { type "host" name "scratchdir" } + + volumes = [ + { type "host" name "scratchdir" path "/var/data1" }, + { type "host" name "scratchdir" path "/var/data2" } + ] + } + +} diff --git a/validation/multiple-volumes/main.nf b/validation/multiple-volumes/main.nf new file mode 100644 index 0000000..87f26f8 --- /dev/null +++ b/validation/multiple-volumes/main.nf @@ -0,0 +1,18 @@ +#!/usr/bin/env nextflow + +process sayHello { + container 'ubuntu:20.04' + + input: + val x + output: + stdout + script: + """ + echo '$x world!' + """ +} + +workflow { + Channel.of('Bonjour', 'Ciao', 'Hello', 'Hola') | sayHello | view +} \ No newline at end of file