Skip to content

Commit

Permalink
fixed a test explicitly using never stop token
Browse files Browse the repository at this point in the history
  • Loading branch information
dietmarkuehl committed Jan 18, 2025
1 parent 63ed1b6 commit 2c182da
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 16 deletions.
16 changes: 11 additions & 5 deletions include/beman/lazy/detail/any_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,18 @@ class any_scheduler {
}
void complete_stopped() override { ::beman::execution26::set_stopped(std::move(this->receiver)); }
::beman::execution26::inplace_stop_token get_stop_token() override {
if (not this->callback) {
this->callback.emplace(
::beman::execution26::get_stop_token(::beman::execution26::get_env(this->receiver)),
stopper{this});
if constexpr (::std::same_as<token_t, ::beman::execution26::inplace_stop_token>) {
return ::beman::execution26::get_stop_token(::beman::execution26::get_env(this->receiver));
} else {
if constexpr (not::std::same_as<token_t, ::beman::execution26::never_stop_token>) {
if (not this->callback) {
this->callback.emplace(
::beman::execution26::get_stop_token(::beman::execution26::get_env(this->receiver)),
stopper{this});
}
}
return this->source.get_token();
}
return this->source.get_token();
}
};

Expand Down
70 changes: 59 additions & 11 deletions tests/beman/lazy/any_scheduler.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
#include <beman/lazy/detail/any_scheduler.hpp>
#include <beman/lazy/detail/inline_scheduler.hpp>
#include <beman/execution26/execution.hpp>
#include <beman/execution26/stop_token.hpp>
#include <atomic>
#include <latch>
#include <exception>
#include <system_error>
#include <thread>
Expand All @@ -29,6 +31,7 @@ struct thread_context {
virtual void complete() = 0;
};

std::latch stop_done{1u};
std::mutex mutex;
std::condition_variable condition;
bool done{false};
Expand Down Expand Up @@ -57,6 +60,7 @@ struct thread_context {
while (auto w{this->get_work()}) {
w->complete();
}
this->stop_done.count_down();
}) {}
~thread_context() {
this->stop();
Expand Down Expand Up @@ -141,25 +145,46 @@ struct thread_context {
this->done = true;
}
this->condition.notify_one();
this->stop_done.wait();
}
};

enum class stop_result { none, success, failure, stopped };
template <typename Token>
struct stop_env {
ex::inplace_stop_token token;
auto query(ex::get_stop_token_t) const noexcept { return this->token; }
Token token;
auto query(ex::get_stop_token_t) const noexcept { return this->token; }
};
template <typename Token>
stop_env(Token&&) -> stop_env<std::remove_cvref_t<Token>>;

template <typename Token>
struct stop_receiver {
using receiver_concept = ex::receiver_t;
ex::inplace_stop_token token;
stop_result& result;
stop_env get_env() const noexcept { return {this->token}; }
Token token;
stop_result& result;
std::latch* completed{};
auto get_env() const noexcept { return stop_env{this->token}; }

void set_value(auto&&...) && noexcept { this->result = stop_result::success; }
void set_error(auto&&) && noexcept { this->result = stop_result::failure; }
void set_stopped() && noexcept { this->result = stop_result::stopped; }
void set_value(auto&&...) && noexcept {
this->result = stop_result::success;
if (this->completed)
this->completed->count_down();
}
void set_error(auto&&) && noexcept {
this->result = stop_result::failure;
if (this->completed)
this->completed->count_down();
}
void set_stopped() && noexcept {
this->result = stop_result::stopped;
if (this->completed)
this->completed->count_down();
}
};
static_assert(ex::receiver<stop_receiver>);
template <typename Token>
stop_receiver(Token&&, stop_result&, std::latch* = nullptr) -> stop_receiver<std::remove_cvref_t<Token>>;
static_assert(ex::receiver<stop_receiver<ex::inplace_stop_token>>);

} // namespace

Expand Down Expand Up @@ -240,7 +265,7 @@ int main() {
ex::inplace_stop_source source;
stop_result result{stop_result::none};
auto state{ex::connect(ex::schedule(ctxt1.get_scheduler(thread_context::complete::never)),
stop_receiver(source.get_token(), result))};
stop_receiver{source.get_token(), result})};
assert(result == stop_result::none);
ex::start(state);
assert(result == stop_result::none);
Expand All @@ -252,11 +277,34 @@ int main() {
stop_result result{stop_result::none};
auto state{
ex::connect(ex::schedule(ly::detail::any_scheduler(ctxt1.get_scheduler(thread_context::complete::never))),
stop_receiver(source.get_token(), result))};
stop_receiver{source.get_token(), result})};
assert(result == stop_result::none);
ex::start(state);
assert(result == stop_result::none);
source.request_stop();
assert(result == stop_result::stopped);
}
{
ex::stop_source source;
stop_result result{stop_result::none};
auto state{
ex::connect(ex::schedule(ly::detail::any_scheduler(ctxt1.get_scheduler(thread_context::complete::never))),
stop_receiver{source.get_token(), result})};
assert(result == stop_result::none);
ex::start(state);
assert(result == stop_result::none);
source.request_stop();
assert(result == stop_result::stopped);
}
{
std::latch completed{1};
stop_result result{stop_result::none};
auto state{ex::connect(
ex::schedule(ly::detail::any_scheduler(ctxt1.get_scheduler(thread_context::complete::success))),
stop_receiver{ex::never_stop_token(), result, &completed})};
assert(result == stop_result::none);
ex::start(state);
completed.wait();
assert(result == stop_result::success);
}
}

0 comments on commit 2c182da

Please sign in to comment.