Skip to content

Commit

Permalink
add spread feature (#88)
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Aguilera <jorge@edn.es>
  • Loading branch information
jagedn authored Sep 16, 2024
1 parent 0a3f62e commit e8f28ab
Show file tree
Hide file tree
Showing 9 changed files with 284 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import groovy.util.logging.Slf4j
import nextflow.nomad.models.JobAffinity
import nextflow.nomad.models.JobConstraint
import nextflow.nomad.models.JobConstraints
import nextflow.nomad.models.JobSpreads
import nextflow.nomad.models.JobVolume


Expand All @@ -45,6 +46,7 @@ class NomadJobOpts{
JobAffinity affinitySpec
JobConstraint constraintSpec
JobConstraints constraintsSpec
JobSpreads spreadsSpec

Integer rescheduleAttempts
Integer restartAttempts
Expand Down Expand Up @@ -85,6 +87,7 @@ class NomadJobOpts{
this.constraintSpec = parseConstraint(nomadJobOpts)
this.constraintsSpec = parseConstraints(nomadJobOpts)
this.secretOpts = parseSecrets(nomadJobOpts)
this.spreadsSpec = parseSpreads(nomadJobOpts)
}

JobVolume[] parseVolumes(Map nomadJobOpts){
Expand Down Expand Up @@ -177,4 +180,15 @@ class NomadJobOpts{
}
}

JobSpreads parseSpreads(Map nomadJobOpts){
if( nomadJobOpts.spreads && nomadJobOpts.spreads instanceof Closure){
def spec = new JobSpreads()
def closure = (nomadJobOpts.spreads as Closure)
def clone = closure.rehydrate(spec, closure.owner, closure.thisObject)
clone.resolveStrategy = Closure.DELEGATE_FIRST
clone()
spec.validate()
spec
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import io.nomadproject.client.model.*
import nextflow.nomad.models.ConstraintsBuilder
import nextflow.nomad.models.JobConstraints
import nextflow.nomad.config.NomadConfig
import nextflow.nomad.models.JobSpreads
import nextflow.nomad.models.JobVolume
import nextflow.nomad.models.SpreadsBuilder
import nextflow.processor.TaskRun
import nextflow.util.MemoryUnit
import nextflow.exception.ProcessSubmitException
Expand Down Expand Up @@ -99,6 +101,7 @@ class NomadService implements Closeable{
job.taskGroups = [createTaskGroup(task, args, env)]

assignDatacenters(task, job)
spreads(task, job)

JobRegisterRequest jobRegisterRequest = new JobRegisterRequest()
jobRegisterRequest.setJob(job)
Expand Down Expand Up @@ -313,6 +316,27 @@ class NomadService implements Closeable{
job
}

protected Job spreads(TaskRun task, Job jobDef){
def spreads = [] as List<Spread>
if( config.jobOpts().spreadsSpec ){
def list = SpreadsBuilder.spreadsSpecToList(config.jobOpts().spreadsSpec)
spreads.addAll(list)
}
if( task.processor?.config?.get(TaskDirectives.SPREAD) &&
task.processor?.config?.get(TaskDirectives.SPREAD) instanceof Map) {
Map map = task.processor?.config?.get(TaskDirectives.SPREAD) as Map
JobSpreads spreadSpec = new JobSpreads()
spreadSpec.spread(map)
def list = SpreadsBuilder.spreadsSpecToList(spreadSpec)
spreads.addAll(list)
}

spreads.each{
jobDef.addSpreadsItem(it)
}
jobDef
}

String getJobState(String jobId){
try {
List<AllocationListStub> allocations = jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null, null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ class TaskDirectives {

public static final String SECRETS = "secret"

public static final String SPREAD = "spread"

public static final List<String> ALL = [
DATACENTERS,
CONSTRAINTS,
SECRETS
SECRETS,
SPREAD
]
}
64 changes: 64 additions & 0 deletions plugins/nf-nomad/src/main/nextflow/nomad/models/JobSpreads.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2023-, Stellenbosch University, South Africa
* Copyright 2024, Evaluacion y Desarrollo de Negocios, Spain
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package nextflow.nomad.models

import org.apache.commons.lang3.tuple.Pair
import org.apache.commons.lang3.tuple.Triple

/**
* Nomad Job Spread Spec
*
* @author Jorge Aguilera <jorge@edn.es>
*/

class JobSpreads {

private List<Triple<String, Integer, List<Pair<String, Integer>>>> raws= []

List<Triple<String, Integer, List<Pair<String, Integer>>>> getRaws() {
return raws
}

JobSpreads setSpread(Map map){
spread(map)
}

JobSpreads spread(Map map){
if( map.containsKey("name") && map.containsKey("weight")){
def name = map.name as String
def weight = map.weight as int
def targets = [] as List<Pair>
if( map.containsKey("targets") && map.targets instanceof Map){
(map.targets as Map).entrySet().each{entry->
def target = entry.key as String
if( entry.value.toString().isNumber() ){
def targetW = entry.value as int
targets.add( Pair.of(target, targetW))
}
}
}
raws.add Triple.of(name, weight, targets)
}
this
}

void validate(){

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package nextflow.nomad.models

import groovy.transform.CompileStatic
import io.nomadproject.client.model.Spread
import io.nomadproject.client.model.SpreadTarget

/**
* Nomad Job Spread Spec Builder
*
* @author Jorge Aguilera <jorge@edn.es>
*/

@CompileStatic
class SpreadsBuilder {

static List<Spread> spreadsSpecToList(JobSpreads spreads){
def ret = [] as List<Spread>

spreads.raws.each{raw->
def targets = [] as List<SpreadTarget>
raw.right.each {
targets.add( new SpreadTarget(value: it.left, percent: it.right) )
}
ret.add new Spread(attribute: raw.left, weight: raw.middle, spreadTarget: targets)
}

return ret
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -295,4 +295,36 @@ class NomadConfigSpec extends Specification {
then:
thrown(IllegalArgumentException)
}

void "should instantiate a spread spec if specified"() {
when:
def config = new NomadConfig([
jobs: [spreads : {
spread = [name:'test', weight:100]
}]
])

then:
config.jobOpts.spreadsSpec
config.jobOpts.spreadsSpec.getRaws().size() == 1
config.jobOpts.spreadsSpec.getRaws().first().left == 'test'
config.jobOpts.spreadsSpec.getRaws().first().middle == 100
config.jobOpts.spreadsSpec.getRaws().first().right.size() == 0

when:
def config2 = new NomadConfig([
jobs: [spreads : {
spread = [name:'test', weight:100, targets:[ a:50, b:100]]
}]
])

then:
config2.jobOpts.spreadsSpec
config2.jobOpts.spreadsSpec.getRaws().size() == 1
config2.jobOpts.spreadsSpec.getRaws().first().left == 'test'
config2.jobOpts.spreadsSpec.getRaws().first().middle == 100
config2.jobOpts.spreadsSpec.getRaws().first().right.size() == 2
config2.jobOpts.spreadsSpec.getRaws().first().right.first().left == 'a'
config2.jobOpts.spreadsSpec.getRaws().first().right.first().right == 50
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -623,4 +623,67 @@ class NomadServiceSpec extends Specification{
({ 'a'*10 }) | ['aaaaaaaaaa']
}

void "submit a task with a spread"(){
given:
def config = new NomadConfig(
client:[
address : "http://${mockWebServer.hostName}:${mockWebServer.port}"
],
jobs:[
spreads : {
spread = [name:'test', weight:50, targets:['a':30]]
}
]
)
def service = new NomadService(config)

String id = "theId"
String name = "theName"
String image = "theImage"
List<String> args = ["theCommand", "theArgs"]
String workingDir = "/a/b/c"
Map<String, String>env = [test:"test"]

def mockTask = Mock(TaskRun){
getName() >> name
getContainer() >> image
getConfig() >> Mock(TaskConfig)
getWorkDirStr() >> workingDir
getContainer() >> "ubuntu"
getProcessor() >> Mock(TaskProcessor){
getExecutor() >> Mock(Executor){
isFusionEnabled() >> false
}
}
getWorkDir() >> Path.of(workingDir)
toTaskBean() >> Mock(TaskBean){
getWorkDir() >> Path.of(workingDir)
getScript() >> "theScript"
getShell() >> ["bash"]
getInputFiles() >> [:]
}
}

mockWebServer.enqueue(new MockResponse()
.setBody(JsonOutput.toJson(["EvalID":"test"]).toString())
.addHeader("Content-Type", "application/json"));
when:

def idJob = service.submitTask(id, mockTask, args, env)
def recordedRequest = mockWebServer.takeRequest();
def body = new JsonSlurper().parseText(recordedRequest.body.readUtf8())

then:
idJob

and:
recordedRequest.method == "POST"
recordedRequest.path == "/v1/jobs"

and:
body.Job.Spreads[0].Attribute == 'test'
body.Job.Spreads[0].Weight == 50
body.Job.Spreads[0].SpreadTarget.first().Value == 'a'
body.Job.Spreads[0].SpreadTarget.first().Percent == 30
}
}
18 changes: 18 additions & 0 deletions validation/spread/main.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/usr/bin/env nextflow

process sayHello {
container 'ubuntu:20.04'

input:
val x
output:
stdout
script:
"""
echo '$x world!'
"""
}

workflow {
Channel.of('Bonjour', 'Ciao', 'Hello', 'Hola') | sayHello | view
}
35 changes: 35 additions & 0 deletions validation/spread/node-nextflow.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
plugins {
id "nf-nomad@${System.getenv("NOMAD_PLUGIN_VERSION") ?: "latest"}"
}

process {
executor = "nomad"
}

nomad {

client {
address = "http://localhost:4646"
}

jobs {
deleteOnCompletion = false
volume = { type "host" name "scratchdir" }

spreads = {
spread = [ name:'node.datacenter', weight: 50 ]
}
}

}

profiles{
localnomad{
process {
withName: sayHello {
datacenters = ['test-datacenter', 'demo-datacenter']
spread = [ name:'node.datacenter', weight: 50, targets : ['us-east1':70, 'us-east2':30] ]
}
}
}
}

0 comments on commit e8f28ab

Please sign in to comment.