diff --git a/CMakeLists.txt b/CMakeLists.txt index b8fdee7d3a..967aa245f8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -169,7 +169,10 @@ if(WIN32) # errors by telling windows.h to not define those two. add_compile_definitions(NOMINMAX) else() - target_sources(libninja PRIVATE src/subprocess-posix.cc) + target_sources(libninja PRIVATE + src/jobserver-posix.cc + src/subprocess-posix.cc + ) if(CMAKE_SYSTEM_NAME STREQUAL "OS400" OR CMAKE_SYSTEM_NAME STREQUAL "AIX") target_sources(libninja PRIVATE src/getopt.c) # Build getopt.c, which can be compiled as either C or C++, as C++ diff --git a/configure.py b/configure.py index c88daad508..b39cfa6b46 100755 --- a/configure.py +++ b/configure.py @@ -564,6 +564,7 @@ def has_re2c() -> bool: objs += cxx('minidump-win32', variables=cxxvariables) objs += cc('getopt') else: + objs += cxx('jobserver-posix') objs += cxx('subprocess-posix') if platform.is_aix(): objs += cc('getopt') diff --git a/src/build.cc b/src/build.cc index 7e5f790eb9..31a609c5ab 100644 --- a/src/build.cc +++ b/src/build.cc @@ -163,6 +163,16 @@ Edge* Plan::FindWork() { return NULL; Edge* work = ready_.top(); + + // Only initiate work if the jobserver client can acquire a token. + if (builder_ && builder_->jobserver_ && + builder_->jobserver_->Enabled()) { + int job_tokens = builder_->jobserver_->Tokens(); + work->job_token_ = builder_->jobserver_->Acquire(); + if (job_tokens == builder_->jobserver_->Tokens()) + return NULL; + } + ready_.pop(); return work; } @@ -199,6 +209,10 @@ bool Plan::EdgeFinished(Edge* edge, EdgeResult result, string* err) { edge->pool()->EdgeFinished(*edge); edge->pool()->RetrieveReadyEdges(&ready_); + // If jobserver is used, return the token for this job. + if (builder_ && builder_->jobserver_) + builder_->jobserver_->Release(&edge->job_token_); + // The rest of this function only applies to successful commands. if (result != kEdgeSucceeded) return true; @@ -592,14 +606,18 @@ void Plan::Dump() const { } struct RealCommandRunner : public CommandRunner { - explicit RealCommandRunner(const BuildConfig& config) : config_(config) {} + explicit RealCommandRunner(const BuildConfig& config, Jobserver* jobserver) : + config_(config), jobserver_(jobserver) {} + size_t CanRunMore() const override; bool StartCommand(Edge* edge) override; bool WaitForCommand(Result* result) override; vector GetActiveEdges() override; + void ClearJobTokens(const std::vector&) override; void Abort() override; const BuildConfig& config_; + Jobserver* jobserver_; SubprocessSet subprocs_; map subproc_to_edge_; }; @@ -612,7 +630,13 @@ vector RealCommandRunner::GetActiveEdges() { return edges; } +void RealCommandRunner::ClearJobTokens(const std::vector &edges) { + for (Edge* edge : edges) + jobserver_->Release(&edge->job_token_); +} + void RealCommandRunner::Abort() { + ClearJobTokens(GetActiveEdges()); subprocs_.Clear(); } @@ -628,6 +652,14 @@ size_t RealCommandRunner::CanRunMore() const { capacity = load_capacity; } + // When initialized, behave as if the implicit token is acquired already. + // Otherwise, this happens after a token is released but before it is replaced, + // so the base capacity is represented by job_tokens + 1 when positive. + // Add an extra loop on capacity for each job in order to get an extra token. + int job_tokens = jobserver_->Tokens(); + if (job_tokens) + capacity = abs(job_tokens) - subproc_number + 2; + if (capacity < 0) capacity = 0; @@ -667,10 +699,10 @@ bool RealCommandRunner::WaitForCommand(Result* result) { return true; } -Builder::Builder(State* state, const BuildConfig& config, BuildLog* build_log, - DepsLog* deps_log, DiskInterface* disk_interface, - Status* status, int64_t start_time_millis) - : state_(state), config_(config), plan_(this), status_(status), +Builder::Builder(State* state, const BuildConfig& config, Jobserver* jobserver, + BuildLog* build_log, DepsLog* deps_log, DiskInterface* disk_interface, + Status* status, int64_t start_time_millis) : state_(state), + config_(config), jobserver_(jobserver), plan_(this), status_(status), start_time_millis_(start_time_millis), disk_interface_(disk_interface), explanations_(g_explaining ? new Explanations() : nullptr), scan_(state, build_log, deps_log, disk_interface, @@ -775,7 +807,7 @@ bool Builder::Build(string* err) { if (config_.dry_run) command_runner_.reset(new DryRunCommandRunner); else - command_runner_.reset(new RealCommandRunner(config_)); + command_runner_.reset(new RealCommandRunner(config_, jobserver_)); } // We are about to start the build process. diff --git a/src/build.h b/src/build.h index 9bb0c70b5c..586d7c534d 100644 --- a/src/build.h +++ b/src/build.h @@ -24,6 +24,7 @@ #include "depfile_parser.h" #include "exit_status.h" #include "graph.h" +#include "jobserver.h" #include "util.h" // int64_t struct BuildLog; @@ -161,6 +162,7 @@ struct CommandRunner { virtual bool WaitForCommand(Result* result) = 0; virtual std::vector GetActiveEdges() { return std::vector(); } + virtual void ClearJobTokens(const std::vector&) {} virtual void Abort() {} }; @@ -187,9 +189,9 @@ struct BuildConfig { /// Builder wraps the build process: starting commands, updating status. struct Builder { - Builder(State* state, const BuildConfig& config, BuildLog* build_log, - DepsLog* deps_log, DiskInterface* disk_interface, Status* status, - int64_t start_time_millis); + Builder(State* state, const BuildConfig& config, Jobserver* jobserver, + BuildLog* build_log, DepsLog* deps_log, DiskInterface* disk_interface, + Status* status, int64_t start_time_millis); ~Builder(); /// Clean up after interrupted commands by deleting output files. @@ -224,6 +226,7 @@ struct Builder { State* state_; const BuildConfig& config_; + Jobserver* jobserver_; Plan plan_; std::unique_ptr command_runner_; Status* status_; diff --git a/src/build_test.cc b/src/build_test.cc index c84190a040..fb0167e5ae 100644 --- a/src/build_test.cc +++ b/src/build_test.cc @@ -525,6 +525,7 @@ struct FakeCommandRunner : public CommandRunner { virtual bool StartCommand(Edge* edge); virtual bool WaitForCommand(Result* result); virtual vector GetActiveEdges(); + virtual void ClearJobTokens(const std::vector&); virtual void Abort(); vector commands_ran_; @@ -535,12 +536,12 @@ struct FakeCommandRunner : public CommandRunner { struct BuildTest : public StateTestWithBuiltinRules, public BuildLogUser { BuildTest() : config_(MakeConfig()), command_runner_(&fs_), status_(config_), - builder_(&state_, config_, NULL, NULL, &fs_, &status_, 0) { + builder_(&state_, config_, NULL, NULL, NULL, &fs_, &status_, 0) { } explicit BuildTest(DepsLog* log) : config_(MakeConfig()), command_runner_(&fs_), status_(config_), - builder_(&state_, config_, NULL, log, &fs_, &status_, 0) {} + builder_(&state_, config_, NULL, NULL, log, &fs_, &status_, 0) {} virtual void SetUp() { StateTestWithBuiltinRules::SetUp(); @@ -610,7 +611,7 @@ void BuildTest::RebuildTarget(const string& target, const char* manifest, pdeps_log = &deps_log; } - Builder builder(pstate, config_, pbuild_log, pdeps_log, &fs_, &status_, 0); + Builder builder(pstate, config_, NULL, pbuild_log, pdeps_log, &fs_, &status_, 0); EXPECT_TRUE(builder.AddTarget(target, &err)); command_runner_.commands_ran_.clear(); @@ -797,7 +798,13 @@ vector FakeCommandRunner::GetActiveEdges() { return active_edges_; } +void FakeCommandRunner::ClearJobTokens(const std::vector &edges) { + for (Edge* edge : edges) + edge->job_token_ = '\0'; +} + void FakeCommandRunner::Abort() { + ClearJobTokens(GetActiveEdges()); active_edges_.clear(); } @@ -2559,7 +2566,7 @@ TEST_F(BuildWithDepsLogTest, Straightforward) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); EXPECT_TRUE(builder.AddTarget("out", &err)); ASSERT_EQ("", err); @@ -2589,7 +2596,7 @@ TEST_F(BuildWithDepsLogTest, Straightforward) { ASSERT_TRUE(deps_log.Load(deps_log_file_.path(), &state, &err)); ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); EXPECT_TRUE(builder.AddTarget("out", &err)); @@ -2630,7 +2637,7 @@ TEST_F(BuildWithDepsLogTest, ObsoleteDeps) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); EXPECT_TRUE(builder.AddTarget("out", &err)); ASSERT_EQ("", err); @@ -2659,7 +2666,7 @@ TEST_F(BuildWithDepsLogTest, ObsoleteDeps) { ASSERT_TRUE(deps_log.Load(deps_log_file_.path(), &state, &err)); ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); EXPECT_TRUE(builder.AddTarget("out", &err)); @@ -2695,7 +2702,7 @@ TEST_F(BuildWithDepsLogTest, DepsIgnoredInDryRun) { // The deps log is NULL in dry runs. config_.dry_run = true; - Builder builder(&state, config_, NULL, NULL, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, NULL, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); @@ -2730,7 +2737,7 @@ TEST_F(BuildWithDepsLogTest, TestInputMtimeRaceCondition) { BuildLog::LogEntry* log_entry = NULL; { - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); @@ -2750,7 +2757,7 @@ TEST_F(BuildWithDepsLogTest, TestInputMtimeRaceCondition) { } { - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); @@ -2772,7 +2779,7 @@ TEST_F(BuildWithDepsLogTest, TestInputMtimeRaceCondition) { } { - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); @@ -2811,7 +2818,7 @@ TEST_F(BuildWithDepsLogTest, TestInputMtimeRaceConditionWithDepFile) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); { - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); // Run the build, out gets built, dep file is created @@ -2832,7 +2839,7 @@ TEST_F(BuildWithDepsLogTest, TestInputMtimeRaceConditionWithDepFile) { { // Trigger the build again - "out" will rebuild since its newest input mtime (header.h) // is newer than the recorded mtime of out in the build log - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); @@ -2848,7 +2855,7 @@ TEST_F(BuildWithDepsLogTest, TestInputMtimeRaceConditionWithDepFile) { { // Trigger the build again - "out" won't rebuild since the file wasn't updated during // the previous build - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); @@ -2867,7 +2874,7 @@ TEST_F(BuildWithDepsLogTest, TestInputMtimeRaceConditionWithDepFile) { { // Rebuild. This time, long-cc will cause header.h to be updated while the build is // in progress - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); @@ -2883,7 +2890,7 @@ TEST_F(BuildWithDepsLogTest, TestInputMtimeRaceConditionWithDepFile) { { // Rebuild. Because header.h is now in the deplog for out, it should be detectable as // a change-while-in-progress and should cause a rebuild of out. - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); @@ -2899,7 +2906,7 @@ TEST_F(BuildWithDepsLogTest, TestInputMtimeRaceConditionWithDepFile) { { // This time, the header.h file was not updated during the build, so the target should // not be considered dirty. - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); @@ -2957,7 +2964,7 @@ TEST_F(BuildWithDepsLogTest, RestatDepfileDependencyDepsLog) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); EXPECT_TRUE(builder.AddTarget("out", &err)); ASSERT_EQ("", err); @@ -2983,7 +2990,7 @@ TEST_F(BuildWithDepsLogTest, RestatDepfileDependencyDepsLog) { ASSERT_TRUE(deps_log.Load(deps_log_file_.path(), &state, &err)); ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); EXPECT_TRUE(builder.AddTarget("out", &err)); @@ -3016,7 +3023,7 @@ TEST_F(BuildWithDepsLogTest, DepFileOKDepsLog) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); EXPECT_TRUE(builder.AddTarget("fo o.o", &err)); ASSERT_EQ("", err); @@ -3037,7 +3044,7 @@ TEST_F(BuildWithDepsLogTest, DepFileOKDepsLog) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); Edge* edge = state.edges_.back(); @@ -3087,7 +3094,7 @@ TEST_F(BuildWithDepsLogTest, DiscoveredDepDuringBuildChanged) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); EXPECT_TRUE(builder.AddTarget("out2", &err)); EXPECT_FALSE(builder.AlreadyUpToDate()); @@ -3111,7 +3118,7 @@ TEST_F(BuildWithDepsLogTest, DiscoveredDepDuringBuildChanged) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); EXPECT_TRUE(builder.AddTarget("out2", &err)); EXPECT_FALSE(builder.AlreadyUpToDate()); @@ -3134,7 +3141,7 @@ TEST_F(BuildWithDepsLogTest, DiscoveredDepDuringBuildChanged) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); EXPECT_TRUE(builder.AddTarget("out2", &err)); EXPECT_TRUE(builder.AlreadyUpToDate()); @@ -3162,7 +3169,7 @@ TEST_F(BuildWithDepsLogTest, DepFileDepsLogCanonicalize) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); EXPECT_TRUE(builder.AddTarget("a/b/c/d/e/fo o.o", &err)); ASSERT_EQ("", err); @@ -3185,7 +3192,7 @@ TEST_F(BuildWithDepsLogTest, DepFileDepsLogCanonicalize) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); state.GetNode("bar.h", 0)->MarkDirty(); // Mark bar.h as missing. @@ -4264,7 +4271,7 @@ TEST_F(BuildWithDepsLogTest, ValidationThroughDepfile) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); EXPECT_TRUE(builder.AddTarget("out2", &err)); @@ -4300,7 +4307,7 @@ TEST_F(BuildWithDepsLogTest, ValidationThroughDepfile) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); EXPECT_TRUE(builder.AddTarget("out2", &err)); diff --git a/src/graph.h b/src/graph.h index 314c44296a..3668b22280 100644 --- a/src/graph.h +++ b/src/graph.h @@ -227,6 +227,7 @@ struct Edge { bool deps_loaded_ = false; bool deps_missing_ = false; bool generated_by_dep_loader_ = false; + unsigned char job_token_ = '\0'; TimeStamp command_start_time_ = 0; const Rule& rule() const { return *rule_; } diff --git a/src/jobserver-posix.cc b/src/jobserver-posix.cc new file mode 100644 index 0000000000..f983248825 --- /dev/null +++ b/src/jobserver-posix.cc @@ -0,0 +1,160 @@ +// Copyright 2024 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "jobserver.h" + +#include +#include + +#include +#include + +#include "util.h" + +// Declare complete type for static constants in class. +constexpr char const Jobserver::kAuthKey[]; +constexpr char const Jobserver::kFdsKey[]; +constexpr char const Jobserver::kFifoKey[]; + +PosixJobserverClient::PosixJobserverClient() { + assert(!Enabled()); + + // Set name, type of pipe, and if non-parallel from MAKEFLAGS. + Parse(); + + const char* jobserver = jobserver_name_.c_str(); + + // Warn if jobserver type is unknown (neither fifo nor pipe). + if (!jobserver_fifo_ && sscanf(jobserver, "%d,%d", &rfd_, &wfd_) != 2) + if (!jobserver_name_.empty()) + Warning("invalid jobserver auth: '%s'", jobserver); + + // Open FDs to the pipe if needed, read must be non-blocking. + // If passed FDs are blocking on read, force non-parallel build. + if (jobserver_fifo_) { + rfd_ = open(jobserver + strlen(kFifoKey), O_RDONLY | O_NONBLOCK); + wfd_ = open(jobserver + strlen(kFifoKey), O_WRONLY); + } else if (Enabled() && (fcntl(rfd_, F_GETFL) & O_NONBLOCK) == 0) { + jobserver_closed_ = true; + } + + // Exit on failure to open FDs, build non-parallel for invalid passed FDs. + if (Enabled()) + Info("using jobserver: %s", jobserver); + else if (jobserver_fifo_ && (rfd_ == -1 || wfd_ == -1)) + Fatal("failed to open jobserver: %s: %s", jobserver, strerror(errno)); + else if (!jobserver_name_.empty()) + jobserver_closed_ = true; + + // Signal that we have initialized but do not have a token yet. + if (Enabled()) + token_count_ = -1; +} + +void PosixJobserverClient::Parse() { + // Return early if no makeflags are passed in the environment. + const char* makeflags = std::getenv("MAKEFLAGS"); + if (makeflags == nullptr || strlen(makeflags) == 0) + return; + + std::string::size_type flag_char = 0; + std::string flag; + std::vector flags; + + // Tokenize string to characters in flag, then words in flags. + while (flag_char < strlen(makeflags)) { + while (flag_char < strlen(makeflags) && + !isblank(static_cast(makeflags[flag_char]))) { + flag.push_back(static_cast(makeflags[flag_char])); + flag_char++; + } + + if (!flag.empty()) + flags.push_back(flag); + + flag.clear(); + flag_char++; + } + + // --jobserver-auth= + for (size_t n = 0; n < flags.size(); n++) + if (flags[n].find(kAuthKey) == 0) + flag = flags[n].substr(strlen(kAuthKey)); + + // --jobserver-fds= + if (flag.empty()) + for (size_t n = 0; n < flags.size(); n++) + if (flags[n].find(kFdsKey) == 0) + flag = flags[n].substr(strlen(kFdsKey)); + + // -j 1 + if (flag.empty()) + for (size_t n = 0; n < flags.size(); n++) + if (flags[n].find("-j") == 0) + jobserver_closed_ = true; + + // Check for fifo pipe. + if (flag.find(kFifoKey) == 0) + jobserver_fifo_ = true; + + jobserver_name_.assign(flag); +} + +bool PosixJobserverClient::Enabled() const { + return (rfd_ >= 0 && wfd_ >= 0) || jobserver_closed_; +} + +unsigned char PosixJobserverClient::Acquire() { + unsigned char token = '\0'; + + // The first token is implicitly handed to a process. + // Fallback to non-parallel if jobserver-capable parent has no pipe. + if (token_count_ <= 0 || jobserver_closed_) { + token_count_ = 1; + return token; + } + + int ret = read(rfd_, &token, 1); + if (ret < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { + jobserver_closed_ = true; + if (!jobserver_fifo_) + Warning("pipe closed: %d (mark the command as recursive)", rfd_); + else + Fatal("failed to read from jobserver: %d: %s", rfd_, strerror(errno)); + } + + if (ret > 0) + token_count_++; + + return token; +} + +void PosixJobserverClient::Release(unsigned char* token) { + if (token_count_ < 0) + token_count_ = 0; + if (token_count_ > 0) + token_count_--; + + // The first token is implicitly handed to a process. + // Writing is not possible if the pipe is closed. + if (*token == '\0' || jobserver_closed_) + return; + + int ret = write(wfd_, token, 1); + if (ret != 1) { + Fatal("failed to write to jobserver: %d: %s", wfd_, strerror(errno)); + } + + *token = '\0'; +} diff --git a/src/jobserver.h b/src/jobserver.h new file mode 100644 index 0000000000..c60751f365 --- /dev/null +++ b/src/jobserver.h @@ -0,0 +1,112 @@ +// Copyright 2024 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +/// The GNU jobserver limits parallelism by assigning a token from an external +/// pool for each command. On posix systems, the pool is a fifo or simple pipe +/// with N characters. On windows systems, the pool is a semaphore initialized +/// to N. When a command is finished, the acquired token is released by writing +/// it back to the fifo or pipe or by increasing the semaphore count. +/// +/// The jobserver functionality is enabled by passing --jobserver-auth= +/// (previously --jobserver-fds= in older versions of Make) in the MAKEFLAGS +/// environment variable and creating the respective file descriptors or objects. +/// On posix systems, is 'fifo:' or ',' for pipes. +/// On windows systems, is the name of the semaphore. +/// +/// The classes parse the MAKEFLAGS variable and opens an object handle if needed. +/// Once enabled, Acquire() must be called to acquire a token from the pool. +/// If a token is acquired, a new command can be started. +/// Once the command is completed, Release() must be called to return a token. +/// The token server does not care in which order a token is received. +struct Jobserver { + /// Return current token count or initialization signal if negative. + int Tokens() const { return token_count_; } + + /// Read MAKEFLAGS environment variable and process the jobserver flag value. + virtual void Parse() {}; + + /// Return true if jobserver functionality is enabled and initialized. + virtual bool Enabled() const { return false; } + + /// Implementation-specific method to acquire a token from the external pool + /// which is called for all tokens but returns early for the first token. + /// This method is called every time Ninja needs to start a command process. + /// Return a non-NUL char value on success (token acquired), and '\0' on failure. + /// First call always succeeds. Ninja is aborted on read errors from a fifo pipe. + /// The return is the token character to be saved for release after work is done. + virtual unsigned char Acquire() { return '\0'; } + + /// Implementation-specific method to release a token to the external pool + /// which is called for all tokens but returns early for the last token. + /// The parameter is the token returned by Acquire() to be sent to the token server. + /// A token with the default value of '\0' will not be sent to the token server. + /// After sent, the token that the pointer parameter points to is cleared to '\0'. + /// It must be called for each successful call to Acquire() after the command exits, + /// even if subprocesses fail or in the case of errors causing Ninja to exit. + /// Ninja is aborted on write errors, and calls always decrement token count. + virtual void Release(unsigned char*) {}; + + protected: + /// The number of currently acquired tokens, or a status signal if negative. + /// This is used to estimate the load capacity for attempting to start a new job, + /// and when the implicit (first) token has been acquired (initialization). + /// -1: initialized without a token + /// 0: uninitialized or disabled + /// +n: number of tokens in use + int token_count_ = 0; + + /// Whether a pipe to the jobserver token pool is closed + /// when it is expected to be open based on MAKEFLAGS + /// (e.g. subcommands not marked recursive, environment passed), + /// or the pipe is closed when expected to be closed + /// but when the parent process is jobserver-capable + /// (e.g. the parent jobserver process build is non-parallel). + bool jobserver_closed_ = false; + + /// String of the parsed value of the jobserver flag passed to environment. + std::string jobserver_name_; + + /// Substrings for parsing MAKEFLAGS environment variable. + static constexpr char const kAuthKey[] = "--jobserver-auth="; + static constexpr char const kFdsKey[] = "--jobserver-fds="; + static constexpr char const kFifoKey[] = "fifo:"; +}; + +struct PosixJobserverClient : public Jobserver { + /// Parse the MAKEFLAGS environment variable to receive the path / FDs + /// of the token pool, and open the handle to the pool if it is an object. + /// If a jobserver argument is found in the MAKEFLAGS environment variable, + /// and the handle is successfully opened, later calls to Enable() return true. + /// If a jobserver argument is found, but the handle fails to be opened, + /// the ninja process is aborted with an error, or, when the FDs provided are bad + /// the build falls back to non-parallel building and the client does not read. + explicit PosixJobserverClient(); + + void Parse() override; + bool Enabled() const override; + unsigned char Acquire() override; + void Release(unsigned char*) override; + + private: + /// Whether the type of jobserver pipe supplied to ninja is named. + bool jobserver_fifo_ = false; + + /// File descriptors to communicate with upstream jobserver token pool. + int rfd_ = -1; + int wfd_ = -1; +}; diff --git a/src/ninja.cc b/src/ninja.cc index 2902359f15..975780e3f0 100644 --- a/src/ninja.cc +++ b/src/ninja.cc @@ -84,8 +84,8 @@ struct Options { /// The Ninja main() loads up a series of data structures; various tools need /// to poke into these, so store them as fields on an object. struct NinjaMain : public BuildLogUser { - NinjaMain(const char* ninja_command, const BuildConfig& config) : - ninja_command_(ninja_command), config_(config), + NinjaMain(const char* ninja_command, const BuildConfig& config, Jobserver* jobserver) : + ninja_command_(ninja_command), config_(config), jobserver_(jobserver), start_time_millis_(GetTimeMillis()) {} /// Command line used to run Ninja. @@ -94,6 +94,9 @@ struct NinjaMain : public BuildLogUser { /// Build configuration set from flags (e.g. parallelism). const BuildConfig& config_; + /// Client for jobserver to allow a parent process to control parallelism. + Jobserver* jobserver_; + /// Loaded state (rules, nodes). State state_; @@ -267,7 +270,8 @@ bool NinjaMain::RebuildManifest(const char* input_file, string* err, if (!node) return false; - Builder builder(&state_, config_, &build_log_, &deps_log_, &disk_interface_, + Builder builder(&state_, config_, jobserver_, + &build_log_, &deps_log_, &disk_interface_, status, start_time_millis_); if (!builder.AddTarget(node, err)) return false; @@ -1355,7 +1359,8 @@ int NinjaMain::RunBuild(int argc, char** argv, Status* status) { disk_interface_.AllowStatCache(g_experimental_statcache); - Builder builder(&state_, config_, &build_log_, &deps_log_, &disk_interface_, + Builder builder(&state_, config_, jobserver_, + &build_log_, &deps_log_, &disk_interface_, status, start_time_millis_); for (size_t i = 0; i < targets.size(); ++i) { if (!builder.AddTarget(targets[i], &err)) { @@ -1542,6 +1547,14 @@ NORETURN void real_main(int argc, char** argv) { Status* status = Status::factory(config); + // Client of jobserver to manage job slots assigned to ninja. +// TODO: jobserver client support for Windows +#ifdef _WIN32 + Jobserver jobserver; +#else + PosixJobserverClient jobserver; +#endif + if (options.working_dir) { // The formatting of this string, complete with funny quotes, is // so Emacs can properly identify that the cwd has changed for @@ -1558,14 +1571,14 @@ NORETURN void real_main(int argc, char** argv) { if (options.tool && options.tool->when == Tool::RUN_AFTER_FLAGS) { // None of the RUN_AFTER_FLAGS actually use a NinjaMain, but it's needed // by other tools. - NinjaMain ninja(ninja_command, config); + NinjaMain ninja(ninja_command, config, &jobserver); exit((ninja.*options.tool->func)(&options, argc, argv)); } // Limit number of rebuilds, to prevent infinite loops. const int kCycleLimit = 100; for (int cycle = 1; cycle <= kCycleLimit; ++cycle) { - NinjaMain ninja(ninja_command, config); + NinjaMain ninja(ninja_command, config, &jobserver); ManifestParserOptions parser_opts; if (options.phony_cycle_should_err) {