Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ibodrov committed Dec 24, 2024
1 parent b8aa6d5 commit cddfc83
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@
import com.google.inject.Injector;
import com.walmartlabs.concord.agent.cfg.AgentConfiguration;
import com.walmartlabs.concord.agent.guice.WorkerModule;
import com.walmartlabs.concord.common.IOUtils;
import com.walmartlabs.concord.server.queueclient.message.ProcessResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.nio.file.Paths;

import static com.walmartlabs.concord.client2.ProcessEntry.StatusEnum.CANCELLED;
import static com.walmartlabs.concord.client2.ProcessEntry.StatusEnum.FAILED;
import static java.util.Objects.requireNonNull;

public class OneShotRunner {
Expand All @@ -57,22 +55,17 @@ public void run(String processResponseJson) throws Exception {
var processResponse = objectMapper.readValue(processResponseJson, ProcessResponse.class);
log.info("run [{}] -> preparing...", processResponse.getProcessId());

var workDir = Paths.get(System.getProperty("user.dir"));
var workDir = IOUtils.createTempDir(agentCfg.getPayloadDir(), "workDir");
var jobRequest = JobRequest.from(processResponse, workDir);

var instanceId = jobRequest.getInstanceId();

var workerModule = new WorkerModule(agentCfg.getAgentId(), instanceId, jobRequest.getSessionToken());
var workerFactory = injector.createChildInjector(workerModule).getInstance(WorkerFactory.class);

var worker = workerFactory.create(jobRequest, status -> {
log.info("run ['{}'] -> {}", instanceId, status);
var exitCode = 0;
if (status == FAILED || status == CANCELLED) {
exitCode = 1;
}
System.exit(exitCode);
});

worker.setThrowOnFailure(true);
worker.run();
}
}
12 changes: 12 additions & 0 deletions agent/src/main/java/com/walmartlabs/concord/agent/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class Worker implements Runnable {
private final JobRequest jobRequest;

private JobInstance jobInstance;
private boolean throwOnFailure; // TODO find a better place

@Inject
public Worker(RepositoryManager repositoryManager,
Expand Down Expand Up @@ -119,6 +120,10 @@ public void cancel() {
jobInstance.cancel();
}

public void setThrowOnFailure(boolean throwOnFailure) {
this.throwOnFailure = throwOnFailure;
}

private void handleError(UUID instanceId, Throwable error) {
StatusEnum status = StatusEnum.FAILED;

Expand All @@ -131,6 +136,13 @@ private void handleError(UUID instanceId, Throwable error) {

onStatusChange(instanceId, status);
log.info("handleError ['{}'] -> done", instanceId);

if (throwOnFailure) {
if (error instanceof RuntimeException re) {
throw re;
}
throw new RuntimeException(error);
}
}

private void onStatusChange(UUID instanceId, StatusEnum status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
Expand Down Expand Up @@ -513,7 +514,12 @@ protected ProcessEntry startOneTime(RunnerJob job, String[] cmd) throws IOExcept
// the job's payload directory, contains all files from the state snapshot including imports
Path src = job.getPayloadDir();

Files.move(src, workDir, StandardCopyOption.ATOMIC_MOVE);
try {
Files.move(src, workDir, StandardCopyOption.ATOMIC_MOVE);
} catch (AtomicMoveNotSupportedException e) {
log.error("startOneTime ['{}'] -> unable to move {} to {} atomically", job.getInstanceId(), src, workDir);
throw e;
}

writeInstanceId(job.getInstanceId(), workDir);

Expand Down
1 change: 1 addition & 0 deletions agent/src/main/resources/concord-agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ concord-agent {
# the value might not longer be valid if the process restarts and
# gets a new Agent.
workDirBase = "/tmp/concord-agent/workDirs"
workDirBase = ${?WORK_DIR_BASE}

# directory to store the process logs
# created automatically if not specified
Expand Down

0 comments on commit cddfc83

Please sign in to comment.