Skip to content

Commit

Permalink
[cuebot] Move dispatcher memory properties to opencue.properties (Aca…
Browse files Browse the repository at this point in the history
…demySoftwareFoundation#1570)

Memory properties constantly need to be tuned according to farm
requirements, which makes them good candidates for becoming properties
instead of hardcoded constants.
  • Loading branch information
DiegoTavares authored Nov 5, 2024
1 parent 5d39b26 commit a59bf82
Show file tree
Hide file tree
Showing 20 changed files with 261 additions and 116 deletions.
4 changes: 2 additions & 2 deletions cuebot/src/main/java/com/imageworks/spcue/ServiceEntity.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ public class ServiceEntity extends Entity {
/**
* Determines the default minimum memory per frame.
*/
public long minMemory = Dispatcher.MEM_RESERVED_DEFAULT;
public long minMemory = Dispatcher.MEM_SERVICE_RESERVED_DEFAULT;

/**
* Determines the default minimum gpu memory per frame.
*/
public long minGpuMemory = Dispatcher.MEM_GPU_RESERVED_DEFAULT;
public long minGpuMemory = Dispatcher.MEM_SERVICE_GPU_RESERVED_DEFAULT;

/**
* Determines the default tags.
Expand Down
2 changes: 1 addition & 1 deletion cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ else if (proc.coresReserved >= 100) {
proc.coresReserved = wholeCores * 100;
} else {
if (frame.threadable) {
if (selfishServices != null &&
if (selfishServices != null &&
frame.services != null &&
containsSelfishService(frame.services.split(","), selfishServices)){
proc.coresReserved = wholeCores * 100;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.CallableStatementCreator;
import org.springframework.jdbc.core.RowMapper;
Expand Down Expand Up @@ -58,6 +60,9 @@

public class HostDaoJdbc extends JdbcDaoSupport implements HostDao {

@Autowired
private Environment env;

public static final RowMapper<HostEntity> HOST_DETAIL_MAPPER = new RowMapper<HostEntity>() {
public HostEntity mapRow(ResultSet rs, int rowNum) throws SQLException {
HostEntity host = new HostEntity();
Expand Down Expand Up @@ -324,9 +329,12 @@ public void insertRenderHost(RenderHost host, AllocationInterface a, boolean use
}

long memUnits = convertMemoryUnits(host);
if (memUnits < Dispatcher.MEM_RESERVED_MIN) {
long memReserverMin = env.getRequiredProperty(
"dispatcher.memory.mem_reserved_min",
Long.class);
if (memUnits < memReserverMin) {
throw new EntityCreationError("could not create host " + host.getName() + ", " +
" must have at least " + Dispatcher.MEM_RESERVED_MIN + " free memory.");
" must have at least " + memReserverMin + " free memory.");
}

String fqdn;
Expand Down Expand Up @@ -727,18 +735,21 @@ private long convertMemoryUnits(RenderHost host) {

long memUnits;
if (host.getTagsList().contains("64bit")) {
memUnits = CueUtil.convertKbToFakeKb64bit(host.getTotalMem());
memUnits = CueUtil.convertKbToFakeKb64bit(env, host.getTotalMem());
}
else {
memUnits = CueUtil.convertKbToFakeKb32bit(host.getTotalMem());
memUnits = CueUtil.convertKbToFakeKb32bit(env, host.getTotalMem());
}

/*
* If this is a desktop, we'll just cut the memory
* so we don't annoy the user.
*/
if (host.getNimbyEnabled()) {
memUnits = (long) (memUnits / 1.5) + Dispatcher.MEM_RESERVED_SYSTEM;
long memReservedSystem = env.getRequiredProperty(
"dispatcher.memory.mem_reserved_system",
Long.class);
memUnits = (long) (memUnits / 1.5) + memReservedSystem;
}

return memUnits;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.support.JdbcDaoSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;

import com.imageworks.spcue.ExecutionSummary;
import com.imageworks.spcue.FrameStateTotals;
Expand All @@ -41,7 +43,6 @@
import com.imageworks.spcue.LayerEntity;
import com.imageworks.spcue.LayerInterface;
import com.imageworks.spcue.LimitEntity;
import com.imageworks.spcue.LimitInterface;
import com.imageworks.spcue.ResourceUsage;
import com.imageworks.spcue.ThreadStats;
import com.imageworks.spcue.dao.LayerDao;
Expand All @@ -56,6 +57,7 @@
import org.apache.logging.log4j.LogManager;

public class LayerDaoJdbc extends JdbcDaoSupport implements LayerDao {
private final long MEM_RESERVED_MIN;
private static final Logger logger = LogManager.getLogger(LayerDaoJdbc.class);
private static final String INSERT_OUTPUT_PATH =
"INSERT INTO " +
Expand All @@ -67,6 +69,14 @@ public class LayerDaoJdbc extends JdbcDaoSupport implements LayerDao {
"str_filespec " +
") VALUES (?,?,?,?)";

@Autowired
public LayerDaoJdbc(Environment env) {
this.MEM_RESERVED_MIN = env.getRequiredProperty(
"dispatcher.memory.mem_reserved_min",
Long.class
);
}

@Override
public void insertLayerOutput(LayerInterface layer, String filespec) {
getJdbcTemplate().update(
Expand Down Expand Up @@ -341,8 +351,8 @@ public void insertLayerDetail(LayerDetail l) {

@Override
public void updateLayerMinMemory(LayerInterface layer, long val) {
if (val < Dispatcher.MEM_RESERVED_MIN) {
val = Dispatcher.MEM_RESERVED_MIN;
if (val < MEM_RESERVED_MIN) {
val = MEM_RESERVED_MIN;
}
getJdbcTemplate().update("UPDATE layer SET int_mem_min=? WHERE pk_layer=?",
val, layer.getLayerId());
Expand Down Expand Up @@ -380,8 +390,8 @@ public boolean balanceLayerMinMemory(LayerInterface layer, long frameMaxRss) {
if (maxrss < frameMaxRss) {
maxrss = frameMaxRss;
}
if (maxrss < Dispatcher.MEM_RESERVED_MIN) {
maxrss = Dispatcher.MEM_RESERVED_MIN;
if (maxrss < MEM_RESERVED_MIN) {
maxrss = MEM_RESERVED_MIN;
} else {
maxrss = maxrss + CueUtil.MB256;
}
Expand Down Expand Up @@ -603,11 +613,11 @@ public long findPastMaxRSS(JobInterface job, String name) {
try {
long maxRss = getJdbcTemplate().queryForObject(FIND_PAST_MAX_RSS,
Long.class, job.getJobId(), name);
if (maxRss >= Dispatcher.MEM_RESERVED_MIN) {
if (maxRss >= MEM_RESERVED_MIN) {
return maxRss;
}
else {
return Dispatcher.MEM_RESERVED_MIN;
return MEM_RESERVED_MIN;
}
} catch (EmptyResultDataAccessException e) {
// Actually want to return 0 here, which means
Expand All @@ -625,8 +635,8 @@ public void updateTags(JobInterface job, String tags, LayerType type) {

@Override
public void updateMinMemory(JobInterface job, long mem, LayerType type) {
if (mem < Dispatcher.MEM_RESERVED_MIN) {
mem = Dispatcher.MEM_RESERVED_MIN;
if (mem < MEM_RESERVED_MIN) {
mem = MEM_RESERVED_MIN;
}
getJdbcTemplate().update(
"UPDATE layer SET int_mem_min=? WHERE pk_job=? AND str_type=?",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.List;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.PreparedStatementCreator;
import org.springframework.jdbc.core.RowMapper;
Expand All @@ -45,14 +47,16 @@
import com.imageworks.spcue.dao.ProcDao;
import com.imageworks.spcue.dao.criteria.FrameSearchInterface;
import com.imageworks.spcue.dao.criteria.ProcSearchInterface;
import com.imageworks.spcue.dispatcher.Dispatcher;
import com.imageworks.spcue.dispatcher.ResourceDuplicationFailureException;
import com.imageworks.spcue.dispatcher.ResourceReservationFailureException;
import com.imageworks.spcue.grpc.host.HardwareState;
import com.imageworks.spcue.util.SqlUtil;

public class ProcDaoJdbc extends JdbcDaoSupport implements ProcDao {

@Autowired
private Environment env;

private static final String VERIFY_RUNNING_PROC =
"SELECT " +
"proc.pk_frame " +
Expand Down Expand Up @@ -121,15 +125,21 @@ public boolean deleteVirtualProc(VirtualProc proc) {

public void insertVirtualProc(VirtualProc proc) {
proc.id = SqlUtil.genKeyRandom();
long memReservedMin = env.getRequiredProperty(
"dispatcher.memory.mem_reserved_min",
Long.class);
long memGpuReservedMin = env.getRequiredProperty(
"dispatcher.memory.mem_gpu_reserved_min",
Long.class);
int result = 0;
try {
result = getJdbcTemplate().update(INSERT_VIRTUAL_PROC,
proc.getProcId(), proc.getHostId(), proc.getShowId(),
proc.getLayerId(), proc.getJobId(), proc.getFrameId(),
proc.coresReserved, proc.memoryReserved,
proc.memoryReserved, Dispatcher.MEM_RESERVED_MIN,
proc.memoryReserved, memReservedMin,
proc.gpusReserved, proc.gpuMemoryReserved,
proc.gpuMemoryReserved, Dispatcher.MEM_GPU_RESERVED_MIN,
proc.gpuMemoryReserved, memGpuReservedMin,
proc.isLocalDispatch);

// Update all of the resource counts
Expand Down Expand Up @@ -634,7 +644,10 @@ public boolean balanceUnderUtilizedProcs(ProcInterface targetProc, long targetMe
for (Map<String,Object> map: result) {
String pk_proc = (String) map.get("pk_proc");
Long free_mem = (Long) map.get("free_mem");
long available = free_mem - borrowMap.get(pk_proc) - Dispatcher.MEM_RESERVED_MIN;
long memReservedMin = env.getRequiredProperty(
"dispatcher.memory.mem_reserved_min",
Long.class);
long available = free_mem - borrowMap.get(pk_proc) - memReservedMin;
if (available > memPerFrame) {
borrowMap.put(pk_proc, borrowMap.get(pk_proc) + memPerFrame);
memBorrowedTotal = memBorrowedTotal + memPerFrame;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ public class CoreUnitDispatcher implements Dispatcher {

public boolean testMode = false;

@Autowired
private final long MEM_RESERVED_MIN;
private final long MEM_GPU_RESERVED_DEFAULT;
private final long MEM_GPU_RESERVED_MIN;

private Environment env;

/*
Expand All @@ -108,13 +111,28 @@ public class CoreUnitDispatcher implements Dispatcher {
*/
private Cache<String, String> jobLock;

/**
 * Creates the dispatcher and reads its memory thresholds from the
 * environment once, at construction time.
 *
 * @param env environment used to resolve the dispatcher properties
 */
@Autowired
public CoreUnitDispatcher(Environment env) {
    // env has to be assigned first: getLongProperty reads it.
    this.env = env;
    this.MEM_RESERVED_MIN = getLongProperty("dispatcher.memory.mem_reserved_min");
    this.MEM_GPU_RESERVED_DEFAULT = getLongProperty("dispatcher.memory.mem_gpu_reserved_default");
    this.MEM_GPU_RESERVED_MIN = getLongProperty("dispatcher.memory.mem_gpu_reserved_min");
}

/*
 * Look up a required key in opencue.properties and return it as an int
 */
private int getIntProperty(String property) {
    final Integer value = env.getRequiredProperty(property, Integer.class);
    return value;
}

/*
 * Look up a required key in opencue.properties and return it as a long
 */
private long getLongProperty(String property) {
    final Long value = env.getRequiredProperty(property, Long.class);
    return value;
}

private Cache<String, String> getOrCreateJobLock() {
if (jobLock == null) {
this.jobLock = CacheBuilder.newBuilder()
Expand All @@ -134,10 +152,10 @@ private List<VirtualProc> dispatchJobs(DispatchHost host, Set<String> jobs) {
for (String jobid: jobs) {

if (!host.hasAdditionalResources(
Dispatcher.CORE_POINTS_RESERVED_MIN,
Dispatcher.MEM_RESERVED_MIN,
Dispatcher.GPU_UNITS_RESERVED_MIN,
Dispatcher.MEM_GPU_RESERVED_MIN)) {
CORE_POINTS_RESERVED_MIN,
MEM_RESERVED_MIN,
GPU_UNITS_RESERVED_MIN,
MEM_GPU_RESERVED_MIN)) {
return procs;
}

Expand Down Expand Up @@ -174,15 +192,13 @@ private List<VirtualProc> dispatchJobs(DispatchHost host, Set<String> jobs) {
private Set<String> getGpuJobs(DispatchHost host, ShowInterface show) {
Set<String> jobs = null;

// TODO: GPU: make index with the 4 components instead of just 3, replace the just 3

// If the host has gpu idle, first do a query to find gpu jobs
// If no gpu jobs found remove resources to leave room for a gpu frame
if (host.hasAdditionalResources(
Dispatcher.CORE_POINTS_RESERVED_DEFAULT,
Dispatcher.MEM_RESERVED_MIN,
this.MEM_RESERVED_MIN,
Dispatcher.GPU_UNITS_RESERVED_DEFAULT,
Dispatcher.MEM_GPU_RESERVED_DEFAULT)) {
this.MEM_GPU_RESERVED_DEFAULT)) {
if (show == null)
jobs = dispatchSupport.findDispatchJobs(host,
getIntProperty("dispatcher.job_query_max"));
Expand Down Expand Up @@ -312,9 +328,9 @@ public void wrapDispatchFrame() {
host.useResources(proc.coresReserved, proc.memoryReserved, proc.gpusReserved, proc.gpuMemoryReserved);
if (!host.hasAdditionalResources(
Dispatcher.CORE_POINTS_RESERVED_MIN,
Dispatcher.MEM_RESERVED_MIN,
MEM_RESERVED_MIN,
Dispatcher.GPU_UNITS_RESERVED_MIN,
Dispatcher.MEM_GPU_RESERVED_MIN)) {
MEM_GPU_RESERVED_MIN)) {
break;
}
else if (procs.size() >= getIntProperty("dispatcher.job_frame_dispatch_max")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,33 +51,15 @@ public interface Dispatcher {
// on the host.
public static final int CORE_LOAD_THRESHOLD = 5;


// The default amount of memory reserved for a frame if no memory
// reservation settings are specified
public static final long MEM_RESERVED_DEFAULT = 3355443;

// The maximum amount of memory that can be requested for a given frame.
public static final long MEM_RESERVED_MAX = CueUtil.GB * 50;

// The minimum amount of memory that can be assigned to a frame.
public static final long MEM_RESERVED_MIN = 262144;

// Memory reserved by system, gets chopped off the available memory
public static final long MEM_RESERVED_SYSTEM = 524288;

// Amount of memory that has to be idle for the rest of the cores
// on the machine to be considered stranded.
public static final long MEM_STRANDED_THRESHHOLD = CueUtil.GB + CueUtil.MB512;

// The default amount of gpu memory reserved for a frame if no gpu memory
// reservation settings are specified
public static final long MEM_GPU_RESERVED_DEFAULT = 0;

// The minimum amount of gpu memory that can be assigned to a frame.
public static final long MEM_GPU_RESERVED_MIN = 0;
// Determines the service default minimum memory per frame.
public static final long MEM_SERVICE_RESERVED_DEFAULT = CueUtil.GB4;

// The maximum amount of gpu memory that can be assigned to a frame.
public static final long MEM_GPU_RESERVED_MAX = CueUtil.GB * 1024;
// Determines the service default minimum gpu memory per frame.
public static final long MEM_SERVICE_GPU_RESERVED_DEFAULT = 0;

// Return value for cleared frame
public static final int EXIT_STATUS_FRAME_CLEARED = 299;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,10 @@ public void handleHostReport(HostReport report, boolean isBoot) {
bookingManager.removeInactiveLocalHostAssignment(lca);
}
}

long memReservedMin = env.getRequiredProperty(
"dispatcher.memory.mem_reserved_min",
Long.class);

if (!isTempDirStorageEnough(report.getHost().getTotalMcp(), report.getHost().getFreeMcp(), host.os)) {
msg = String.format(
"%s doesn't have enough free space in the temporary directory (mcp), %dMB",
Expand All @@ -264,13 +267,13 @@ else if (coresToReserve <= 0 || host.idleCores < Dispatcher.CORE_POINTS_RESERVED
msg = String.format("%s doesn't have enough idle cores, %d needs %d",
host.name, host.idleCores, Dispatcher.CORE_POINTS_RESERVED_MIN);
}
else if (host.idleMemory < Dispatcher.MEM_RESERVED_MIN) {
else if (host.idleMemory < memReservedMin) {
msg = String.format("%s doesn't have enough idle memory, %d needs %d",
host.name, host.idleMemory, Dispatcher.MEM_RESERVED_MIN);
host.name, host.idleMemory, memReservedMin);
}
else if (report.getHost().getFreeMem() < CueUtil.MB512) {
msg = String.format("%s doesn't have enough free system mem, %d needs %d",
host.name, report.getHost().getFreeMem(), Dispatcher.MEM_RESERVED_MIN);
host.name, report.getHost().getFreeMem(), memReservedMin);
}
else if(!host.hardwareState.equals(HardwareState.UP)) {
msg = host + " is not in the Up state.";
Expand Down
Loading

0 comments on commit a59bf82

Please sign in to comment.