Skip to content

Commit

Permalink
Implement GNU jobserver posix client support
Browse files Browse the repository at this point in the history
The core principle of a jobserver is simple:
Before starting a new job (edge in ninja-speak),
a token must be acquired from an external entity as approval.

Once a job is finished, the token must be returned to signal a free thread.
In the case of GNU Make, this external entity is the parent process
which has executed ninja and is managing the load capacity for
all subprocesses which it has spawned. Introducing client support
for this model allows ninja to give load capacity management
to this parent process, allowing it to control the number of
subprocesses that ninja spawns at any given time.

This functionality is desirable when ninja is used as part of a bigger build,
such as builds with Yocto/OpenEmbedded, OpenWrt/Linux, Buildroot and Android.
Here, multiple compile jobs are executed in parallel
in order to maximize CPU utilization, but if each compile job
uses all available cores, the system is overloaded.

This implementation instantiates the client in real_main()
and passes references to the class into other classes.
All tokens are returned whenever the CommandRunner aborts.
The available load capacity is derived from the number of tokens held
compared to the number of running subprocesses; this capacity determines
how many new tokens ninja attempts to acquire, and therefore how many
additional jobs it may start, on each pass of the loop that finds work.

Co-authored-by: Martin Hundebøll <martin@geanix.com>
Signed-off-by: Michael Pratt <mcpratt@pm.me>
  • Loading branch information
mcprat and hundeboll committed Aug 10, 2024
1 parent dcefb83 commit c1c6829
Show file tree
Hide file tree
Showing 8 changed files with 340 additions and 16 deletions.
5 changes: 4 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,10 @@ if(WIN32)
# errors by telling windows.h to not define those two.
add_compile_definitions(NOMINMAX)
else()
target_sources(libninja PRIVATE src/subprocess-posix.cc)
target_sources(libninja PRIVATE
src/subprocess-posix.cc
src/jobserver-posix.cc
)
if(CMAKE_SYSTEM_NAME STREQUAL "OS400" OR CMAKE_SYSTEM_NAME STREQUAL "AIX")
target_sources(libninja PRIVATE src/getopt.c)
# Build getopt.c, which can be compiled as either C or C++, as C++
Expand Down
1 change: 1 addition & 0 deletions configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,7 @@ def has_re2c() -> bool:
objs += cc('getopt')
else:
objs += cxx('subprocess-posix')
objs += cxx('jobserver-posix')
if platform.is_aix():
objs += cc('getopt')
if platform.is_msvc():
Expand Down
55 changes: 49 additions & 6 deletions src/build.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,23 @@ Edge* Plan::FindWork() {
if (ready_.empty())
return NULL;

// TODO: jobserver client support for Windows
#ifndef _WIN32
// Only initiate work if the jobserver can acquire a token.
if (builder_->jobserver_.Enabled() && !builder_->jobserver_.Acquire()) {
return NULL;
}
#endif

Edge* work = ready_.top();
ready_.pop();

// TODO: jobserver client support for Windows
#ifndef _WIN32
// Mark this edge as using a job token to be released when work is finished.
work->has_job_token_ = builder_->jobserver_.Enabled();
#endif

return work;
}

