diff --git a/bricks/sync/test.cc b/bricks/sync/test.cc index 56981b06..889d1bb3 100644 --- a/bricks/sync/test.cc +++ b/bricks/sync/test.cc @@ -254,7 +254,6 @@ TEST(OwnedBorrowed, UseInternalIsDestructingGetter) { } TEST(WaitableAtomic, Smoke) { - using current::IntrusiveClient; using current::WaitableAtomic; struct Object { @@ -264,54 +263,39 @@ TEST(WaitableAtomic, Smoke) { bool y_done = false; }; + // The object that is being mutated. WaitableAtomic object; - { - // This scope runs asynchronous operations in two dedicated threads. - WaitableAtomic top_level_lock; - - // The `++x` thread uses mutable accessors. - std::thread( - [&top_level_lock, &object](IntrusiveClient top_level_client) { - // Should be able to register another client for `top_level_lock`. - ASSERT_TRUE(bool(top_level_lock.RegisterScopedClient())); - while (top_level_client) { - // This loop will be terminated as `top_level_lock` will be leaving the scope. - ++object.MutableScopedAccessor()->x; - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - // Should no longer be able to register another client for `top_level_lock`. - ASSERT_FALSE(bool(top_level_lock.RegisterScopedClient())); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - object.MutableScopedAccessor()->x_done = true; - }, - top_level_lock.RegisterScopedClient()) - .detach(); - - // The `++y` thread uses the functional style. - std::thread( - [&top_level_lock, &object](IntrusiveClient top_level_client) { - // Should be able to register another client for `top_level_lock`. - ASSERT_TRUE(bool(top_level_lock.RegisterScopedClient())); - while (top_level_client) { - // This loop will be terminated as `top_level_lock` will be leaving the scope. - object.MutableUse([](Object& object) { ++object.y; }); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - // Should no longer be able to register another client for `top_level_lock`. - ASSERT_FALSE(bool(top_level_lock.RegisterScopedClient())); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - object.MutableUse([](Object& object) { object.y_done = true; }); - }, - top_level_lock.RegisterScopedClient()) - .detach(); - // Let `++x` and `++y` threads run 25ms. - std::this_thread::sleep_for(std::chrono::milliseconds(25)); - - // This block will only finish when both client threads have terminated. - // This is the reason behind using `.detach()` instead of `.join()`, - // since the latter would ruin the purpose of the test. - } + // This scope runs asynchronous operations in two dedicated threads. + WaitableAtomic done_flag(false); + + // The `++x` thread uses mutable accessors. + std::thread( + [&done_flag, &object]() { + while (!done_flag.GetValue()) { + ++object.MutableScopedAccessor()->x; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + object.MutableScopedAccessor()->x_done = true; + }).detach(); + + // The `++y` thread uses the functional style. + std::thread( + [&done_flag, &object]() { + while (!done_flag.GetValue()) { + object.MutableUse([](Object& object) { ++object.y; }); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + object.MutableUse([](Object& object) { object.y_done = true; }); + }).detach(); + + // Let `++x` and `++y` threads run for 25ms. + std::this_thread::sleep_for(std::chrono::milliseconds(25)); + + done_flag.SetValue(true); + object.Wait([](const Object& o) { return o.x_done && o.y_done; }); // Analyze the result. Object copy_of_object(object.GetValue()); @@ -354,15 +338,6 @@ TEST(WaitableAtomic, ProxyConstructor) { EXPECT_EQ(2, object.GetValue().APlusB()); } -TEST(WaitableAtomic, IntrusiveClientsCanBeTransferred) { - using current::IntrusiveClient; - using current::WaitableAtomic; - - WaitableAtomic object; - auto f = [](IntrusiveClient& c) { static_cast(c); }; - std::thread([&f](IntrusiveClient c) { f(c); }, object.RegisterScopedClient()).detach(); -} - TEST(WaitableAtomic, WaitFor) { using current::WaitableAtomic; { diff --git a/bricks/sync/waitable_atomic.h b/bricks/sync/waitable_atomic.h index 755ceaf7..2b8786ff 100644 --- a/bricks/sync/waitable_atomic.h +++ b/bricks/sync/waitable_atomic.h @@ -24,24 +24,14 @@ SOFTWARE. // WaitableAtomic acts as an atomic wrapper over type T with one additional feature: the clients // can wait for updates on this object instead of using spin locks or other external waiting primitives. -// Additionally, if the second template parameter is set to true, WaitableAtomic allows -// creating scoped clients via `IntrusiveClient client = my_waitable_atomic.RegisterScopedClient()`. -// When an instance of intrusive WaitableAtomic is going out of scope: -// 1) Its registered clients will be notified. (IntrusiveClient supports `operator bool()`). -// 2) Pending wait operations, if any, will be aborted. (And return `false`.) -// 3) WaitableAtomic will wait for all the clients to gracefully terminate (go out of scope) -// before destructing the data object contained within this WaitableAtomic. #ifndef BRICKS_WAITABLE_ATOMIC_H #define BRICKS_WAITABLE_ATOMIC_H -#include #include #include #include -#include "../time/chrono.h" - #ifdef CURRENT_FOR_CPP14 #include "../template/weed.h" #endif // CURRENT_FOR_CPP14 @@ -60,6 +50,9 @@ class ScopedUniqueLock { explicit ScopedUniqueLock(std::mutex& mutex) : lock_(mutex) {} ScopedUniqueLock(ScopedUniqueLock&& rhs) : lock_(std::move(rhs.lock_)) {} + protected: + ~ScopedUniqueLock() = default; // The destructor of a non-`final` class should be `virtual` or `protected`. + private: std::unique_lock lock_; @@ -107,330 +100,233 @@ class IntrusiveClient final { Interface* intrusive_object_; }; -template -class WaitableAtomicImpl { +template +class WaitableAtomic { public: - class BasicImpl { - public: - using data_t = DATA; - enum { IS_INTRUSIVE = false }; - - template - BasicImpl(ARGS&&... args) : data_(std::forward(args)...) {} - - BasicImpl(const DATA& data) : data_(data) {} - - template - struct NotifyIfMutable { - class ImmutableAccessorDoesNotNotify { - public: - explicit ImmutableAccessorDoesNotNotify(POINTER*) {} - }; - - class MutableAccessorDoesNotify { - public: - explicit MutableAccessorDoesNotify(POINTER* parent) : parent_(parent), mark_as_unmodified_(false) {} - ~MutableAccessorDoesNotify() { - if (!mark_as_unmodified_) { - parent_->Notify(); - } - } - void MarkAsUnmodified() { mark_as_unmodified_ = true; } + using data_t = DATA; + + template + WaitableAtomic(ARGS&&... args) : data_(std::forward(args)...) {} - private: - POINTER* parent_; - bool mark_as_unmodified_; - }; + WaitableAtomic(const DATA& data) : data_(data) {} - using impl_t = - std::conditional_t::value, ImmutableAccessorDoesNotNotify, MutableAccessorDoesNotify>; + template + struct NotifyIfMutable { + class ImmutableAccessorDoesNotNotify { + public: + explicit ImmutableAccessorDoesNotNotify(POINTER*) {} }; - // A generic implementation for both mutable and immutable scoped accessors. - template - class ScopedAccessorImpl final : private ScopedUniqueLock, public NotifyIfMutable::impl_t { + class MutableAccessorDoesNotify { public: - using parent_t = PARENT; - using optional_notifier_t = typename NotifyIfMutable::impl_t; - using data_t = std:: - conditional_t::value, const typename parent_t::data_t, typename parent_t::data_t>; + explicit MutableAccessorDoesNotify(POINTER* parent) : parent_(parent), mark_as_unmodified_(false) {} + ~MutableAccessorDoesNotify() { + if (!mark_as_unmodified_) { + parent_->Notify(); + } + } + void MarkAsUnmodified() { mark_as_unmodified_ = true; } - explicit ScopedAccessorImpl(parent_t* parent) - : ScopedUniqueLock(parent->data_mutex_), optional_notifier_t(parent), pdata_(&parent->data_) {} + private: + POINTER* parent_; + bool mark_as_unmodified_; + }; - ScopedAccessorImpl(ScopedAccessorImpl&& rhs) - : ScopedUniqueLock(std::move(rhs)), optional_notifier_t(rhs), pdata_(rhs.pdata_) {} + using impl_t = + std::conditional_t::value, ImmutableAccessorDoesNotNotify, MutableAccessorDoesNotify>; + }; - ~ScopedAccessorImpl() {} + // A generic implementation for both mutable and immutable scoped accessors. + template + class ScopedAccessorImpl final : private ScopedUniqueLock, public NotifyIfMutable::impl_t { + public: + using parent_t = PARENT; + using optional_notifier_t = typename NotifyIfMutable::impl_t; + using data_t = std:: + conditional_t::value, const typename parent_t::data_t, typename parent_t::data_t>; - ScopedAccessorImpl() = delete; - ScopedAccessorImpl(const ScopedAccessorImpl&) = delete; - void operator=(const ScopedAccessorImpl&) = delete; + explicit ScopedAccessorImpl(parent_t* parent) + : ScopedUniqueLock(parent->data_mutex_), optional_notifier_t(parent), pdata_(&parent->data_) {} - data_t* operator->() const { return pdata_; } - data_t& operator*() const { return *pdata_; } + ScopedAccessorImpl(ScopedAccessorImpl&& rhs) + : ScopedUniqueLock(std::move(rhs)), optional_notifier_t(rhs), pdata_(rhs.pdata_) {} - private: - data_t* pdata_; - }; + ~ScopedAccessorImpl() {} - using MutableAccessor = ScopedAccessorImpl; - using ImmutableAccessor = ScopedAccessorImpl; + ScopedAccessorImpl() = delete; + ScopedAccessorImpl(const ScopedAccessorImpl&) = delete; + void operator=(const ScopedAccessorImpl&) = delete; - friend class ScopedAccessorImpl; - friend class ScopedAccessorImpl; + data_t* operator->() const { return pdata_; } + data_t& operator*() const { return *pdata_; } - ImmutableAccessor ImmutableScopedAccessor() const { return ImmutableAccessor(this); } + private: + data_t* pdata_; + }; - MutableAccessor MutableScopedAccessor() { return MutableAccessor(this); } + using MutableAccessor = ScopedAccessorImpl; + using ImmutableAccessor = ScopedAccessorImpl; - void Notify() { data_condition_variable_.notify_all(); } + friend class ScopedAccessorImpl; + friend class ScopedAccessorImpl; - void UseAsLock(std::function f) const { - std::unique_lock lock(data_t::data_mutex_); - f(); - } + ImmutableAccessor ImmutableScopedAccessor() const { return ImmutableAccessor(this); } - bool Wait(std::function predicate) const { - std::unique_lock lock(data_mutex_); - if (!predicate(data_)) { - const data_t& data = data_; - data_condition_variable_.wait(lock, [&predicate, &data] { return predicate(data); }); - } - return true; + MutableAccessor MutableScopedAccessor() { return MutableAccessor(this); } + + void Notify() { data_condition_variable_.notify_all(); } + + void UseAsLock(std::function f) const { + std::unique_lock lock(data_t::data_mutex_); + f(); + } + + bool Wait(std::function predicate) const { + std::unique_lock lock(data_mutex_); + if (!predicate(data_)) { + const data_t& data = data_; + data_condition_variable_.wait(lock, [&predicate, &data] { return predicate(data); }); } + return true; + } #ifndef CURRENT_FOR_CPP14 - // NOTE(dkorolev): Deliberately not bothering with C++14 for this two-lambdas `Wait()`. - // TODO(dkorolev): The `.Wait()` above always returning `true` could use some TLC. - - template - std::invoke_result_t Wait(std::function wait_predicate, F&& retval_predicate) { - std::unique_lock lock(data_mutex_); - if (!wait_predicate(data_)) { - const data_t& data = data_; - data_condition_variable_.wait(lock, [&wait_predicate, &data] { return wait_predicate(data); }); - return retval_predicate(data_); - } else { - return retval_predicate(data_); - } + // NOTE(dkorolev): Deliberately not bothering with C++14 for this two-lambdas `Wait()`. + // TODO(dkorolev): The `.Wait()` above always returning `true` could use some TLC. + + template + std::invoke_result_t Wait(std::function wait_predicate, F&& retval_predicate) { + std::unique_lock lock(data_mutex_); + if (!wait_predicate(data_)) { + const data_t& data = data_; + data_condition_variable_.wait(lock, [&wait_predicate, &data] { return wait_predicate(data); }); + return retval_predicate(data_); + } else { + return retval_predicate(data_); } + } #endif // CURRENT_FOR_CPP14 - template - bool WaitFor(std::function predicate, T duration) const { - std::unique_lock lock(data_mutex_); - if (!predicate(data_)) { - const data_t& data = data_; - return data_condition_variable_.wait_for(lock, duration, [&predicate, &data] { return predicate(data); }); - } - return true; + template + bool WaitFor(std::function predicate, T duration) const { + std::unique_lock lock(data_mutex_); + if (!predicate(data_)) { + const data_t& data = data_; + return data_condition_variable_.wait_for(lock, duration, [&predicate, &data] { return predicate(data); }); } + return true; + } #ifndef CURRENT_FOR_CPP14 - // NOTE(dkorolev): Deliberately not bothering with C++14 for these three- and four-argument `WaitFor()`-s. - - template - std::invoke_result_t WaitFor(std::function predicate, - F&& retval_predicate, - T duration) { - std::unique_lock lock(data_mutex_); - if (!predicate(data_)) { - const data_t& data = data_; - if (data_condition_variable_.wait_for(lock, duration, [&predicate, &data] { return predicate(data); })) { - return retval_predicate(data_); - } else { - // The three-argument `WaitFor()` assumes the default constructor for the return type indicates that - // the wait should continue. Use the four-argument `WaitFor()` to provide a custom retval initializer. - // The custom retval predicate can also mutate the waited upon object as it sees fit. - return std::invoke_result_t(); - } - } else { + // NOTE(dkorolev): Deliberately not bothering with C++14 for these three- and four-argument `WaitFor()`-s. + + template + std::invoke_result_t WaitFor(std::function predicate, + F&& retval_predicate, + T duration) { + std::unique_lock lock(data_mutex_); + if (!predicate(data_)) { + const data_t& data = data_; + if (data_condition_variable_.wait_for(lock, duration, [&predicate, &data] { return predicate(data); })) { return retval_predicate(data_); + } else { + // The three-argument `WaitFor()` assumes the default constructor for the return type indicates that + // the wait should continue. Use the four-argument `WaitFor()` to provide a custom retval initializer. + // The custom retval predicate can also mutate the waited upon object as it sees fit. + return std::invoke_result_t(); } + } else { + return retval_predicate(data_); } + } - template - std::invoke_result_t WaitFor(std::function predicate, - F&& retval_predicate, - G&& wait_unsuccessul_predicate, - T duration) { - std::unique_lock lock(data_mutex_); - if (!predicate(data_)) { - const data_t& data = data_; - if (data_condition_variable_.wait_for(lock, duration, [&predicate, &data] { return predicate(data); })) { - return retval_predicate(data_); - } else { - return wait_unsuccessul_predicate(data_); - } - } else { + template + std::invoke_result_t WaitFor(std::function predicate, + F&& retval_predicate, + G&& wait_unsuccessul_predicate, + T duration) { + std::unique_lock lock(data_mutex_); + if (!predicate(data_)) { + const data_t& data = data_; + if (data_condition_variable_.wait_for(lock, duration, [&predicate, &data] { return predicate(data); })) { return retval_predicate(data_); + } else { + return wait_unsuccessul_predicate(data_); } + } else { + return retval_predicate(data_); } + } #endif // CURRENT_FOR_CPP14 #ifndef CURRENT_FOR_CPP14 - template - std::invoke_result_t ImmutableUse(F&& f, ARGS&&... args) const { - auto scope = ImmutableScopedAccessor(); - return f(*scope, std::forward(args)...); - } + template + std::invoke_result_t ImmutableUse(F&& f, ARGS&&... args) const { + auto scope = ImmutableScopedAccessor(); + return f(*scope, std::forward(args)...); + } - template - std::invoke_result_t MutableUse(F&& f, ARGS&&... args) { - auto scope = MutableScopedAccessor(); - return f(*scope, std::forward(args)...); - } + template + std::invoke_result_t MutableUse(F&& f, ARGS&&... args) { + auto scope = MutableScopedAccessor(); + return f(*scope, std::forward(args)...); + } #else - template - weed::call_with_type ImmutableUse(F&& f, ARGS&&... args) const { - auto scope = ImmutableScopedAccessor(); - return f(*scope, std::forward(args)...); - } + template + weed::call_with_type ImmutableUse(F&& f, ARGS&&... args) const { + auto scope = ImmutableScopedAccessor(); + return f(*scope, std::forward(args)...); + } - template - weed::call_with_type MutableUse(F&& f, ARGS&&... args) { - auto scope = MutableScopedAccessor(); - return f(*scope, std::forward(args)...); - } + template + weed::call_with_type MutableUse(F&& f, ARGS&&... args) { + auto scope = MutableScopedAccessor(); + return f(*scope, std::forward(args)...); + } #endif // CURRENT_FOR_CPP14 - bool PotentiallyMutableUse(std::function f) { - auto scope = MutableScopedAccessor(); - if (f(*scope)) { - return true; - } else { - scope.MarkAsUnmodified(); - return false; - } - } - data_t GetValue() const { return *ImmutableScopedAccessor(); } - - void SetValue(const data_t& data) { *MutableScopedAccessor() = data; } - - void SetValueIf(std::function predicate, const data_t& data) { - auto a = MutableScopedAccessor(); - if (predicate(*a)) { - *a = data; - } else { - a.MarkAsUnmodified(); - } - } - - protected: - data_t data_; - mutable std::mutex data_mutex_; - mutable std::condition_variable data_condition_variable_; - - private: - BasicImpl(const BasicImpl&) = delete; - void operator=(const BasicImpl&) = delete; - BasicImpl(BasicImpl&&) = delete; - }; - - class IntrusiveImpl : public BasicImpl, public IntrusiveClient::Interface { - public: - using data_t = DATA; - enum { IS_INTRUSIVE = true }; - - explicit IntrusiveImpl(CustomWaitableAtomicDestructor* destructor_ptr = nullptr) - : destructing_(false), destructor_ptr_(destructor_ptr) { - RefCounterTryIncrease(); - } - - explicit IntrusiveImpl(const data_t& data, CustomWaitableAtomicDestructor* destructor_ptr = nullptr) - : BasicImpl(data), destructing_(false), destructor_ptr_(destructor_ptr) { - RefCounterTryIncrease(); - } - - virtual ~IntrusiveImpl() { - { - std::lock_guard guard(BasicImpl::data_mutex_); - destructing_ = true; - BasicImpl::Notify(); - } - RefCounterDecrease(); - if (destructor_ptr_) { - destructor_ptr_->WaitableAtomicDestructing(); // LCOV_EXCL_LINE - } - { - // Wait for the registered scoped clients to leave their respective scopes. - std::unique_lock lock(BasicImpl::data_mutex_); - if (ref_count_ > 0u) { - cv_.wait(lock, [this]() { return ref_count_ == 0u; }); - } - } - } - - bool Wait(std::function predicate) const { - std::unique_lock lock(BasicImpl::data_mutex_); - if (destructing_) { - return false; - } else { - if (!predicate(BasicImpl::data_)) { - const data_t& data = BasicImpl::data_; - BasicImpl::data_condition_variable_.wait( - lock, [this, &predicate, &data] { return destructing_ || predicate(data); }); - } - return !destructing_; - } - } - - IntrusiveClient RegisterScopedClient() { return IntrusiveClient(this); } - - virtual bool RefCounterTryIncrease() override { - { - std::lock_guard guard(BasicImpl::data_mutex_); - if (destructing_) { - return false; - } else { - ++ref_count_; - } - } - cv_.notify_one(); + bool PotentiallyMutableUse(std::function f) { + auto scope = MutableScopedAccessor(); + if (f(*scope)) { return true; + } else { + scope.MarkAsUnmodified(); + return false; } + } + data_t GetValue() const { return *ImmutableScopedAccessor(); } - virtual void RefCounterDecrease() override { - { - std::lock_guard guard(BasicImpl::data_mutex_); - --ref_count_; - } - cv_.notify_one(); - } + void SetValue(const data_t& data) { *MutableScopedAccessor() = data; } - virtual bool IsDestructing() const override { - std::lock_guard guard(BasicImpl::data_mutex_); - return destructing_; + void SetValueIf(std::function predicate, const data_t& data) { + auto a = MutableScopedAccessor(); + if (predicate(*a)) { + *a = data; + } else { + a.MarkAsUnmodified(); } + } - protected: - std::atomic_bool destructing_; - size_t ref_count_ = 0; - std::condition_variable cv_; - CustomWaitableAtomicDestructor* destructor_ptr_ = nullptr; - - private: - IntrusiveImpl(const IntrusiveImpl&) = delete; - void operator=(const IntrusiveImpl&) = delete; - IntrusiveImpl(IntrusiveImpl&&) = delete; - }; + protected: + data_t data_; + mutable std::mutex data_mutex_; + mutable std::condition_variable data_condition_variable_; - using type = std::conditional_t; + private: + WaitableAtomic(const WaitableAtomic&) = delete; + void operator=(const WaitableAtomic&) = delete; + WaitableAtomic(WaitableAtomic&&) = delete; }; -template -using WaitableAtomic = typename WaitableAtomicImpl::type; - } // namespace current #endif // BRICKS_WAITABLE_ATOMIC_H diff --git a/examples/streamed_sockets/latencytest/dsl/test.cc b/examples/streamed_sockets/latencytest/dsl/test.cc index c7ea78e4..61b3e0a4 100644 --- a/examples/streamed_sockets/latencytest/dsl/test.cc +++ b/examples/streamed_sockets/latencytest/dsl/test.cc @@ -28,6 +28,8 @@ SOFTWARE. #include "../../../../3rdparty/gtest/gtest-main.h" #include "../../../../bricks/strings/join.h" +#include "../../../../bricks/strings/printf.h" +#include "../../../../bricks/util/singleton.h" #ifndef CURRENT_FOR_CPP14 // NOTE(dkorolev): Excluding this part from the C++14 version of Current.