Skip to content

Commit

Permalink
refactor job status query logic
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Aguilera <jorge@edn.es>
  • Loading branch information
jagedn authored and abhi18av committed Dec 8, 2024
1 parent 1b57a8f commit e04b654
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 45 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ oracle-nomad-cluster
.settings
/validation/nomad_temp/**
/validation/nomad
**/*.tsv
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class NomadExecutor extends Executor implements ExtensionPoint {

/**
 * Creates the handler that manages the given task on Nomad.
 *
 * @param task the task to run; it and its work directory must be set
 * @return a {@link NomadTaskHandler} built from the task, the executor config and the Nomad service
 */
@Override
TaskHandler createTaskHandler(TaskRun task) {
    // Fail fast when the task or its work directory is missing.
    assert task
    assert task.workDir

    log.trace("[NOMAD] launching process > ${task.name} -- work folder: ${task.workDirStr}")

    final TaskHandler handler = new NomadTaskHandler(task, this.config, service)
    return handler
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,52 +114,38 @@ class NomadService implements Closeable{
}


// NOTE(review): this span is a diff rendering without +/- markers -- the superseded
// `getJobState` lines appear next to their `getTaskState` replacements.
//
// Fetches the allocations of the given job, picks the allocation that sorts last by
// `modifyIndex`, and returns the last entry of its `taskStates` values.
// When the lookup throws, the failure is logged at debug level and a placeholder
// state is returned instead of propagating the exception.
String getJobState(String jobId){
TaskState getTaskState(String jobId){
try {
List<AllocationListStub> allocations = safeExecutor.apply {
jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace,
null, null, null, null, null, null,
null, null)
}
AllocationListStub last = allocations?.sort {
// An empty or null allocation list yields `last == null` rather than calling last() on it.
AllocationListStub last = allocations ? allocations.sort {
it.modifyIndex
}?.last()
String currentState = last?.taskStates?.values()?.last()?.state
log.debug "Task $jobId , state=$currentState"
currentState ?: "Unknown"
}?.last() : null
TaskState currentState = last?.taskStates?.values()?.last()
// NOTE(review): `currentState` is null whenever `last` or its `taskStates` is null;
// `${currentState.state}` then throws and control falls into the catch below.
// `currentState?.state` would avoid that -- TODO confirm intended behaviour.
log.debug "Task $jobId , state=${currentState.state}"
currentState
}catch(Exception e){
log.debug("[NOMAD] Failed to get jobState ${jobId} -- Cause: ${e.message ?: e}", e)
"dead"
// NOTE(review): "unknow" looks like a misspelling of "unknown" (the handler's older
// check compared against "unknown") -- verify before relying on this value.
new TaskState(state: "unknow")
}
}



// NOTE(review): this span is a diff rendering without +/- markers -- the superseded
// `checkIfRunning` / `checkIfDead` lines appear interleaved with the `getJobStatus`
// method that replaces them.
//
// getJobStatus: reads the job through `jobsApi.getJob` (wrapped in `safeExecutor`) and
// returns its `status` string; on any exception the failure is logged at debug level
// and a placeholder string is returned.
boolean checkIfRunning(String jobId){
String getJobStatus(String jobId){
try {
Job job = safeExecutor.apply {
jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace,
null, null, null, null, null, null, null)
}
log.debug "[NOMAD] checkIfRunning jobID=$job.ID; status=$job.status"
job.status == "running"
job.status
}catch (Exception e){
log.debug("[NOMAD] Failed to get jobState ${jobId} -- Cause: ${e.message ?: e}", e)
false
}
}

boolean checkIfDead(String jobId){
try{
Job job = safeExecutor.apply {
jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace,
null, null, null, null, null, null, null)
}
log.debug "[NOMAD] checkIfDead jobID=$job.ID; status=$job.status"
job.status == "dead"
}catch (Exception e){
log.debug("[NOMAD] Failed to get job ${jobId} -- Cause: ${e.message ?: e}", e)
true
// NOTE(review): "Unknow" looks like a misspelling of "Unknown" -- verify against callers.
"Unknow"
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nextflow.nomad.executor

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.nomadproject.client.model.TaskState
import nextflow.exception.ProcessSubmitException
import nextflow.exception.ProcessUnrecoverableException
import nextflow.executor.BashWrapperBuilder
Expand All @@ -31,6 +32,7 @@ import nextflow.processor.TaskStatus
import nextflow.trace.TraceRecord
import nextflow.util.Escape
import nextflow.SysEnv
import org.threeten.bp.OffsetDateTime

import java.nio.file.Path

Expand All @@ -51,7 +53,7 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {

private String clientName = null

private String state
private TaskState state

private long timestamp

Expand All @@ -72,41 +74,46 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {

// NOTE(review): this span is a diff rendering without +/- markers -- the superseded
// `isActive()` / `nomadService.checkIfRunning` lines appear next to their replacements.
//
// Reports whether the task has started: when the handler is in the submitted status and
// the Nomad task state is "running" or "terminated", the handler status becomes RUNNING,
// the client node is resolved and `true` is returned; otherwise `false`.
// Throws IllegalStateException when no Nomad job name is set.
@Override
boolean checkIfRunning() {
if(isActive()) {
determineClientNode()
if( !jobName ) throw new IllegalStateException("Missing Nomad Job name -- cannot check if running")
if(isSubmitted()) {
// NOTE(review): this local `state` shadows the handler's `state` field of the same name.
def state = taskState0()
// include `terminated` state to allow the handler status to progress
if( state && ( "running" == state.state || "terminated" == state.state)){
status = TaskStatus.RUNNING
determineClientNode()
return true
}
}
nomadService.checkIfRunning(this.jobName)
return false
}

@Override
boolean checkIfCompleted() {
if (!nomadService.checkIfDead(this.jobName)) {
return false
}
if( !jobName ) throw new IllegalStateException("Missing Nomad Job name -- cannot check if running")

state = taskState0(this.jobName)
def state = taskState0()

final isFinished = [
"complete",
"failed",
"dead",
"lost"].contains(state)
final isFinished = state && state.finishedAt != null

log.debug "[NOMAD] checkIfCompleted task.name=$task.name; state=$state completed=$isFinished"
log.debug "[NOMAD] checkIfCompleted task.name=$task.name; state=${state?.state} completed=$isFinished"

if (isFinished) {
// finalize the task
task.exitStatus = readExitFile()
task.stdout = outputFile
task.stderr = errorFile
this.status = TaskStatus.COMPLETED
if (state == "failed" || state == "lost" || state == "unknown")
status = TaskStatus.COMPLETED
if ( !state || state.failed ) {
task.error = new ProcessUnrecoverableException()
task.aborted = true
}

if (shouldDelete()) {
nomadService.jobPurge(this.jobName)
}

updateTimestamps(state?.startedAt, state?.finishedAt)
determineClientNode()
return true
}

Expand Down Expand Up @@ -180,13 +187,13 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {
return ret
}

protected String taskState0(String taskName) {
protected TaskState taskState0() {
final now = System.currentTimeMillis()
final delta = now - timestamp;
if (!status || delta >= 1_000) {

def newState = nomadService.getJobState(jobName)
log.debug "[NOMAD] Check jobState: jobName=$jobName currentState=$state newState=$newState"
def newState = nomadService.getTaskState(jobName)
log.debug "[NOMAD] Check jobState: jobName=$jobName currentState=${state?.state} newState=${newState?.state}"

if (newState) {
state = newState
Expand Down Expand Up @@ -229,4 +236,19 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {
return result
}

/**
 * Records the task start and completion times from the given timestamps.
 *
 * Both values are converted before either field is written, so a missing or
 * unconvertible timestamp can never leave only {@code startTimeMillis} updated.
 * When the timestamps cannot be used, the current time is applied as a fallback,
 * but only if {@code startTimeMillis} has not been set yet.
 *
 * @param start the task start time; may be {@code null}
 * @param end the task completion time; may be {@code null}
 */
void updateTimestamps(OffsetDateTime start, OffsetDateTime end){
    if( start != null && end != null ) {
        try {
            final long startedMillis = start.toInstant().toEpochMilli()
            final long finishedMillis = end.toInstant().toEpochMilli()
            startTimeMillis = startedMillis
            completeTimeMillis = finishedMillis
            return
        } catch( Exception ignored ) {
            // Best effort: an unconvertible timestamp falls through to the fallback below.
        }
    }
    // Only update if startTimeMillis hasn't already been set.
    // If startTimeMillis _has_ been set, then both startTimeMillis
    // and completeTimeMillis will have been set with the normal
    // TaskHandler mechanism, so there's no need to reset them here.
    if (!startTimeMillis) {
        final long now = System.currentTimeMillis()
        startTimeMillis = now
        completeTimeMillis = now
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class NomadServiceSpec extends Specification{
mockWebServer.enqueue(new MockResponse()
.addHeader("Content-Type", "application/json"));

def state = service.getJobState("theId")
def state = service.getTaskState("theId")
def recordedRequest = mockWebServer.takeRequest();

then:
Expand Down Expand Up @@ -214,7 +214,7 @@ class NomadServiceSpec extends Specification{
""")
.addHeader("Content-Type", "application/json"));

state = service.getJobState("theId")
state = service.getTaskState("theId")
recordedRequest = mockWebServer.takeRequest();

then:
Expand Down
19 changes: 19 additions & 0 deletions validation/tower/main.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/usr/bin/env nextflow

// Validation process: for each input value, waits ten seconds and then prints
// "<value> world!"; the printed text is the process output.
process sayHello {
// Image the task runs in.
container 'ubuntu:20.04'

// One value per task invocation.
input:
val x
// The task's standard output is emitted as the output channel.
output:
stdout
// NOTE(review): the sleep presumably keeps the job alive long enough to observe
// its state transitions -- confirm.
script:
"""
sleep 10
echo '$x world!'
"""
}

// Entry workflow: runs sayHello once per greeting and prints each result.
workflow {
    def greetings = Channel.of('Bonjour', 'Ciao', 'Hello', 'Hola')
    sayHello(greetings).view()
}
25 changes: 25 additions & 0 deletions validation/tower/nextflow.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Configuration for the tower validation pipeline.

// Loads the nf-nomad plugin; the version comes from the NOMAD_PLUGIN_VERSION
// environment variable and falls back to "latest" when it is unset or empty.
plugins {
id "nf-nomad@${System.getenv("NOMAD_PLUGIN_VERSION") ?: "latest"}"
}

// Run every process with the nomad executor.
process {
executor = "nomad"
}

// NOTE(review): the workspace id is hard-coded -- confirm it is meant to be committed.
tower {
enabled = true
workspaceId = "276172789442513"
}

nomad {

// Address of the Nomad API the plugin talks to (a local instance here).
client {
address = "http://localhost:4646"
}

jobs {
// NOTE(review): presumably keeps finished jobs in Nomad so they can be inspected -- confirm.
deleteOnCompletion = false
// NOTE(review): assumes a host volume named "scratchdir" exists on the Nomad client -- verify.
volume = { type "host" name "scratchdir" }
}

}

0 comments on commit e04b654

Please sign in to comment.