Expand Down Expand Up @@ -201,6 +216,15 @@ bool Plan::EdgeFinished(Edge* edge, EdgeResult result, string* err) {
edge->pool()->EdgeFinished(*edge);
edge->pool()->RetrieveReadyEdges(&ready_);

// TODO: jobserver client support for Windows
#ifndef _WIN32
// If jobserver is used, return the token for this job.
if (edge->has_job_token_) {
builder_->jobserver_.Release();
edge->has_job_token_ = false;
}
#endif

// The rest of this function only applies to successful commands.
if (result != kEdgeSucceeded)
return true;
Expand Down Expand Up @@ -594,7 +618,9 @@ void Plan::Dump() const {
}

struct RealCommandRunner : public CommandRunner {
explicit RealCommandRunner(const BuildConfig& config) : config_(config) {}
explicit RealCommandRunner(const BuildConfig& config, Jobserver& jobserver) :
config_(config), jobserver_(jobserver) {}

virtual ~RealCommandRunner() {}
virtual size_t CanRunMore() const;
virtual bool StartCommand(Edge* edge);
Expand All @@ -603,6 +629,7 @@ struct RealCommandRunner : public CommandRunner {
virtual void Abort();

const BuildConfig& config_;
Jobserver& jobserver_;
SubprocessSet subprocs_;
map<const Subprocess*, Edge*> subproc_to_edge_;
};
Expand All @@ -617,6 +644,10 @@ vector<Edge*> RealCommandRunner::GetActiveEdges() {

// Stops the build: clears the set of running subprocesses and then, on
// non-Windows platforms, clears the jobserver client's token state.
void RealCommandRunner::Abort() {
  subprocs_.Clear();

#if !defined(_WIN32)
  // TODO: jobserver client support for Windows
  jobserver_.Clear();
#endif
}

size_t RealCommandRunner::CanRunMore() const {
Expand All @@ -631,6 +662,18 @@ size_t RealCommandRunner::CanRunMore() const {
capacity = load_capacity;
}

// TODO: jobserver client support for Windows
#ifndef _WIN32
int job_tokens = jobserver_.Tokens();

// When initialized, behave as if the implicit token is acquired already.
// Otherwise, this occurs after a token is released but before it is replaced,
// so the base capacity is represented by job_tokens + 1 when positive.
// Add an extra loop on capacity for each job in order to get an extra token.
if (job_tokens)
capacity = abs(job_tokens) - subproc_number + 2;
#endif

if (capacity < 0)
capacity = 0;

Expand Down Expand Up @@ -670,10 +713,10 @@ bool RealCommandRunner::WaitForCommand(Result* result) {
return true;
}

Builder::Builder(State* state, const BuildConfig& config, BuildLog* build_log,
DepsLog* deps_log, DiskInterface* disk_interface,
Status* status, int64_t start_time_millis)
: state_(state), config_(config), plan_(this), status_(status),
Builder::Builder(State* state, const BuildConfig& config, Jobserver& jobserver,
BuildLog* build_log, DepsLog* deps_log, DiskInterface* disk_interface,
Status* status, int64_t start_time_millis) : plan_(this),
state_(state), config_(config), jobserver_(jobserver), status_(status),
start_time_millis_(start_time_millis), disk_interface_(disk_interface),
explanations_(g_explaining ? new Explanations() : nullptr),
scan_(state, build_log, deps_log, disk_interface,
Expand Down Expand Up @@ -778,7 +821,7 @@ bool Builder::Build(string* err) {
if (config_.dry_run)
command_runner_.reset(new DryRunCommandRunner);
else
command_runner_.reset(new RealCommandRunner(config_));
command_runner_.reset(new RealCommandRunner(config_, jobserver_));
}

// We are about to start the build process.
Expand Down
8 changes: 5 additions & 3 deletions src/build.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "depfile_parser.h"
#include "exit_status.h"
#include "graph.h"
#include "jobserver.h"
#include "util.h" // int64_t

struct BuildLog;
Expand Down Expand Up @@ -187,9 +188,9 @@ struct BuildConfig {

/// Builder wraps the build process: starting commands, updating status.
struct Builder {
Builder(State* state, const BuildConfig& config, BuildLog* build_log,
DepsLog* deps_log, DiskInterface* disk_interface, Status* status,
int64_t start_time_millis);
Builder(State* state, const BuildConfig& config, Jobserver& jobserver,
BuildLog* build_log, DepsLog* deps_log, DiskInterface* disk_interface,
Status* status, int64_t start_time_millis);
~Builder();

/// Clean up after interrupted commands by deleting output files.
Expand Down Expand Up @@ -224,6 +225,7 @@ struct Builder {

State* state_;
const BuildConfig& config_;
Jobserver& jobserver_;
Plan plan_;
std::unique_ptr<CommandRunner> command_runner_;
Status* status_;
Expand Down
1 change: 1 addition & 0 deletions src/graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ struct Edge {
bool deps_loaded_ = false;
bool deps_missing_ = false;
bool generated_by_dep_loader_ = false;
bool has_job_token_ = false;
TimeStamp command_start_time_ = 0;

const Rule& rule() const { return *rule_; }
Expand Down
160 changes: 160 additions & 0 deletions src/jobserver-posix.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Copyright 2024 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "jobserver.h"

#include <fcntl.h>
#include <unistd.h>

#include <cassert>
#include <cctype>
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>

#include "util.h"

// Sets up the jobserver client from the MAKEFLAGS environment variable.
//
// MAKEFLAGS is split on blanks and searched for a --jobserver-auth value,
// falling back to --jobserver-fds. When a flag occurs several times the last
// occurrence wins. The value is either a named fifo (prefixed by FIFO_KEY) or
// a "R,W" pair of inherited pipe descriptors.
//
// Outcomes:
//  - MAKEFLAGS unset, or no jobserver flag present: the client silently stays
//    disabled.
//  - Flag present but the value is empty or unparsable: a warning is printed
//    and the client stays disabled.
//  - Descriptors cannot be opened or are invalid: Fatal().
//  - Otherwise token_count_ is set to -1, meaning "initialized, but the
//    implicit token has not been taken yet".
Jobserver::Jobserver() {
  assert(rfd_ < 0);
  assert(wfd_ < 0);

  // Return early if no makeflags are passed in the environment
  const char* makeflags = std::getenv("MAKEFLAGS");
  if (makeflags == nullptr)
    return;

  // Tokenize string to characters in flag_, then words in flags_
  const size_t makeflags_len = strlen(makeflags);
  while (flag_char_ < makeflags_len) {
    while (flag_char_ < makeflags_len &&
           !isblank(static_cast<unsigned char>(makeflags[flag_char_]))) {
      flag_.push_back(makeflags[flag_char_]);
      flag_char_++;
    }

    if (!flag_.empty())
      flags_.push_back(flag_);

    flag_.clear();
    flag_char_++;
  }

  // Tracks whether any jobserver flag was present at all, so that an ordinary
  // MAKEFLAGS without a jobserver does not trigger a bogus warning.
  bool key_seen = false;

  // Search for --jobserver-auth (no break: the last occurrence takes effect)
  const size_t auth_len = strlen(AUTH_KEY);
  for (const auto& word : flags_) {
    if (word.compare(0, auth_len, AUTH_KEY) == 0) {
      flag_ = word.substr(auth_len);
      key_seen = true;
    }
  }

  // Fallback to --jobserver-fds
  if (flag_.empty()) {
    const size_t fds_len = strlen(FDS_KEY);
    for (const auto& word : flags_) {
      if (word.compare(0, fds_len, FDS_KEY) == 0) {
        flag_ = word.substr(fds_len);
        key_seen = true;
      }
    }
  }

  // Not running under a jobserver: nothing to report.
  if (!key_seen)
    return;

  jobserver_name_.assign(flag_);

  const char* jobserver = jobserver_name_.c_str();

  // Return early if the flag's value is empty
  if (jobserver_name_.empty()) {
    Warning("invalid jobserver value: '%s'", jobserver);
    return;
  }

  const size_t fifo_len = strlen(FIFO_KEY);
  jobserver_fifo_ = jobserver_name_.compare(0, fifo_len, FIFO_KEY) == 0;

  // Make sure the error text below reflects this attempt only, not some
  // unrelated earlier failure.
  errno = 0;

  if (jobserver_fifo_) {
    const char* fifo_path = jobserver + fifo_len;
    rfd_ = open(fifo_path, O_RDONLY | O_NONBLOCK);
    wfd_ = open(fifo_path, O_WRONLY);
  } else {
    // Parse into temporaries: on a partial match sscanf may already have
    // stored the first number, and the destructor closes any rfd_/wfd_ >= 0.
    int read_fd = -1;
    int write_fd = -1;

    // Return early if jobserver type is unknown (neither fifo nor pipe)
    if (sscanf(jobserver, "%d,%d", &read_fd, &write_fd) != 2) {
      Warning("invalid jobserver value: '%s'", jobserver);
      return;
    }

    rfd_ = read_fd;
    wfd_ = write_fd;
  }

  if (rfd_ == -1 || wfd_ == -1)
    Fatal("failed to open jobserver: %s: %s",
          jobserver, errno ? strerror(errno) : "Bad file descriptor");
  else if (rfd_ > 0 && wfd_ > 0)
    Info("using jobserver: %s", jobserver);
  else
    Fatal("Make provided an invalid pipe: %s (mark the command as recursive)",
          jobserver);

  // Signal that we have initialized but do not have a token yet
  token_count_ = -1;
}

// Gives back every token still held, then closes both jobserver descriptors
// and marks them as unset.
Jobserver::~Jobserver() {
  Clear();

  if (!(rfd_ < 0))
    close(rfd_);
  if (!(wfd_ < 0))
    close(wfd_);

  wfd_ = -1;
  rfd_ = -1;
}

// Reports whether the client is usable, i.e. neither the read nor the write
// descriptor is negative.
bool Jobserver::Enabled() const {
  return !(rfd_ < 0 || wfd_ < 0);
}

// Returns the raw token counter.
//
// The constructor sets it to -1 once the jobserver has been set up but before
// any token is taken. Acquire() makes it positive, and that count includes
// the implicit first token. Release() never lets it drop below 0.
int Jobserver::Tokens() const {
return token_count_;
}

// Tries to obtain one job token.
//
// The first token is implicit: while the counter is <= 0 it is granted
// without touching the jobserver. Every further token is one byte read from
// rfd_.
//
// Returns true if a token was obtained. Returns false if none is available
// right now (EAGAIN / EWOULDBLOCK), if the read was interrupted by a signal
// (EINTR), or if the read hit end-of-file. Any other read error is fatal.
bool Jobserver::Acquire() {
  // The first token is implicitly handed to a process
  if (token_count_ <= 0) {
    token_count_ = 1;
    return true;
  }

  char token;
  const ssize_t ret = read(rfd_, &token, 1);
  if (ret < 0) {
    // An interrupted read is not a jobserver failure; report "no token" so
    // the caller can try again on its next pass.
    if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
      return false;

    if (!jobserver_fifo_)
      Warning("Make closed the pipe: %d (mark the command as recursive)", rfd_);
    Fatal("failed to read token from jobserver: %d: %s", rfd_, strerror(errno));
  }

  if (ret > 0)
    token_count_++;

  return ret > 0;
}

// Gives back one job token.
//
// The counter is clamped to 0 if negative and otherwise decremented. When it
// reaches 0 the token just released was the implicit one, so nothing is
// written. Otherwise a single '+' byte is written to wfd_. A failed write is
// fatal.
void Jobserver::Release() {
  if (token_count_ < 0)
    token_count_ = 0;
  if (token_count_ > 0)
    token_count_--;

  // The first token is implicitly handed to a process
  if (token_count_ == 0)
    return;

  const char token = '+';
  ssize_t ret;
  // A signal arriving mid-write must not lose the token: retry on EINTR.
  do {
    ret = write(wfd_, &token, 1);
  } while (ret < 0 && errno == EINTR);

  if (ret != 1) {
    if (!jobserver_fifo_)
      Warning("Make closed the pipe: %d (mark the command as recursive)", wfd_);
    // Report the descriptor that was actually written to.
    Fatal("failed to return token to jobserver: %d: %s", wfd_, strerror(errno));
  }
}

// Calls Release() repeatedly until the token counter reads zero.
void Jobserver::Clear() {
  for (; token_count_ != 0;) {
    Release();
  }
}
Loading

0 comments on commit c1c6829

Please sign in to comment.