Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implementation of possibility to add multiple volumes #46

Merged
merged 1 commit into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class NomadConfig {
final String region
final String namespace
final String dockerVolume
final VolumeSpec volumeSpec
final VolumeSpec[] volumeSpec
final AffinitySpec affinitySpec
final ConstraintSpec constraintSpec

Expand All @@ -81,23 +81,46 @@ class NomadConfig {
log.info "dockerVolume config will be deprecated, use volume type:'docker' name:'name' instead"
}

this.volumeSpec = parseVolume(nomadJobOpts)
this.volumeSpec = parseVolumes(nomadJobOpts)
this.affinitySpec = parseAffinity(nomadJobOpts)
this.constraintSpec = parseConstraint(nomadJobOpts)
}

VolumeSpec parseVolume(Map nomadJobOpts){
VolumeSpec[] parseVolumes(Map nomadJobOpts){
List<VolumeSpec> ret = []
if( nomadJobOpts.volume && nomadJobOpts.volume instanceof Closure){
def volumeSpec = new VolumeSpec()
def closure = (nomadJobOpts.volume as Closure)
def clone = closure.rehydrate(volumeSpec, closure.owner, closure.thisObject)
clone.resolveStrategy = Closure.DELEGATE_FIRST
clone()
volumeSpec.validate()
volumeSpec
}else{
null
volumeSpec.workDir(true)
ret.add volumeSpec
}

if( nomadJobOpts.volumes && nomadJobOpts.volumes instanceof List){
nomadJobOpts.volumes.each{ closure ->
if( closure instanceof Closure){
def volumeSpec = new VolumeSpec()
def clone = closure.rehydrate(volumeSpec, closure.owner, closure.thisObject)
clone.resolveStrategy = Closure.DELEGATE_FIRST
clone()
ret.add volumeSpec
}
}
}

if( ret.size() && !ret.find{ it.workDir } ){
ret.first().workDir(true)
}

ret*.validate()

if( ret.findAll{ it.workDir}.size() > 1 ){
throw new IllegalArgumentException("No more than a workdir volume allowed")
}

return ret as VolumeSpec[]
}

AffinitySpec parseAffinity(Map nomadJobOpts) {
Expand Down
27 changes: 24 additions & 3 deletions plugins/nf-nomad/src/main/nextflow/nomad/config/VolumeSpec.groovy
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package nextflow.nomad.config

import nextflow.nomad.NomadConfig

class VolumeSpec {

final static public String VOLUME_DOCKER_TYPE = "docker"
Expand All @@ -14,6 +12,8 @@ class VolumeSpec {

private String type
private String name
private String path
private boolean workDir = false

String getType() {
return type
Expand All @@ -23,6 +23,14 @@ class VolumeSpec {
return name
}

boolean getWorkDir() {
return workDir
}

String getPath() {
return path
}

VolumeSpec type(String type){
this.type = type
this
Expand All @@ -33,12 +41,25 @@ class VolumeSpec {
this
}

VolumeSpec workDir(boolean b){
this.workDir = b
this
}

VolumeSpec path(String path){
this.path = path
this
}

void validate(){
if( !VOLUME_TYPES.contains(type) ) {
throw new IllegalArgumentException("Volume type $type is not supported")
}
if( !this.name ){
throw new IllegalArgumentException("Volume name is required")
throw new IllegalArgumentException("Volume name is required (type $type)")
}
if( !this.workDir && !this.path ){
throw new IllegalArgumentException("Volume path is required in secondary volumes")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,24 +122,26 @@ class NomadService implements Closeable{
)


if( config.jobOpts.volumeSpec && config.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_CSI_TYPE){
if( config.jobOpts.volumeSpec ) {
taskGroup.volumes = [:]
taskGroup.volumes[config.jobOpts.volumeSpec.name]= new VolumeRequest(
type: config.jobOpts.volumeSpec.type,
source: config.jobOpts.volumeSpec.name,
attachmentMode: "file-system",
accessMode: "multi-node-multi-writer"
)
}

if( config.jobOpts.volumeSpec && config.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_HOST_TYPE){
taskGroup.volumes = [:]
taskGroup.volumes[config.jobOpts.volumeSpec.name]= new VolumeRequest(
type: config.jobOpts.volumeSpec.type,
source: config.jobOpts.volumeSpec.name,
)
config.jobOpts.volumeSpec.eachWithIndex { volumeSpec , idx->
if (volumeSpec && volumeSpec.type == VolumeSpec.VOLUME_CSI_TYPE) {
taskGroup.volumes["vol_${idx}".toString()] = new VolumeRequest(
type: volumeSpec.type,
source: volumeSpec.name,
attachmentMode: "file-system",
accessMode: "multi-node-multi-writer"
)
}

if (volumeSpec && volumeSpec.type == VolumeSpec.VOLUME_HOST_TYPE) {
taskGroup.volumes["vol_${idx}".toString()] = new VolumeRequest(
type: volumeSpec.type,
source: volumeSpec.name,
)
}
}
}

return taskGroup
}

Expand Down Expand Up @@ -176,11 +178,15 @@ class NomadService implements Closeable{
}

if( config.jobOpts.volumeSpec){
String destinationDir = workingDir.split(File.separator).dropRight(2).join(File.separator)
taskDef.volumeMounts = [ new VolumeMount(
destination: destinationDir,
volume: config.jobOpts.volumeSpec.name
)]
taskDef.volumeMounts = []
config.jobOpts.volumeSpec.eachWithIndex { volumeSpec, idx ->
String destinationDir = volumeSpec.workDir ?
workingDir.split(File.separator).dropRight(2).join(File.separator) : volumeSpec.path
taskDef.volumeMounts.add new VolumeMount(
destination: destinationDir,
volume: "vol_${idx}".toString()
)
}
}

if( config.jobOpts.affinitySpec ){
Expand Down
82 changes: 76 additions & 6 deletions plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ class NomadConfigSpec extends Specification {

then:
config.jobOpts.volumeSpec
config.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_DOCKER_TYPE
config.jobOpts.volumeSpec.name == "test"
config.jobOpts.volumeSpec[0].type == VolumeSpec.VOLUME_DOCKER_TYPE
config.jobOpts.volumeSpec[0].name == "test"

when:
def config2 = new NomadConfig([
Expand All @@ -141,8 +141,8 @@ class NomadConfigSpec extends Specification {

then:
config2.jobOpts.volumeSpec
config2.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_CSI_TYPE
config2.jobOpts.volumeSpec.name == "test"
config2.jobOpts.volumeSpec[0].type == VolumeSpec.VOLUME_CSI_TYPE
config2.jobOpts.volumeSpec[0].name == "test"

when:
def config3 = new NomadConfig([
Expand All @@ -151,8 +151,8 @@ class NomadConfigSpec extends Specification {

then:
config3.jobOpts.volumeSpec
config3.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_HOST_TYPE
config3.jobOpts.volumeSpec.name == "test"
config3.jobOpts.volumeSpec[0].type == VolumeSpec.VOLUME_HOST_TYPE
config3.jobOpts.volumeSpec[0].name == "test"

when:
new NomadConfig([
Expand Down Expand Up @@ -198,4 +198,74 @@ class NomadConfigSpec extends Specification {
config.jobOpts.constraintSpec.getOperator() == '>'
config.jobOpts.constraintSpec.getValue() == '3'
}

void "should instantiate multiple volumes spec if specified"() {
when:
def config = new NomadConfig([
jobs: [
volumes : [
{ type "docker" name "test" }
]
]
])

then:
config.jobOpts.volumeSpec
config.jobOpts.volumeSpec[0].type == VolumeSpec.VOLUME_DOCKER_TYPE
config.jobOpts.volumeSpec[0].name == "test"
config.jobOpts.volumeSpec[0].workDir

when:
new NomadConfig([
jobs: [
volumes : [
{ type "csi" name "test" },
{ type "docker" name "test" },
]
]
])

then:
thrown(IllegalArgumentException)

when:
def config2 = new NomadConfig([
jobs: [
volumes : [
{ type "csi" name "test" },
{ type "docker" name "test" path '/data' },
]
]
])

then:
config2.jobOpts.volumeSpec.size()==2
config2.jobOpts.volumeSpec[0].type == VolumeSpec.VOLUME_CSI_TYPE
config2.jobOpts.volumeSpec[0].name == "test"
config2.jobOpts.volumeSpec[1].type == VolumeSpec.VOLUME_DOCKER_TYPE
config2.jobOpts.volumeSpec[1].name == "test"

config.jobOpts.volumeSpec[0].workDir
config.jobOpts.volumeSpec.findAll{ it.workDir}.size() == 1

when:
def config3 = new NomadConfig([
jobs: [
volumes : [
{ type "csi" name "test" path '/data'},
{ type "docker" name "test" path '/data'},
],
volume : { type "host" name "test" },
]
])

then:
config3.jobOpts.volumeSpec.size()==3
config3.jobOpts.volumeSpec[0].type == VolumeSpec.VOLUME_HOST_TYPE
config3.jobOpts.volumeSpec[1].type == VolumeSpec.VOLUME_CSI_TYPE
config3.jobOpts.volumeSpec[2].type == VolumeSpec.VOLUME_DOCKER_TYPE

config.jobOpts.volumeSpec[0].workDir
config.jobOpts.volumeSpec.findAll{ it.workDir}.size() == 1
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -372,9 +372,9 @@ class NomadServiceSpec extends Specification{
body.Job.TaskGroups[0].Tasks[0].Config.args == args.drop(1)

body.Job.TaskGroups[0].Volumes.size() == 1
body.Job.TaskGroups[0].Volumes['test'] == [AccessMode:"multi-node-multi-writer", AttachmentMode:"file-system", Source:"test", Type:"csi"]
body.Job.TaskGroups[0].Volumes['vol_0'] == [AccessMode:"multi-node-multi-writer", AttachmentMode:"file-system", Source:"test", Type:"csi"]
body.Job.TaskGroups[0].Tasks[0].VolumeMounts.size() == 1
body.Job.TaskGroups[0].Tasks[0].VolumeMounts[0] == [Destination:"/a", Volume:"test"]
body.Job.TaskGroups[0].Tasks[0].VolumeMounts[0] == [Destination:"/a", Volume:"vol_0"]

}

Expand Down
23 changes: 23 additions & 0 deletions validation/multiple-volumes/2-volumes.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
plugins {
id 'nf-nomad@latest'
}

process {
executor = "nomad"
}

nomad {

client {
address = "http://localhost:4646"
}

jobs {
deleteOnCompletion = false
volumes = [
{ type "host" name "scratchdir" },
{ type "host" name "scratchdir" path "/var/data" }, // can mount same volume in different path
]
}

}
26 changes: 26 additions & 0 deletions validation/multiple-volumes/3-volumes.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
plugins {
id 'nf-nomad@latest'
}

process {
executor = "nomad"
}

nomad {

client {
address = "http://localhost:4646"
}

jobs {
deleteOnCompletion = false

volume = { type "host" name "scratchdir" }

volumes = [
{ type "host" name "scratchdir" path "/var/data1" },
{ type "host" name "scratchdir" path "/var/data2" }
]
}

}
18 changes: 18 additions & 0 deletions validation/multiple-volumes/main.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/usr/bin/env nextflow

process sayHello {
container 'ubuntu:20.04'

input:
val x
output:
stdout
script:
"""
echo '$x world!'
"""
}

workflow {
Channel.of('Bonjour', 'Ciao', 'Hello', 'Hola') | sayHello | view
}
Loading