Skip to content

Commit

Permalink
add Constraints (Node and Attr) (#66)
Browse files Browse the repository at this point in the history
* implement ConstraintNodeSpec

Signed-off-by: Jorge Aguilera <jorge@edn.es>

* implement ConstraintAttrSpec

Signed-off-by: Jorge Aguilera <jorge@edn.es>

* Constraints refactor to introduce a specific models package (#68)

* refactor the package organization to create a separate package for models

* Refactor the test folder and add license headers

* move contraintsbuilder to model

Signed-off-by: Jorge Aguilera <jorge@edn.es>

* refactor and new "raw" dsl to include open constraints

Signed-off-by: Jorge Aguilera <jorge@edn.es>

* iterate upon the constraint config [ci skip]

* fix nomadlab typos

Signed-off-by: Jorge Aguilera <jorge@edn.es>

* use cloud cache in sun-nomadlab [ci skip]

---------

Signed-off-by: Jorge Aguilera <jorge@edn.es>
Co-authored-by: Abhinav Sharma <abhi18av@users.noreply.github.com>
Co-authored-by: Abhinav Sharma <abhi18av@gmail.com>
  • Loading branch information
3 people authored Jul 15, 2024
1 parent 6ee5725 commit f9d8834
Show file tree
Hide file tree
Showing 20 changed files with 908 additions and 62 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version=0.1.1
version=0.1.2
github_organization=nextflow-io
47 changes: 35 additions & 12 deletions plugins/nf-nomad/src/main/nextflow/nomad/config/NomadJobOpts.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ package nextflow.nomad.config

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.nomad.models.JobAffinity
import nextflow.nomad.models.JobConstraint
import nextflow.nomad.models.JobConstraints
import nextflow.nomad.models.JobVolume


/**
Expand All @@ -37,9 +41,11 @@ class NomadJobOpts{
String region
String namespace
String dockerVolume
VolumeSpec[] volumeSpec
AffinitySpec affinitySpec
ConstraintSpec constraintSpec
JobVolume[] volumeSpec
JobAffinity affinitySpec
JobConstraint constraintSpec

JobConstraints constraintsSpec

NomadJobOpts(Map nomadJobOpts, Map<String,String> env=null){
assert nomadJobOpts!=null
Expand Down Expand Up @@ -69,12 +75,13 @@ class NomadJobOpts{
this.volumeSpec = parseVolumes(nomadJobOpts)
this.affinitySpec = parseAffinity(nomadJobOpts)
this.constraintSpec = parseConstraint(nomadJobOpts)
this.constraintsSpec = parseConstraints(nomadJobOpts)
}

VolumeSpec[] parseVolumes(Map nomadJobOpts){
List<VolumeSpec> ret = []
JobVolume[] parseVolumes(Map nomadJobOpts){
List<JobVolume> ret = []
if( nomadJobOpts.volume && nomadJobOpts.volume instanceof Closure){
def volumeSpec = new VolumeSpec()
def volumeSpec = new JobVolume()
def closure = (nomadJobOpts.volume as Closure)
def clone = closure.rehydrate(volumeSpec, closure.owner, closure.thisObject)
clone.resolveStrategy = Closure.DELEGATE_FIRST
Expand All @@ -86,7 +93,7 @@ class NomadJobOpts{
if( nomadJobOpts.volumes && nomadJobOpts.volumes instanceof List){
nomadJobOpts.volumes.each{ closure ->
if( closure instanceof Closure){
def volumeSpec = new VolumeSpec()
def volumeSpec = new JobVolume()
def clone = closure.rehydrate(volumeSpec, closure.owner, closure.thisObject)
clone.resolveStrategy = Closure.DELEGATE_FIRST
clone()
Expand All @@ -105,12 +112,13 @@ class NomadJobOpts{
throw new IllegalArgumentException("No more than a workdir volume allowed")
}

return ret as VolumeSpec[]
return ret as JobVolume[]
}

AffinitySpec parseAffinity(Map nomadJobOpts) {
JobAffinity parseAffinity(Map nomadJobOpts) {
if (nomadJobOpts.affinity && nomadJobOpts.affinity instanceof Closure) {
def affinitySpec = new AffinitySpec()
log.info "affinity config will be deprecated, use affinities closure instead"
def affinitySpec = new JobAffinity()
def closure = (nomadJobOpts.affinity as Closure)
def clone = closure.rehydrate(affinitySpec, closure.owner, closure.thisObject)
clone.resolveStrategy = Closure.DELEGATE_FIRST
Expand All @@ -122,9 +130,10 @@ class NomadJobOpts{
}
}

ConstraintSpec parseConstraint(Map nomadJobOpts){
JobConstraint parseConstraint(Map nomadJobOpts){
if (nomadJobOpts.constraint && nomadJobOpts.constraint instanceof Closure) {
def constraintSpec = new ConstraintSpec()
log.info "constraint config will be deprecated, use constraints closure instead"
def constraintSpec = new JobConstraint()
def closure = (nomadJobOpts.constraint as Closure)
def clone = closure.rehydrate(constraintSpec, closure.owner, closure.thisObject)
clone.resolveStrategy = Closure.DELEGATE_FIRST
Expand All @@ -135,4 +144,18 @@ class NomadJobOpts{
null
}
}

JobConstraints parseConstraints(Map nomadJobOpts){
if (nomadJobOpts.constraints && nomadJobOpts.constraints instanceof Closure) {
def constraintsSpec = new JobConstraints()
def closure = (nomadJobOpts.constraints as Closure)
def clone = closure.rehydrate(constraintsSpec, closure.owner, closure.thisObject)
clone.resolveStrategy = Closure.DELEGATE_FIRST
clone()
constraintsSpec.validate()
constraintsSpec
}else{
null
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import groovy.util.logging.Slf4j
import io.nomadproject.client.ApiClient
import io.nomadproject.client.api.JobsApi
import io.nomadproject.client.model.*
import nextflow.nomad.models.ConstraintsBuilder
import nextflow.nomad.models.JobConstraints
import nextflow.nomad.config.NomadConfig
import nextflow.nomad.config.VolumeSpec
import nextflow.nomad.models.JobVolume
import nextflow.processor.TaskRun
import nextflow.util.MemoryUnit
import nextflow.exception.ProcessSubmitException
Expand Down Expand Up @@ -135,7 +137,7 @@ class NomadService implements Closeable{
if( config.jobOpts().volumeSpec ) {
taskGroup.volumes = [:]
config.jobOpts().volumeSpec.eachWithIndex { volumeSpec , idx->
if (volumeSpec && volumeSpec.type == VolumeSpec.VOLUME_CSI_TYPE) {
if (volumeSpec && volumeSpec.type == JobVolume.VOLUME_CSI_TYPE) {
taskGroup.volumes["vol_${idx}".toString()] = new VolumeRequest(
type: volumeSpec.type,
source: volumeSpec.name,
Expand All @@ -145,7 +147,7 @@ class NomadService implements Closeable{
)
}

if (volumeSpec && volumeSpec.type == VolumeSpec.VOLUME_HOST_TYPE) {
if (volumeSpec && volumeSpec.type == JobVolume.VOLUME_HOST_TYPE) {
taskGroup.volumes["vol_${idx}".toString()] = new VolumeRequest(
type: volumeSpec.type,
source: volumeSpec.name,
Expand Down Expand Up @@ -182,7 +184,8 @@ class NomadService implements Closeable{

volumes(task, taskDef, workingDir)
affinity(task, taskDef)
constrains(task, taskDef)
constraint(task, taskDef)
constraints(task, taskDef)

return taskDef
}
Expand Down Expand Up @@ -233,7 +236,7 @@ class NomadService implements Closeable{
taskDef
}

protected Task constrains(TaskRun task, Task taskDef){
protected Task constraint(TaskRun task, Task taskDef){
if( config.jobOpts().constraintSpec ){
def constraint = new Constraint()
if(config.jobOpts().constraintSpec.attribute){
Expand All @@ -251,8 +254,32 @@ class NomadService implements Closeable{
taskDef
}

protected Task constraints(TaskRun task, Task taskDef){
def constraints = [] as List<Constraint>

if( config.jobOpts().constraintsSpec ){
def list = ConstraintsBuilder.constraintsSpecToList(config.jobOpts().constraintsSpec)
constraints.addAll(list)
}

if( task.processor?.config?.get(TaskDirectives.CONSTRAINTS) &&
task.processor?.config?.get(TaskDirectives.CONSTRAINTS) instanceof Closure) {
Closure closure = task.processor?.config?.get(TaskDirectives.CONSTRAINTS) as Closure
JobConstraints constraintsSpec = JobConstraints.parse(closure)
def list = ConstraintsBuilder.constraintsSpecToList(constraintsSpec)
constraints.addAll(list)
}

if( constraints.size()) {
taskDef.constraints(constraints)
}
taskDef
}



protected Job assignDatacenters(TaskRun task, Job job){
def datacenters = task.processor?.config?.get("datacenters")
def datacenters = task.processor?.config?.get(TaskDirectives.DATACENTERS)
if( datacenters ){
if( datacenters instanceof List<String>) {
job.datacenters( datacenters as List<String>)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ class TaskDirectives {

public static final String DATACENTERS = "datacenters"

public static final String CONSTRAINTS = "constraints"

public static final List<String> ALL = [
DATACENTERS
DATACENTERS,
CONSTRAINTS
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package nextflow.nomad.models

import io.nomadproject.client.model.Constraint
import nextflow.nomad.models.JobConstraintsAttr
import nextflow.nomad.models.JobConstraintsNode
import nextflow.nomad.models.JobConstraints

class ConstraintsBuilder {

static List<Constraint> constraintsSpecToList(JobConstraints spec){
def constraints = [] as List<Constraint>
if( spec?.nodeSpecs ){
def nodes = spec.nodeSpecs
?.collect({ nodeConstraints(it)})
?.flatten() as List<Constraint>
constraints.addAll(nodes)
}
if( spec?.attrSpecs ){
def nodes = spec.attrSpecs
?.collect({ attrConstraints(it)})
?.flatten() as List<Constraint>
constraints.addAll(nodes)
}
return constraints
}

protected static List<Constraint> nodeConstraints(JobConstraintsNode nodeSpec){
def ret = nodeSpec.raws?.collect{ triple->
return new Constraint()
.ltarget('${'+triple.left+'}')
.operand(triple.middle)
.rtarget(triple.right)
} as List<Constraint>
ret
}

protected static List<Constraint> attrConstraints(JobConstraintsAttr nodeSpec) {
def ret = nodeSpec.raws?.collect{ triple->
return new Constraint()
.ltarget('${'+triple.left+'}')
.operand(triple.middle)
.rtarget(triple.right)
} as List<Constraint>
ret
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
* limitations under the License.
*/

package nextflow.nomad.config
package nextflow.nomad.models
/**
* Nomad Job Affinity Spec
*
* @author Jorge Aguilera <jagedn@gmail.com>
*/
class AffinitySpec{
class JobAffinity {

private String attribute
private String operator
Expand All @@ -44,22 +44,22 @@ class AffinitySpec{
return weight
}

AffinitySpec attribute(String attribute){
JobAffinity attribute(String attribute){
this.attribute=attribute
this
}

AffinitySpec operator(String operator){
JobAffinity operator(String operator){
this.operator = operator
this
}

AffinitySpec value(String value){
JobAffinity value(String value){
this.value = value
this
}

AffinitySpec weight(int weight){
JobAffinity weight(int weight){
this.weight = weight
this
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
* limitations under the License.
*/

package nextflow.nomad.config
package nextflow.nomad.models
/**
* Nomad Job Constraint Spec
*
* @author Jorge Aguilera <jagedn@gmail.com>
*/

class ConstraintSpec {
class JobConstraint {

private String attribute
private String operator
Expand All @@ -40,17 +40,17 @@ class ConstraintSpec {
return value
}

ConstraintSpec attribute(String attribute){
JobConstraint attribute(String attribute){
this.attribute=attribute
this
}

ConstraintSpec operator(String operator){
JobConstraint operator(String operator){
this.operator = operator
this
}

ConstraintSpec value(String value){
JobConstraint value(String value){
this.value = value
this
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2023-, Stellenbosch University, South Africa
* Copyright 2024, Evaluacion y Desarrollo de Negocios, Spain
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.nomad.models

/**
* Nomad Job Constraint Spec
*
* @author Jorge Aguilera <jagedn@gmail.com>
*/

class JobConstraints {

List<JobConstraintsNode> nodeSpecs = []
List<JobConstraintsAttr> attrSpecs = []

JobConstraints node(@DelegatesTo(JobConstraintsNode)Closure closure){
JobConstraintsNode constraintSpec = new JobConstraintsNode()
def clone = closure.rehydrate(constraintSpec, closure.owner, closure.thisObject)
clone.resolveStrategy = Closure.DELEGATE_FIRST
clone()
nodeSpecs << constraintSpec
this
}

JobConstraints attr(@DelegatesTo(JobConstraintsAttr)Closure closure){
JobConstraintsAttr constraintSpec = new JobConstraintsAttr()
def clone = closure.rehydrate(constraintSpec, closure.owner, closure.thisObject)
clone.resolveStrategy = Closure.DELEGATE_FIRST
clone()
attrSpecs << constraintSpec
this
}

void validate(){

}

static JobConstraints parse(@DelegatesTo(JobConstraints)Closure closure){
JobConstraints constraintsSpec = new JobConstraints()
def clone = closure.rehydrate(constraintsSpec, closure.owner, closure.thisObject)
clone.resolveStrategy = Closure.DELEGATE_FIRST
clone()
constraintsSpec.validate()
constraintsSpec
}
}
Loading

0 comments on commit f9d8834

Please sign in to comment.