diff --git a/.gitignore b/.gitignore index 61c50b1..3353b66 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ oracle-nomad-cluster .settings /validation/nomad_temp/** /validation/nomad +**/*.tsv \ No newline at end of file diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadExecutor.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadExecutor.groovy index 96a6b3b..5973aaf 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadExecutor.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadExecutor.groovy @@ -60,6 +60,9 @@ class NomadExecutor extends Executor implements ExtensionPoint { @Override TaskHandler createTaskHandler(TaskRun task) { + assert task + assert task.workDir + log.trace "[NOMAD] launching process > ${task.name} -- work folder: ${task.workDirStr}" return new NomadTaskHandler(task, this.config, service) } diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy index b29bf0f..bfc6954 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy @@ -114,52 +114,38 @@ class NomadService implements Closeable{ } - String getJobState(String jobId){ + TaskState getTaskState(String jobId){ try { List allocations = safeExecutor.apply { jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null, null) } - AllocationListStub last = allocations?.sort { + AllocationListStub last = allocations ? allocations.sort { it.modifyIndex - }?.last() - String currentState = last?.taskStates?.values()?.last()?.state - log.debug "Task $jobId , state=$currentState" - currentState ?: "Unknown" + }?.last() : null + TaskState currentState = last?.taskStates?.values()?.last() + log.debug "Task $jobId , state=${currentState.state}" + currentState }catch(Exception e){ log.debug("[NOMAD] Failed to get jobState ${jobId} -- Cause: ${e.message ?: e}", e) - "dead" + new TaskState(state: "unknow") } } - boolean checkIfRunning(String jobId){ + String getJobStatus(String jobId){ try { Job job = safeExecutor.apply { jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null) } log.debug "[NOMAD] checkIfRunning jobID=$job.ID; status=$job.status" - job.status == "running" + job.status }catch (Exception e){ log.debug("[NOMAD] Failed to get jobState ${jobId} -- Cause: ${e.message ?: e}", e) - false - } - } - - boolean checkIfDead(String jobId){ - try{ - Job job = safeExecutor.apply { - jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace, - null, null, null, null, null, null, null) - } - log.debug "[NOMAD] checkIfDead jobID=$job.ID; status=$job.status" - job.status == "dead" - }catch (Exception e){ - log.debug("[NOMAD] Failed to get job ${jobId} -- Cause: ${e.message ?: e}", e) - true + "Unknow" } } diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadTaskHandler.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadTaskHandler.groovy index c5a91ae..63d9931 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadTaskHandler.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadTaskHandler.groovy @@ -19,6 +19,7 @@ package nextflow.nomad.executor import groovy.transform.CompileStatic import groovy.util.logging.Slf4j +import io.nomadproject.client.model.TaskState import nextflow.exception.ProcessSubmitException import nextflow.exception.ProcessUnrecoverableException import nextflow.executor.BashWrapperBuilder @@ -31,6 +32,7 @@ import nextflow.processor.TaskStatus import nextflow.trace.TraceRecord import nextflow.util.Escape import nextflow.SysEnv +import org.threeten.bp.OffsetDateTime import java.nio.file.Path @@ -51,7 +53,7 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask { private String clientName = null - private String state + private TaskState state private long timestamp @@ -72,41 +74,46 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask { @Override boolean checkIfRunning() { - if(isActive()) { - determineClientNode() + if( !jobName ) throw new IllegalStateException("Missing Nomad Job name -- cannot check if running") + if(isSubmitted()) { + def state = taskState0() + // include `terminated` state to allow the handler status to progress + if( state && ( "running" == state.state || "terminated" == state.state)){ + status = TaskStatus.RUNNING + determineClientNode() + return true + } } - nomadService.checkIfRunning(this.jobName) + return false } @Override boolean checkIfCompleted() { - if (!nomadService.checkIfDead(this.jobName)) { - return false - } + if( !jobName ) throw new IllegalStateException("Missing Nomad Job name -- cannot check if running") - state = taskState0(this.jobName) + def state = taskState0() - final isFinished = [ - "complete", - "failed", - "dead", - "lost"].contains(state) + final isFinished = state && state.finishedAt != null - log.debug "[NOMAD] checkIfCompleted task.name=$task.name; state=$state completed=$isFinished" + log.debug "[NOMAD] checkIfCompleted task.name=$task.name; state=${state?.state} completed=$isFinished" if (isFinished) { // finalize the task task.exitStatus = readExitFile() task.stdout = outputFile task.stderr = errorFile - this.status = TaskStatus.COMPLETED - if (state == "failed" || state == "lost" || state == "unknown") + status = TaskStatus.COMPLETED + if ( !state || state.failed ) { task.error = new ProcessUnrecoverableException() + task.aborted = true + } if (shouldDelete()) { nomadService.jobPurge(this.jobName) } + updateTimestamps(state?.startedAt, state?.finishedAt) + determineClientNode() return true } @@ -180,13 +187,13 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask { return ret } - protected String taskState0(String taskName) { + protected TaskState taskState0() { final now = System.currentTimeMillis() final delta = now - timestamp; if (!status || delta >= 1_000) { - def newState = nomadService.getJobState(jobName) - log.debug "[NOMAD] Check jobState: jobName=$jobName currentState=$state newState=$newState" + def newState = nomadService.getTaskState(jobName) + log.debug "[NOMAD] Check jobState: jobName=$jobName currentState=${state?.state} newState=${newState?.state}" if (newState) { state = newState @@ -229,4 +236,19 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask { return result } + void updateTimestamps(OffsetDateTime start, OffsetDateTime end){ + try { + startTimeMillis = start.toInstant().toEpochMilli() + completeTimeMillis = end.toInstant().toEpochMilli() + } catch( Exception e ) { + // Only update if startTimeMillis hasn't already been set. + // If startTimeMillis _has_ been set, then both startTimeMillis + // and completeTimeMillis will have been set with the normal + // TaskHandler mechanism, so there's no need to reset them here. + if (!startTimeMillis) { + startTimeMillis = System.currentTimeMillis() + completeTimeMillis = System.currentTimeMillis() + } + } + } } diff --git a/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy b/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy index 66f84ad..4bb8427 100644 --- a/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy +++ b/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy @@ -65,7 +65,7 @@ class NomadServiceSpec extends Specification{ mockWebServer.enqueue(new MockResponse() .addHeader("Content-Type", "application/json")); - def state = service.getJobState("theId") + def state = service.getTaskState("theId") def recordedRequest = mockWebServer.takeRequest(); then: @@ -214,7 +214,7 @@ class NomadServiceSpec extends Specification{ """) .addHeader("Content-Type", "application/json")); - state = service.getJobState("theId") + state = service.getTaskState("theId") recordedRequest = mockWebServer.takeRequest(); then: diff --git a/validation/tower/main.nf b/validation/tower/main.nf new file mode 100644 index 0000000..2c53e53 --- /dev/null +++ b/validation/tower/main.nf @@ -0,0 +1,19 @@ +#!/usr/bin/env nextflow + +process sayHello { + container 'ubuntu:20.04' + + input: + val x + output: + stdout + script: + """ + sleep 10 + echo '$x world!' + """ +} + +workflow { + Channel.of('Bonjour', 'Ciao', 'Hello', 'Hola') | sayHello | view +} \ No newline at end of file diff --git a/validation/tower/nextflow.config b/validation/tower/nextflow.config new file mode 100644 index 0000000..fb92b2f --- /dev/null +++ b/validation/tower/nextflow.config @@ -0,0 +1,25 @@ +plugins { + id "nf-nomad@${System.getenv("NOMAD_PLUGIN_VERSION") ?: "latest"}" +} + +process { + executor = "nomad" +} + +tower { + enabled = true + workspaceId = "276172789442513" +} + +nomad { + + client { + address = "http://localhost:4646" + } + + jobs { + deleteOnCompletion = false + volume = { type "host" name "scratchdir" } + } + +}