From 955b967623983cb50ba09f7b82e5f01f2c6bcebb Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Tue, 5 Oct 2021 17:31:53 -0400 Subject: SL-16024: Add ThreadSafeSchedule, a timestamped LLThreadSafeQueue. ThreadSafeSchedule orders its items by timestamp, which can be passed either implicitly or explicitly. The timestamp specifies earliest delivery time: an item cannot be popped until that time. Add initial tests. Tweak the LLThreadSafeQueue base class to support ThreadSafeSchedule: introduce virtual canPop() method to report whether the current head item is available to pop. The base class unconditionally says yes, ThreadSafeSchedule says it depends on whether its timestamp is still in the future. This replaces the protected pop_() overload accepting a predicate. Rather than explicitly passing a predicate through a couple levels of function call, use canPop() at the level it matters. Runtime behavior that varies depending on an object's leaf class is what virtual functions were invented for. Give pop_() a three-state enum return so pop() can distinguish between "closed and empty" (throws exception) versus "closed, not yet drained because we're not yet ready to pop the head item" (waits). Also break out protected tryPopUntil_() method, the body logic of tryPopUntil(). The public method locks the data structure, the protected method requires that its caller has already done so. Add chrono.h with a more full-featured LL::time_point_cast() function than the one found in , which only converts between time_point durations, not between time_points based on different clocks. --- indra/llcommon/tests/threadsafeschedule_test.cpp | 65 ++++++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 indra/llcommon/tests/threadsafeschedule_test.cpp (limited to 'indra/llcommon/tests/threadsafeschedule_test.cpp') diff --git a/indra/llcommon/tests/threadsafeschedule_test.cpp b/indra/llcommon/tests/threadsafeschedule_test.cpp new file mode 100644 index 0000000000..ec0fa0c928 --- /dev/null +++ b/indra/llcommon/tests/threadsafeschedule_test.cpp @@ -0,0 +1,65 @@ +/** + * @file threadsafeschedule_test.cpp + * @author Nat Goodspeed + * @date 2021-10-04 + * @brief Test for threadsafeschedule. + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +// Precompiled header +#include "linden_common.h" +// associated header +#include "threadsafeschedule.h" +// STL headers +// std headers +#include +// external library headers +// other Linden headers +#include "../test/lltut.h" + +using namespace std::literals::chrono_literals; // ms suffix +using namespace std::literals::string_literals; // s suffix +using Queue = LL::ThreadSafeSchedule; + +/***************************************************************************** +* TUT +*****************************************************************************/ +namespace tut +{ + struct threadsafeschedule_data + { + Queue queue; + }; + typedef test_group threadsafeschedule_group; + typedef threadsafeschedule_group::object object; + threadsafeschedule_group threadsafeschedulegrp("threadsafeschedule"); + + template<> template<> + void object::test<1>() + { + set_test_name("push"); + // Simply calling push() a few times might result in indeterminate + // delivery order if the resolution of steady_clock is coarser than + // the real time required for each push() call. Explicitly increment + // the timestamp for each one -- but since we're passing explicit + // timestamps, make the queue reorder them. + queue.push(Queue::TimeTuple(Queue::Clock::now() + 20ms, "ghi")); + queue.push("abc"s); + queue.push(Queue::Clock::now() + 10ms, "def"); + queue.close(); + auto entry = queue.pop(); + ensure_equals("failed to pop first", std::get<0>(entry), "abc"s); + entry = queue.pop(); + ensure_equals("failed to pop second", std::get<0>(entry), "def"s); + ensure("queue not closed", queue.isClosed()); + ensure("queue prematurely done", ! queue.done()); + entry = queue.pop(); + ensure_equals("failed to pop third", std::get<0>(entry), "ghi"s); + bool popped = queue.tryPop(entry); + ensure("queue not empty", ! popped); + ensure("queue not done", queue.done()); + } +} // namespace tut -- cgit v1.3 From cf70766b4504f7ee745822926c526ed9c86c9339 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Wed, 6 Oct 2021 12:54:29 -0400 Subject: SL-16024: Fix ThreadSafeSchedule::tryPopFor(), tryPopUntil(). ThreadSafeSchedule::tryPopUntil() (and therefore tryPopFor()) was simply delegating to LLThreadSafeQueue::tryPopUntil(), with an adjusted timeout since we want to wake up as soon as the head item, if any, becomes ready. But then we have to loop back to retry the pop to actually deal with that head item. In addition, ThreadSafeSchedule::popWithTime() was spinning rather than properly blocking on a timed condition variable. Fixed. --- indra/llcommon/llthreadsafequeue.h | 51 +++++++++-------- indra/llcommon/tests/threadsafeschedule_test.cpp | 10 +++- indra/llcommon/threadsafeschedule.h | 72 ++++++++++++++++++------ 3 files changed, 88 insertions(+), 45 deletions(-) (limited to 'indra/llcommon/tests/threadsafeschedule_test.cpp') diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index bd2d82d4c3..719edcd579 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -179,11 +179,12 @@ protected: boost::fibers::condition_variable_any mCapacityCond; boost::fibers::condition_variable_any mEmptyCond; + enum pop_result { EMPTY, DONE, WAITING, POPPED }; // implementation logic, suitable for passing to tryLockUntil() template - bool tryPopUntil_(lock_t& lock, - const std::chrono::time_point& until, - ElementT& element); + pop_result tryPopUntil_(lock_t& lock, + const std::chrono::time_point& until, + ElementT& element); // if we're able to lock immediately, do so and run the passed callable, // which must accept lock_t& and return bool template @@ -197,7 +198,6 @@ protected: template bool push_(lock_t& lock, T&& element); // while lock is locked, really pop the head element, if we can - enum pop_result { EMPTY, WAITING, POPPED }; pop_result pop_(lock_t& lock, ElementT& element); // Is the current head element ready to pop? We say yes; subclass can // override as needed. @@ -398,7 +398,7 @@ LLThreadSafeQueue::pop_(lock_t& lock, ElementT& element) { // If mStorage is empty, there's no head element. if (mStorage.empty()) - return EMPTY; + return mClosed? DONE : EMPTY; // If there's a head element, pass it to canPop() to see if it's ready to pop. if (! canPop(mStorage.front())) @@ -427,16 +427,16 @@ ElementT LLThreadSafeQueue::pop(void) if (popped == POPPED) return std::move(value); - // Once the queue is empty, mClosed lets us know if there will ever be - // any more coming. If we didn't pop because WAITING, i.e. canPop() - // returned false, then even if the producer end has been closed, - // there's still at least one item to drain: wait for it. - if (popped == EMPTY && mClosed) + // Once the queue is DONE, there will never be any more coming. + if (popped == DONE) { LLTHROW(LLThreadSafeQueueInterrupt()); } - // Storage empty, queue still open. Wait for signal. + // If we didn't pop because WAITING, i.e. canPop() returned false, + // then even if the producer end has been closed, there's still at + // least one item to drain: wait for it. Or we might be EMPTY, with + // the queue still open. Either way, wait for signal. mEmptyCond.wait(lock1); } } @@ -448,8 +448,8 @@ bool LLThreadSafeQueue::tryPop(ElementT & element) return tryLock( [this, &element](lock_t& lock) { - // no need to check mClosed: tryPop() behavior when the queue is - // closed is implemented by simple inability to push any new + // conflate EMPTY, DONE, WAITING: tryPop() behavior when the queue + // is closed is implemented by simple inability to push any new // elements return pop_(lock, element) == POPPED; }); @@ -478,7 +478,8 @@ bool LLThreadSafeQueue::tryPopUntil( until, [this, until, &element](lock_t& lock) { - return tryPopUntil_(lock, until, element); + // conflate EMPTY, DONE, WAITING + return tryPopUntil_(lock, until, element) == POPPED; }); } @@ -486,26 +487,28 @@ bool LLThreadSafeQueue::tryPopUntil( // body of tryPopUntil(), called once we have the lock template template -bool LLThreadSafeQueue::tryPopUntil_( +typename LLThreadSafeQueue::pop_result +LLThreadSafeQueue::tryPopUntil_( lock_t& lock, const std::chrono::time_point& until, ElementT& element) { while (true) { - if (pop_(lock, element) == POPPED) - return true; - - if (mClosed) + pop_result popped = pop_(lock, element); + if (popped == POPPED || popped == DONE) { - return false; + // If we succeeded, great! If we've drained the last item, so be + // it. Either way, break the loop and tell caller. + return popped; } - // Storage empty. Wait for signal. + // EMPTY or WAITING: wait for signal. if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until)) { - // timed out -- formally we might recheck both conditions above - return false; + // timed out -- formally we might recheck + // as it is, break loop + return popped; } // If we didn't time out, we were notified for some reason. Loop back // to check. @@ -546,7 +549,7 @@ template bool LLThreadSafeQueue::done() { lock_t lock(mLock); - return mClosed && mStorage.size() == 0; + return mClosed && mStorage.empty(); } #endif diff --git a/indra/llcommon/tests/threadsafeschedule_test.cpp b/indra/llcommon/tests/threadsafeschedule_test.cpp index ec0fa0c928..af67b9f492 100644 --- a/indra/llcommon/tests/threadsafeschedule_test.cpp +++ b/indra/llcommon/tests/threadsafeschedule_test.cpp @@ -47,6 +47,8 @@ namespace tut // the timestamp for each one -- but since we're passing explicit // timestamps, make the queue reorder them. queue.push(Queue::TimeTuple(Queue::Clock::now() + 20ms, "ghi")); + // Given the various push() overloads, you have to match the type + // exactly: conversions are ambiguous. queue.push("abc"s); queue.push(Queue::Clock::now() + 10ms, "def"); queue.close(); @@ -56,9 +58,11 @@ namespace tut ensure_equals("failed to pop second", std::get<0>(entry), "def"s); ensure("queue not closed", queue.isClosed()); ensure("queue prematurely done", ! queue.done()); - entry = queue.pop(); - ensure_equals("failed to pop third", std::get<0>(entry), "ghi"s); - bool popped = queue.tryPop(entry); + std::string s; + bool popped = queue.tryPopFor(1s, s); + ensure("failed to pop third", popped); + ensure_equals("third is wrong", s, "ghi"s); + popped = queue.tryPop(s); ensure("queue not empty", ! popped); ensure("queue not done", queue.done()); } diff --git a/indra/llcommon/threadsafeschedule.h b/indra/llcommon/threadsafeschedule.h index 545c820f53..8ab4311ca1 100644 --- a/indra/llcommon/threadsafeschedule.h +++ b/indra/llcommon/threadsafeschedule.h @@ -73,11 +73,7 @@ namespace LL private: using super = LLThreadSafeQueue>; using lock_t = typename super::lock_t; - using super::pop_; - using super::push_; - using super::mClosed; - using super::mEmptyCond; - using super::mCapacityCond; + using pop_result = typename super::pop_result; public: using TimePoint = ThreadSafeSchedulePrivate::TimePoint; @@ -92,6 +88,11 @@ namespace LL using super::push; /// pass DataTuple with implicit now + // This could be ambiguous for Args with a single type. Unfortunately + // we can't enable_if an individual method with a condition based on + // the *class* template arguments, only on that method's template + // arguments. We could specialize this class for the single-Args case; + // we could minimize redundancy by breaking out a common base class... void push(const DataTuple& tuple) { push(tuple_cons(Clock::now(), tuple)); @@ -103,11 +104,11 @@ namespace LL push(TimeTuple(time, std::forward(args)...)); } - /// individually pass every component except the TimePoint (implies - /// now) -- could be ambiguous if the first specified template - /// parameter type is also TimePoint -- we could try to disambiguate, - /// but a simpler approach would be for the caller to explicitly - /// construct DataTuple and call that overload + /// individually pass every component except the TimePoint (implies now) + // This could be ambiguous if the first specified template parameter + // type is also TimePoint. We could try to disambiguate, but a simpler + // approach would be for the caller to explicitly construct DataTuple + // and call that overload. void push(Args&&... args) { push(Clock::now(), std::forward(args)...); @@ -199,6 +200,10 @@ namespace LL // current time. /// pop DataTuple by value + // It would be great to notice when sizeof...(Args) == 1 and directly + // return the first (only) value, instead of making pop()'s caller + // call std::get<0>(value). See push(DataTuple) remarks for why we + // haven't yet jumped through those hoops. DataTuple pop() { return tuple_cdr(popWithTime()); @@ -224,16 +229,17 @@ namespace LL { // Pick a point suitably far into the future. TimePoint until = TimePoint::clock::now() + std::chrono::hours(24); - if (tryPopUntil_(lock, until, tt)) + pop_result popped = tryPopUntil_(lock, until, tt); + if (popped == super::POPPED) return std::move(tt); - // empty and closed: throw, just as super::pop() does - if (super::mStorage.empty() && super::mClosed) + // DONE: throw, just as super::pop() does + if (popped == super::DONE) { LLTHROW(LLThreadSafeQueueInterrupt()); } - // If not empty, we've still got items to drain. - // If not closed, it's worth waiting for more items. + // WAITING: we've still got items to drain. + // EMPTY: not closed, so it's worth waiting for more items. // Either way, loop back to wait. } } @@ -252,6 +258,16 @@ namespace LL return true; } + /// for when Args has exactly one type + bool tryPop(typename std::tuple_element<1, TimeTuple>::type& value) + { + TimeTuple tt; + if (! super::tryPop(tt)) + return false; + value = std::get<1>(std::move(tt)); + return true; + } + /// tryPopFor() template bool tryPopFor(const std::chrono::duration& timeout, Tuple& tuple) @@ -278,11 +294,12 @@ namespace LL { // Use our time_point_cast to allow for 'until' that's a // time_point type other than TimePoint. - return tryPopUntil_(lock, time_point_cast(until), tuple); + return super::POPPED == + tryPopUntil_(lock, LL::time_point_cast(until), tuple); }); } - bool tryPopUntil_(lock_t& lock, const TimePoint& until, TimeTuple& tuple) + pop_result tryPopUntil_(lock_t& lock, const TimePoint& until, TimeTuple& tuple) { TimePoint adjusted = until; if (! super::mStorage.empty()) @@ -292,7 +309,14 @@ namespace LL adjusted = min(std::get<0>(super::mStorage.front()), adjusted); } // now delegate to base-class tryPopUntil_() - return super::tryPopUntil_(lock, adjusted, tuple); + pop_result popped; + while ((popped = super::tryPopUntil_(lock, adjusted, tuple)) == super::WAITING) + { + // If super::tryPopUntil_() returns WAITING, it means there's + // a head item, but it's not yet time. But it's worth looping + // back to recheck. + } + return popped; } /// tryPopUntil(DataTuple&) @@ -307,6 +331,18 @@ namespace LL return true; } + /// for when Args has exactly one type + template + bool tryPopUntil(const std::chrono::time_point& until, + typename std::tuple_element<1, TimeTuple>::type& value) + { + TimeTuple tt; + if (! tryPopUntil(until, tt)) + return false; + value = std::get<1>(std::move(tt)); + return true; + } + /*------------------------------ etc. ------------------------------*/ // We can't hide items that aren't yet ready because we can't traverse // the underlying priority_queue: it has no iterators, only top(). So -- cgit v1.3