From 1b1ebdf183e50c6a751493570ee6e643c33c4eda Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Mon, 4 Oct 2021 11:48:58 -0400 Subject: SL-16024: Introduce tuple.h with tuple_cons(), tuple_cdr(). These functions allow prepending or removing an item at the left end of an arbitrary tuple -- for instance, to add a sequence key to a caller's data, then remove it again when delivering the original tuple. --- indra/llcommon/tests/tuple_test.cpp | 47 +++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 indra/llcommon/tests/tuple_test.cpp (limited to 'indra/llcommon/tests') diff --git a/indra/llcommon/tests/tuple_test.cpp b/indra/llcommon/tests/tuple_test.cpp new file mode 100644 index 0000000000..af94e2086c --- /dev/null +++ b/indra/llcommon/tests/tuple_test.cpp @@ -0,0 +1,47 @@ +/** + * @file tuple_test.cpp + * @author Nat Goodspeed + * @date 2021-10-04 + * @brief Test for tuple. + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +// Precompiled header +#include "linden_common.h" +// associated header +#include "tuple.h" +// STL headers +// std headers +// external library headers +// other Linden headers +#include "../test/lltut.h" + +/***************************************************************************** +* TUT +*****************************************************************************/ +namespace tut +{ + struct tuple_data + { + }; + typedef test_group tuple_group; + typedef tuple_group::object object; + tuple_group tuplegrp("tuple"); + + template<> template<> + void object::test<1>() + { + set_test_name("tuple"); + std::tuple tup{ "abc", 17 }; + std::tuple ptup{ tuple_cons(34, tup) }; + std::tuple tup2; + int i; + std::tie(i, tup2) = tuple_split(ptup); + ensure_equals("tuple_car() fail", i, 34); + ensure_equals("tuple_cdr() (0) fail", std::get<0>(tup2), "abc"); + ensure_equals("tuple_cdr() (1) fail", std::get<1>(tup2), 17); + } +} // namespace tut -- cgit v1.3 From 955b967623983cb50ba09f7b82e5f01f2c6bcebb Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Tue, 5 Oct 2021 17:31:53 -0400 Subject: SL-16024: Add ThreadSafeSchedule, a timestamped LLThreadSafeQueue. ThreadSafeSchedule orders its items by timestamp, which can be passed either implicitly or explicitly. The timestamp specifies earliest delivery time: an item cannot be popped until that time. Add initial tests. Tweak the LLThreadSafeQueue base class to support ThreadSafeSchedule: introduce virtual canPop() method to report whether the current head item is available to pop. The base class unconditionally says yes, ThreadSafeSchedule says it depends on whether its timestamp is still in the future. This replaces the protected pop_() overload accepting a predicate. Rather than explicitly passing a predicate through a couple levels of function call, use canPop() at the level it matters. Runtime behavior that varies depending on an object's leaf class is what virtual functions were invented for. Give pop_() a three-state enum return so pop() can distinguish between "closed and empty" (throws exception) versus "closed, not yet drained because we're not yet ready to pop the head item" (waits). Also break out protected tryPopUntil_() method, the body logic of tryPopUntil(). The public method locks the data structure, the protected method requires that its caller has already done so. Add chrono.h with a more full-featured LL::time_point_cast() function than the one found in , which only converts between time_point durations, not between time_points based on different clocks. --- indra/llcommon/CMakeLists.txt | 3 + indra/llcommon/chrono.h | 65 +++++ indra/llcommon/llthreadsafequeue.h | 121 ++++---- indra/llcommon/tests/threadsafeschedule_test.cpp | 65 +++++ indra/llcommon/threadsafeschedule.h | 334 +++++++++++++++++++++++ 5 files changed, 535 insertions(+), 53 deletions(-) create mode 100644 indra/llcommon/chrono.h create mode 100644 indra/llcommon/tests/threadsafeschedule_test.cpp create mode 100644 indra/llcommon/threadsafeschedule.h (limited to 'indra/llcommon/tests') diff --git a/indra/llcommon/CMakeLists.txt b/indra/llcommon/CMakeLists.txt index 6558219462..5efcfabf24 100644 --- a/indra/llcommon/CMakeLists.txt +++ b/indra/llcommon/CMakeLists.txt @@ -127,6 +127,7 @@ set(llcommon_SOURCE_FILES set(llcommon_HEADER_FILES CMakeLists.txt + chrono.h ctype_workaround.h fix_macros.h indra_constants.h @@ -253,6 +254,7 @@ set(llcommon_HEADER_FILES lockstatic.h stdtypes.h stringize.h + threadsafeschedule.h timer.h tuple.h u64.h @@ -359,6 +361,7 @@ if (LL_TESTS) LL_ADD_INTEGRATION_TEST(lluri "" "${test_libs}") LL_ADD_INTEGRATION_TEST(llunits "" "${test_libs}") LL_ADD_INTEGRATION_TEST(stringize "" "${test_libs}") + LL_ADD_INTEGRATION_TEST(threadsafeschedule "" "${test_libs}") LL_ADD_INTEGRATION_TEST(tuple "" "${test_libs}") ## llexception_test.cpp isn't a regression test, and doesn't need to be run diff --git a/indra/llcommon/chrono.h b/indra/llcommon/chrono.h new file mode 100644 index 0000000000..806e871892 --- /dev/null +++ b/indra/llcommon/chrono.h @@ -0,0 +1,65 @@ +/** + * @file chrono.h + * @author Nat Goodspeed + * @date 2021-10-05 + * @brief supplement with utility functions + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +#if ! defined(LL_CHRONO_H) +#define LL_CHRONO_H + +#include +#include // std::enable_if + +namespace LL +{ + +// time_point_cast() is derived from https://stackoverflow.com/a/35293183 +// without the iteration: we think errors in the ~1 microsecond range are +// probably acceptable. + +// This variant is for the optimal case when the source and dest use the same +// clock: that case is handled by std::chrono. +template ::value, + bool>::type = true> +DestTimePoint time_point_cast(const SrcTimePoint& time) +{ + return std::chrono::time_point_cast(time); +} + +// This variant is for when the source and dest use different clocks -- see +// the linked StackOverflow answer, also Howard Hinnant's, for more context. +template ::value, + bool>::type = true> +DestTimePoint time_point_cast(const SrcTimePoint& time) +{ + // The basic idea is that we must adjust the passed time_point by the + // difference between the clocks' epochs. But since time_point doesn't + // expose its epoch, we fall back on what each of them thinks is now(). + // However, since we necessarily make sequential calls to those now() + // functions, the answers differ not only by the cycles spent executing + // those calls, but by potential OS interruptions between them. Try to + // reduce that error by capturing the source clock time both before and + // after the dest clock, and splitting the difference. Of course an + // interruption between two of these now() calls without a comparable + // interruption between the other two will skew the result, but better is + // more expensive. + const auto src_before = typename SrcTimePoint::clock::now(); + const auto dest_now = typename DestTimePoint::clock::now(); + const auto src_after = typename SrcTimePoint::clock::now(); + const auto src_diff = src_after - src_before; + const auto src_now = src_before + src_diff / 2; + return dest_now + (time - src_now); +} + +} // namespace LL + +#endif /* ! defined(LL_CHRONO_H) */ diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 1dffad6b89..bd2d82d4c3 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -83,6 +83,7 @@ public: // Limiting the number of pending items prevents unbounded growth of the // underlying queue. LLThreadSafeQueue(U32 capacity = 1024); + virtual ~LLThreadSafeQueue() {} // Add an element to the queue (will block if the queue has // reached capacity). @@ -162,10 +163,10 @@ public: // then every subsequent tryPop() call will return false void close(); - // detect closed state + // producer end: are we prevented from pushing any additional items? bool isClosed(); - // inverse of isClosed() - explicit operator bool(); + // consumer end: are we done, is the queue entirely drained? + bool done(); protected: typedef QueueT queue_type; @@ -178,6 +179,11 @@ protected: boost::fibers::condition_variable_any mCapacityCond; boost::fibers::condition_variable_any mEmptyCond; + // implementation logic, suitable for passing to tryLockUntil() + template + bool tryPopUntil_(lock_t& lock, + const std::chrono::time_point& until, + ElementT& element); // if we're able to lock immediately, do so and run the passed callable, // which must accept lock_t& and return bool template @@ -191,11 +197,11 @@ protected: template bool push_(lock_t& lock, T&& element); // while lock is locked, really pop the head element, if we can - bool pop_(lock_t& lock, ElementT& element); - // pop_() with an explicit predicate indicating whether the head element - // is ready to be popped - template - bool pop_(lock_t& lock, ElementT& element, PRED&& pred); + enum pop_result { EMPTY, WAITING, POPPED }; + pop_result pop_(lock_t& lock, ElementT& element); + // Is the current head element ready to pop? We say yes; subclass can + // override as needed. + virtual bool canPop(const ElementT& head) const { return true; } }; /***************************************************************************** @@ -387,26 +393,16 @@ bool LLThreadSafeQueue::tryPushUntil( // while lock is locked, really pop the head element, if we can template -bool LLThreadSafeQueue::pop_(lock_t& lock, ElementT& element) -{ - // default predicate: head element, if present, is always ready to pop - return pop_(lock, element, [](const ElementT&){ return true; }); -} - - -// pop_() with an explicit predicate indicating whether the head element -// is ready to be popped -template -template -bool LLThreadSafeQueue::pop_( - lock_t& lock, ElementT& element, PRED&& pred) +typename LLThreadSafeQueue::pop_result +LLThreadSafeQueue::pop_(lock_t& lock, ElementT& element) { // If mStorage is empty, there's no head element. - // If there's a head element, pass it to the predicate to see if caller - // considers it ready to pop. - // Unless both are satisfied, no point in continuing. - if (mStorage.empty() || ! std::forward(pred)(mStorage.front())) - return false; + if (mStorage.empty()) + return EMPTY; + + // If there's a head element, pass it to canPop() to see if it's ready to pop. + if (! canPop(mStorage.front())) + return WAITING; // std::queue::front() is the element about to pop() element = mStorage.front(); @@ -414,7 +410,7 @@ bool LLThreadSafeQueue::pop_( lock.unlock(); // now that we've popped, if somebody's been waiting to push, signal them mCapacityCond.notify_one(); - return true; + return POPPED; } @@ -422,17 +418,20 @@ template ElementT LLThreadSafeQueue::pop(void) { lock_t lock1(mLock); + ElementT value; while (true) { // On the consumer side, we always try to pop before checking mClosed // so we can finish draining the queue. - ElementT value; - if (pop_(lock1, value)) + pop_result popped = pop_(lock1, value); + if (popped == POPPED) return std::move(value); // Once the queue is empty, mClosed lets us know if there will ever be - // any more coming. - if (mClosed) + // any more coming. If we didn't pop because WAITING, i.e. canPop() + // returned false, then even if the producer end has been closed, + // there's still at least one item to drain: wait for it. + if (popped == EMPTY && mClosed) { LLTHROW(LLThreadSafeQueueInterrupt()); } @@ -452,7 +451,7 @@ bool LLThreadSafeQueue::tryPop(ElementT & element) // no need to check mClosed: tryPop() behavior when the queue is // closed is implemented by simple inability to push any new // elements - return pop_(lock, element); + return pop_(lock, element) == POPPED; }); } @@ -479,26 +478,38 @@ bool LLThreadSafeQueue::tryPopUntil( until, [this, until, &element](lock_t& lock) { - while (true) - { - if (pop_(lock, element)) - return true; + return tryPopUntil_(lock, until, element); + }); +} - if (mClosed) - { - return false; - } - // Storage empty. Wait for signal. - if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until)) - { - // timed out -- formally we might recheck both conditions above - return false; - } - // If we didn't time out, we were notified for some reason. Loop back - // to check. - } - }); +// body of tryPopUntil(), called once we have the lock +template +template +bool LLThreadSafeQueue::tryPopUntil_( + lock_t& lock, + const std::chrono::time_point& until, + ElementT& element) +{ + while (true) + { + if (pop_(lock, element) == POPPED) + return true; + + if (mClosed) + { + return false; + } + + // Storage empty. Wait for signal. + if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until)) + { + // timed out -- formally we might recheck both conditions above + return false; + } + // If we didn't time out, we were notified for some reason. Loop back + // to check. + } } @@ -509,6 +520,7 @@ size_t LLThreadSafeQueue::size(void) return mStorage.size(); } + template void LLThreadSafeQueue::close() { @@ -521,17 +533,20 @@ void LLThreadSafeQueue::close() mCapacityCond.notify_all(); } + template bool LLThreadSafeQueue::isClosed() { lock_t lock(mLock); - return mClosed && mStorage.size() == 0; + return mClosed; } + template -LLThreadSafeQueue::operator bool() +bool LLThreadSafeQueue::done() { - return ! isClosed(); + lock_t lock(mLock); + return mClosed && mStorage.size() == 0; } #endif diff --git a/indra/llcommon/tests/threadsafeschedule_test.cpp b/indra/llcommon/tests/threadsafeschedule_test.cpp new file mode 100644 index 0000000000..ec0fa0c928 --- /dev/null +++ b/indra/llcommon/tests/threadsafeschedule_test.cpp @@ -0,0 +1,65 @@ +/** + * @file threadsafeschedule_test.cpp + * @author Nat Goodspeed + * @date 2021-10-04 + * @brief Test for threadsafeschedule. + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +// Precompiled header +#include "linden_common.h" +// associated header +#include "threadsafeschedule.h" +// STL headers +// std headers +#include +// external library headers +// other Linden headers +#include "../test/lltut.h" + +using namespace std::literals::chrono_literals; // ms suffix +using namespace std::literals::string_literals; // s suffix +using Queue = LL::ThreadSafeSchedule; + +/***************************************************************************** +* TUT +*****************************************************************************/ +namespace tut +{ + struct threadsafeschedule_data + { + Queue queue; + }; + typedef test_group threadsafeschedule_group; + typedef threadsafeschedule_group::object object; + threadsafeschedule_group threadsafeschedulegrp("threadsafeschedule"); + + template<> template<> + void object::test<1>() + { + set_test_name("push"); + // Simply calling push() a few times might result in indeterminate + // delivery order if the resolution of steady_clock is coarser than + // the real time required for each push() call. Explicitly increment + // the timestamp for each one -- but since we're passing explicit + // timestamps, make the queue reorder them. + queue.push(Queue::TimeTuple(Queue::Clock::now() + 20ms, "ghi")); + queue.push("abc"s); + queue.push(Queue::Clock::now() + 10ms, "def"); + queue.close(); + auto entry = queue.pop(); + ensure_equals("failed to pop first", std::get<0>(entry), "abc"s); + entry = queue.pop(); + ensure_equals("failed to pop second", std::get<0>(entry), "def"s); + ensure("queue not closed", queue.isClosed()); + ensure("queue prematurely done", ! queue.done()); + entry = queue.pop(); + ensure_equals("failed to pop third", std::get<0>(entry), "ghi"s); + bool popped = queue.tryPop(entry); + ensure("queue not empty", ! popped); + ensure("queue not done", queue.done()); + } +} // namespace tut diff --git a/indra/llcommon/threadsafeschedule.h b/indra/llcommon/threadsafeschedule.h new file mode 100644 index 0000000000..545c820f53 --- /dev/null +++ b/indra/llcommon/threadsafeschedule.h @@ -0,0 +1,334 @@ +/** + * @file threadsafeschedule.h + * @author Nat Goodspeed + * @date 2021-10-02 + * @brief ThreadSafeSchedule is an ordered queue in which every item has an + * associated timestamp. + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +#if ! defined(LL_THREADSAFESCHEDULE_H) +#define LL_THREADSAFESCHEDULE_H + +#include "chrono.h" +#include "llexception.h" +#include "llthreadsafequeue.h" +#include "tuple.h" +#include +#include + +namespace LL +{ + namespace ThreadSafeSchedulePrivate + { + using TimePoint = std::chrono::steady_clock::time_point; + // Bundle consumer's data with a TimePoint to order items by timestamp. + template + using TimestampedTuple = std::tuple; + + // comparison functor for TimedTuples -- see TimedQueue comments + struct ReverseTupleOrder + { + template + bool operator()(const Tuple& left, const Tuple& right) const + { + return std::get<0>(left) > std::get<0>(right); + } + }; + + template + using TimedQueue = PriorityQueueAdapter< + TimestampedTuple, + // std::vector is the default storage for std::priority_queue, + // have to restate to specify comparison template parameter + std::vector>, + // std::priority_queue uses a counterintuitive comparison + // behavior: the default std::less comparator is used to present + // the *highest* value as top(). So to sort by earliest timestamp, + // we must invert by using >. + ReverseTupleOrder>; + } // namespace ThreadSafeSchedulePrivate + + /** + * ThreadSafeSchedule is an ordered LLThreadSafeQueue in which every item + * is given an associated timestamp. That is, TimePoint is implicitly + * prepended to the std::tuple with the specified types. + * + * Items are popped in increasing chronological order. Moreover, any item + * with a timestamp in the future is held back until + * std::chrono::steady_clock reaches that timestamp. + */ + template + class ThreadSafeSchedule: + public LLThreadSafeQueue, + ThreadSafeSchedulePrivate::TimedQueue> + { + public: + using DataTuple = std::tuple; + using TimeTuple = ThreadSafeSchedulePrivate::TimestampedTuple; + + private: + using super = LLThreadSafeQueue>; + using lock_t = typename super::lock_t; + using super::pop_; + using super::push_; + using super::mClosed; + using super::mEmptyCond; + using super::mCapacityCond; + + public: + using TimePoint = ThreadSafeSchedulePrivate::TimePoint; + using Clock = TimePoint::clock; + + ThreadSafeSchedule(U32 capacity=1024): + super(capacity) + {} + + /*----------------------------- push() -----------------------------*/ + /// explicitly pass TimeTuple + using super::push; + + /// pass DataTuple with implicit now + void push(const DataTuple& tuple) + { + push(tuple_cons(Clock::now(), tuple)); + } + + /// individually pass each component of the TimeTuple + void push(const TimePoint& time, Args&&... args) + { + push(TimeTuple(time, std::forward(args)...)); + } + + /// individually pass every component except the TimePoint (implies + /// now) -- could be ambiguous if the first specified template + /// parameter type is also TimePoint -- we could try to disambiguate, + /// but a simpler approach would be for the caller to explicitly + /// construct DataTuple and call that overload + void push(Args&&... args) + { + push(Clock::now(), std::forward(args)...); + } + + /*--------------------------- tryPush() ----------------------------*/ + /// explicit TimeTuple + using super::tryPush; + + /// DataTuple with implicit now + bool tryPush(const DataTuple& tuple) + { + return tryPush(tuple_cons(Clock::now(), tuple)); + } + + /// individually pass components + bool tryPush(const TimePoint& time, Args&&... args) + { + return tryPush(TimeTuple(time, std::forward(args)...)); + } + + /// individually pass components with implicit now + bool tryPush(Args&&... args) + { + return tryPush(Clock::now(), std::forward(args)...); + } + + /*-------------------------- tryPushFor() --------------------------*/ + /// explicit TimeTuple + using super::tryPushFor; + + /// DataTuple with implicit now + template + bool tryPushFor(const std::chrono::duration& timeout, + const DataTuple& tuple) + { + return tryPushFor(timeout, tuple_cons(Clock::now(), tuple)); + } + + /// individually pass components + template + bool tryPushFor(const std::chrono::duration& timeout, + const TimePoint& time, Args&&... args) + { + return tryPushFor(TimeTuple(time, std::forward(args)...)); + } + + /// individually pass components with implicit now + template + bool tryPushFor(const std::chrono::duration& timeout, + Args&&... args) + { + return tryPushFor(Clock::now(), std::forward(args)...); + } + + /*------------------------- tryPushUntil() -------------------------*/ + /// explicit TimeTuple + using super::tryPushUntil; + + /// DataTuple with implicit now + template + bool tryPushUntil(const std::chrono::time_point& until, + const DataTuple& tuple) + { + return tryPushUntil(until, tuple_cons(Clock::now(), tuple)); + } + + /// individually pass components + template + bool tryPushUntil(const std::chrono::time_point& until, + const TimePoint& time, Args&&... args) + { + return tryPushUntil(until, TimeTuple(time, std::forward(args)...)); + } + + /// individually pass components with implicit now + template + bool tryPushUntil(const std::chrono::time_point& until, + Args&&... args) + { + return tryPushUntil(until, Clock::now(), std::forward(args)...); + } + + /*----------------------------- pop() ------------------------------*/ + // Our consumer may or may not care about the timestamp associated + // with each popped item, so we allow retrieving either DataTuple or + // TimeTuple. One potential use would be to observe, and possibly + // adjust for, the time lag between the item time and the actual + // current time. + + /// pop DataTuple by value + DataTuple pop() + { + return tuple_cdr(popWithTime()); + } + + /// pop TimeTuple by value + TimeTuple popWithTime() + { + lock_t lock(super::mLock); + // We can't just sit around waiting forever, given that there may + // be items in the queue that are not yet ready but will *become* + // ready in the near future. So in fact, with this class, every + // pop() becomes a tryPopUntil(), constrained to the timestamp of + // the head item. It almost doesn't matter what we specify for the + // caller's time constraint -- all we really care about is the + // head item's timestamp. Since pop() and popWithTime() are + // defined to wait until either an item becomes available or the + // queue is closed, loop until one of those things happens. The + // constraint we pass just determines how often we'll loop while + // waiting. + TimeTuple tt; + while (true) + { + // Pick a point suitably far into the future. + TimePoint until = TimePoint::clock::now() + std::chrono::hours(24); + if (tryPopUntil_(lock, until, tt)) + return std::move(tt); + + // empty and closed: throw, just as super::pop() does + if (super::mStorage.empty() && super::mClosed) + { + LLTHROW(LLThreadSafeQueueInterrupt()); + } + // If not empty, we've still got items to drain. + // If not closed, it's worth waiting for more items. + // Either way, loop back to wait. + } + } + + // We can use tryPop(TimeTuple&) just as it stands; the only behavior + // difference is in our canPop() override method. + using super::tryPop; + + /// tryPop(DataTuple&) + bool tryPop(DataTuple& tuple) + { + TimeTuple tt; + if (! super::tryPop(tt)) + return false; + tuple = tuple_cdr(std::move(tt)); + return true; + } + + /// tryPopFor() + template + bool tryPopFor(const std::chrono::duration& timeout, Tuple& tuple) + { + // It's important to use OUR tryPopUntil() implementation, rather + // than delegating immediately to our base class. + return tryPopUntil(Clock::now() + timeout, tuple); + } + + /// tryPopUntil(TimeTuple&) + template + bool tryPopUntil(const std::chrono::time_point& until, + TimeTuple& tuple) + { + // super::tryPopUntil() wakes up when an item becomes available or + // we hit 'until', whichever comes first. Thing is, the current + // head of the queue could become ready sooner than either of + // those events, and we need to deliver it as soon as it does. + // Don't wait past the TimePoint of the head item. + // Naturally, lock the queue before peeking at mStorage. + return super::tryLockUntil( + until, + [this, until, &tuple](lock_t& lock) + { + // Use our time_point_cast to allow for 'until' that's a + // time_point type other than TimePoint. + return tryPopUntil_(lock, time_point_cast(until), tuple); + }); + } + + bool tryPopUntil_(lock_t& lock, const TimePoint& until, TimeTuple& tuple) + { + TimePoint adjusted = until; + if (! super::mStorage.empty()) + { + // use whichever is earlier: the head item's timestamp, or + // the caller's limit + adjusted = min(std::get<0>(super::mStorage.front()), adjusted); + } + // now delegate to base-class tryPopUntil_() + return super::tryPopUntil_(lock, adjusted, tuple); + } + + /// tryPopUntil(DataTuple&) + template + bool tryPopUntil(const std::chrono::time_point& until, + DataTuple& tuple) + { + TimeTuple tt; + if (! tryPopUntil(until, tt)) + return false; + tuple = tuple_cdr(std::move(tt)); + return true; + } + + /*------------------------------ etc. ------------------------------*/ + // We can't hide items that aren't yet ready because we can't traverse + // the underlying priority_queue: it has no iterators, only top(). So + // a consumer could observe size() > 0 and yet tryPop() returns false. + // Shrug, in a multi-consumer scenario that would be expected behavior. + using super::size; + // open/closed state + using super::close; + using super::isClosed; + using super::done; + + private: + // this method is called by base class pop_() every time we're + // considering whether to deliver the current head element + bool canPop(const TimeTuple& head) const override + { + // an item with a future timestamp isn't yet ready to pop + // (should we add some slop for overhead?) + return std::get<0>(head) <= Clock::now(); + } + }; + +} // namespace LL + +#endif /* ! defined(LL_THREADSAFESCHEDULE_H) */ -- cgit v1.3 From cf70766b4504f7ee745822926c526ed9c86c9339 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Wed, 6 Oct 2021 12:54:29 -0400 Subject: SL-16024: Fix ThreadSafeSchedule::tryPopFor(), tryPopUntil(). ThreadSafeSchedule::tryPopUntil() (and therefore tryPopFor()) was simply delegating to LLThreadSafeQueue::tryPopUntil(), with an adjusted timeout since we want to wake up as soon as the head item, if any, becomes ready. But then we have to loop back to retry the pop to actually deal with that head item. In addition, ThreadSafeSchedule::popWithTime() was spinning rather than properly blocking on a timed condition variable. Fixed. --- indra/llcommon/llthreadsafequeue.h | 51 +++++++++-------- indra/llcommon/tests/threadsafeschedule_test.cpp | 10 +++- indra/llcommon/threadsafeschedule.h | 72 ++++++++++++++++++------ 3 files changed, 88 insertions(+), 45 deletions(-) (limited to 'indra/llcommon/tests') diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index bd2d82d4c3..719edcd579 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -179,11 +179,12 @@ protected: boost::fibers::condition_variable_any mCapacityCond; boost::fibers::condition_variable_any mEmptyCond; + enum pop_result { EMPTY, DONE, WAITING, POPPED }; // implementation logic, suitable for passing to tryLockUntil() template - bool tryPopUntil_(lock_t& lock, - const std::chrono::time_point& until, - ElementT& element); + pop_result tryPopUntil_(lock_t& lock, + const std::chrono::time_point& until, + ElementT& element); // if we're able to lock immediately, do so and run the passed callable, // which must accept lock_t& and return bool template @@ -197,7 +198,6 @@ protected: template bool push_(lock_t& lock, T&& element); // while lock is locked, really pop the head element, if we can - enum pop_result { EMPTY, WAITING, POPPED }; pop_result pop_(lock_t& lock, ElementT& element); // Is the current head element ready to pop? We say yes; subclass can // override as needed. @@ -398,7 +398,7 @@ LLThreadSafeQueue::pop_(lock_t& lock, ElementT& element) { // If mStorage is empty, there's no head element. if (mStorage.empty()) - return EMPTY; + return mClosed? DONE : EMPTY; // If there's a head element, pass it to canPop() to see if it's ready to pop. if (! canPop(mStorage.front())) @@ -427,16 +427,16 @@ ElementT LLThreadSafeQueue::pop(void) if (popped == POPPED) return std::move(value); - // Once the queue is empty, mClosed lets us know if there will ever be - // any more coming. If we didn't pop because WAITING, i.e. canPop() - // returned false, then even if the producer end has been closed, - // there's still at least one item to drain: wait for it. - if (popped == EMPTY && mClosed) + // Once the queue is DONE, there will never be any more coming. + if (popped == DONE) { LLTHROW(LLThreadSafeQueueInterrupt()); } - // Storage empty, queue still open. Wait for signal. + // If we didn't pop because WAITING, i.e. canPop() returned false, + // then even if the producer end has been closed, there's still at + // least one item to drain: wait for it. Or we might be EMPTY, with + // the queue still open. Either way, wait for signal. mEmptyCond.wait(lock1); } } @@ -448,8 +448,8 @@ bool LLThreadSafeQueue::tryPop(ElementT & element) return tryLock( [this, &element](lock_t& lock) { - // no need to check mClosed: tryPop() behavior when the queue is - // closed is implemented by simple inability to push any new + // conflate EMPTY, DONE, WAITING: tryPop() behavior when the queue + // is closed is implemented by simple inability to push any new // elements return pop_(lock, element) == POPPED; }); @@ -478,7 +478,8 @@ bool LLThreadSafeQueue::tryPopUntil( until, [this, until, &element](lock_t& lock) { - return tryPopUntil_(lock, until, element); + // conflate EMPTY, DONE, WAITING + return tryPopUntil_(lock, until, element) == POPPED; }); } @@ -486,26 +487,28 @@ bool LLThreadSafeQueue::tryPopUntil( // body of tryPopUntil(), called once we have the lock template template -bool LLThreadSafeQueue::tryPopUntil_( +typename LLThreadSafeQueue::pop_result +LLThreadSafeQueue::tryPopUntil_( lock_t& lock, const std::chrono::time_point& until, ElementT& element) { while (true) { - if (pop_(lock, element) == POPPED) - return true; - - if (mClosed) + pop_result popped = pop_(lock, element); + if (popped == POPPED || popped == DONE) { - return false; + // If we succeeded, great! If we've drained the last item, so be + // it. Either way, break the loop and tell caller. + return popped; } - // Storage empty. Wait for signal. + // EMPTY or WAITING: wait for signal. if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until)) { - // timed out -- formally we might recheck both conditions above - return false; + // timed out -- formally we might recheck + // as it is, break loop + return popped; } // If we didn't time out, we were notified for some reason. Loop back // to check. @@ -546,7 +549,7 @@ template bool LLThreadSafeQueue::done() { lock_t lock(mLock); - return mClosed && mStorage.size() == 0; + return mClosed && mStorage.empty(); } #endif diff --git a/indra/llcommon/tests/threadsafeschedule_test.cpp b/indra/llcommon/tests/threadsafeschedule_test.cpp index ec0fa0c928..af67b9f492 100644 --- a/indra/llcommon/tests/threadsafeschedule_test.cpp +++ b/indra/llcommon/tests/threadsafeschedule_test.cpp @@ -47,6 +47,8 @@ namespace tut // the timestamp for each one -- but since we're passing explicit // timestamps, make the queue reorder them. queue.push(Queue::TimeTuple(Queue::Clock::now() + 20ms, "ghi")); + // Given the various push() overloads, you have to match the type + // exactly: conversions are ambiguous. queue.push("abc"s); queue.push(Queue::Clock::now() + 10ms, "def"); queue.close(); @@ -56,9 +58,11 @@ namespace tut ensure_equals("failed to pop second", std::get<0>(entry), "def"s); ensure("queue not closed", queue.isClosed()); ensure("queue prematurely done", ! queue.done()); - entry = queue.pop(); - ensure_equals("failed to pop third", std::get<0>(entry), "ghi"s); - bool popped = queue.tryPop(entry); + std::string s; + bool popped = queue.tryPopFor(1s, s); + ensure("failed to pop third", popped); + ensure_equals("third is wrong", s, "ghi"s); + popped = queue.tryPop(s); ensure("queue not empty", ! popped); ensure("queue not done", queue.done()); } diff --git a/indra/llcommon/threadsafeschedule.h b/indra/llcommon/threadsafeschedule.h index 545c820f53..8ab4311ca1 100644 --- a/indra/llcommon/threadsafeschedule.h +++ b/indra/llcommon/threadsafeschedule.h @@ -73,11 +73,7 @@ namespace LL private: using super = LLThreadSafeQueue>; using lock_t = typename super::lock_t; - using super::pop_; - using super::push_; - using super::mClosed; - using super::mEmptyCond; - using super::mCapacityCond; + using pop_result = typename super::pop_result; public: using TimePoint = ThreadSafeSchedulePrivate::TimePoint; @@ -92,6 +88,11 @@ namespace LL using super::push; /// pass DataTuple with implicit now + // This could be ambiguous for Args with a single type. Unfortunately + // we can't enable_if an individual method with a condition based on + // the *class* template arguments, only on that method's template + // arguments. We could specialize this class for the single-Args case; + // we could minimize redundancy by breaking out a common base class... void push(const DataTuple& tuple) { push(tuple_cons(Clock::now(), tuple)); @@ -103,11 +104,11 @@ namespace LL push(TimeTuple(time, std::forward(args)...)); } - /// individually pass every component except the TimePoint (implies - /// now) -- could be ambiguous if the first specified template - /// parameter type is also TimePoint -- we could try to disambiguate, - /// but a simpler approach would be for the caller to explicitly - /// construct DataTuple and call that overload + /// individually pass every component except the TimePoint (implies now) + // This could be ambiguous if the first specified template parameter + // type is also TimePoint. We could try to disambiguate, but a simpler + // approach would be for the caller to explicitly construct DataTuple + // and call that overload. void push(Args&&... args) { push(Clock::now(), std::forward(args)...); @@ -199,6 +200,10 @@ namespace LL // current time. /// pop DataTuple by value + // It would be great to notice when sizeof...(Args) == 1 and directly + // return the first (only) value, instead of making pop()'s caller + // call std::get<0>(value). See push(DataTuple) remarks for why we + // haven't yet jumped through those hoops. DataTuple pop() { return tuple_cdr(popWithTime()); @@ -224,16 +229,17 @@ namespace LL { // Pick a point suitably far into the future. TimePoint until = TimePoint::clock::now() + std::chrono::hours(24); - if (tryPopUntil_(lock, until, tt)) + pop_result popped = tryPopUntil_(lock, until, tt); + if (popped == super::POPPED) return std::move(tt); - // empty and closed: throw, just as super::pop() does - if (super::mStorage.empty() && super::mClosed) + // DONE: throw, just as super::pop() does + if (popped == super::DONE) { LLTHROW(LLThreadSafeQueueInterrupt()); } - // If not empty, we've still got items to drain. - // If not closed, it's worth waiting for more items. + // WAITING: we've still got items to drain. + // EMPTY: not closed, so it's worth waiting for more items. // Either way, loop back to wait. } } @@ -252,6 +258,16 @@ namespace LL return true; } + /// for when Args has exactly one type + bool tryPop(typename std::tuple_element<1, TimeTuple>::type& value) + { + TimeTuple tt; + if (! super::tryPop(tt)) + return false; + value = std::get<1>(std::move(tt)); + return true; + } + /// tryPopFor() template bool tryPopFor(const std::chrono::duration& timeout, Tuple& tuple) @@ -278,11 +294,12 @@ namespace LL { // Use our time_point_cast to allow for 'until' that's a // time_point type other than TimePoint. - return tryPopUntil_(lock, time_point_cast(until), tuple); + return super::POPPED == + tryPopUntil_(lock, LL::time_point_cast(until), tuple); }); } - bool tryPopUntil_(lock_t& lock, const TimePoint& until, TimeTuple& tuple) + pop_result tryPopUntil_(lock_t& lock, const TimePoint& until, TimeTuple& tuple) { TimePoint adjusted = until; if (! super::mStorage.empty()) @@ -292,7 +309,14 @@ namespace LL adjusted = min(std::get<0>(super::mStorage.front()), adjusted); } // now delegate to base-class tryPopUntil_() - return super::tryPopUntil_(lock, adjusted, tuple); + pop_result popped; + while ((popped = super::tryPopUntil_(lock, adjusted, tuple)) == super::WAITING) + { + // If super::tryPopUntil_() returns WAITING, it means there's + // a head item, but it's not yet time. But it's worth looping + // back to recheck. + } + return popped; } /// tryPopUntil(DataTuple&) @@ -307,6 +331,18 @@ namespace LL return true; } + /// for when Args has exactly one type + template + bool tryPopUntil(const std::chrono::time_point& until, + typename std::tuple_element<1, TimeTuple>::type& value) + { + TimeTuple tt; + if (! tryPopUntil(until, tt)) + return false; + value = std::get<1>(std::move(tt)); + return true; + } + /*------------------------------ etc. ------------------------------*/ // We can't hide items that aren't yet ready because we can't traverse // the underlying priority_queue: it has no iterators, only top(). So -- cgit v1.3 From b554c9eaf45c83500e6b65e295cc507b9a3d537b Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 7 Oct 2021 14:00:39 -0400 Subject: SL-16024: Adapt llinstancetracker_test.cpp to getInstance() change. --- indra/llcommon/tests/llinstancetracker_test.cpp | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'indra/llcommon/tests') diff --git a/indra/llcommon/tests/llinstancetracker_test.cpp b/indra/llcommon/tests/llinstancetracker_test.cpp index 9b89159625..5daa29adf4 100644 --- a/indra/llcommon/tests/llinstancetracker_test.cpp +++ b/indra/llcommon/tests/llinstancetracker_test.cpp @@ -90,19 +90,19 @@ namespace tut { Keyed one("one"); ensure_equals(Keyed::instanceCount(), 1); - Keyed* found = Keyed::getInstance("one"); - ensure("couldn't find stack Keyed", found); - ensure_equals("found wrong Keyed instance", found, &one); + auto found = Keyed::getInstance("one"); + ensure("couldn't find stack Keyed", bool(found)); + ensure_equals("found wrong Keyed instance", found.get(), &one); { boost::scoped_ptr two(new Keyed("two")); ensure_equals(Keyed::instanceCount(), 2); - Keyed* found = Keyed::getInstance("two"); - ensure("couldn't find heap Keyed", found); - ensure_equals("found wrong Keyed instance", found, two.get()); + auto found = Keyed::getInstance("two"); + ensure("couldn't find heap Keyed", bool(found)); + ensure_equals("found wrong Keyed instance", found.get(), two.get()); } ensure_equals(Keyed::instanceCount(), 1); } - Keyed* found = Keyed::getInstance("one"); + auto found = Keyed::getInstance("one"); ensure("Keyed key lives too long", ! found); ensure_equals(Keyed::instanceCount(), 0); } -- cgit v1.3 From 623ac79120a417ec445ce5c106a907fe46734309 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 7 Oct 2021 15:32:51 -0400 Subject: SL-16024: Add LL::WorkQueue for passing work items between threads. A typical WorkQueue has a string name, which can be used to find it to post work to it. "Work" is a nullary callable. WorkQueue is a multi-producer, multi-consumer thread-safe queue: multiple threads can service the WorkQueue, multiple threads can post work to it. Work can be scheduled in the future by submitting with a timestamp. In addition, a given work item can be scheduled to run on a recurring basis. A requesting thread servicing a WorkQueue of its own, such as the viewer's main thread, can submit work to another WorkQueue along with a callback to be passed the result (of arbitrary type) of the first work item. The callback is posted to the originating WorkQueue, permitting safe data exchange between participating threads. Methods are provided for different kinds of servicing threads. runUntilClose() is useful for a simple worker thread. runFor(duration) devotes no more than a specified time slice to that WorkQueue, e.g. for use by the main thread. --- indra/llcommon/CMakeLists.txt | 3 + indra/llcommon/tests/workqueue_test.cpp | 158 ++++++++++++++++ indra/llcommon/threadsafeschedule.h | 1 + indra/llcommon/workqueue.cpp | 114 +++++++++++ indra/llcommon/workqueue.h | 325 ++++++++++++++++++++++++++++++++ 5 files changed, 601 insertions(+) create mode 100644 indra/llcommon/tests/workqueue_test.cpp create mode 100644 indra/llcommon/workqueue.cpp create mode 100644 indra/llcommon/workqueue.h (limited to 'indra/llcommon/tests') diff --git a/indra/llcommon/CMakeLists.txt b/indra/llcommon/CMakeLists.txt index 5efcfabf24..a3dbb6d9d0 100644 --- a/indra/llcommon/CMakeLists.txt +++ b/indra/llcommon/CMakeLists.txt @@ -121,6 +121,7 @@ set(llcommon_SOURCE_FILES llworkerthread.cpp timing.cpp u64.cpp + workqueue.cpp StackWalker.cpp ) @@ -258,6 +259,7 @@ set(llcommon_HEADER_FILES timer.h tuple.h u64.h + workqueue.h StackWalker.h ) @@ -363,6 +365,7 @@ if (LL_TESTS) LL_ADD_INTEGRATION_TEST(stringize "" "${test_libs}") LL_ADD_INTEGRATION_TEST(threadsafeschedule "" "${test_libs}") LL_ADD_INTEGRATION_TEST(tuple "" "${test_libs}") + LL_ADD_INTEGRATION_TEST(workqueue "" "${test_libs}") ## llexception_test.cpp isn't a regression test, and doesn't need to be run ## every build. It's to help a developer make implementation choices about diff --git a/indra/llcommon/tests/workqueue_test.cpp b/indra/llcommon/tests/workqueue_test.cpp new file mode 100644 index 0000000000..ab1cae6c14 --- /dev/null +++ b/indra/llcommon/tests/workqueue_test.cpp @@ -0,0 +1,158 @@ +/** + * @file workqueue_test.cpp + * @author Nat Goodspeed + * @date 2021-10-07 + * @brief Test for workqueue. + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +// Precompiled header +#include "linden_common.h" +// associated header +#include "workqueue.h" +// STL headers +// std headers +#include +#include +// external library headers +// other Linden headers +#include "../test/lltut.h" +#include "llcond.h" +#include "llstring.h" +#include "stringize.h" + +using namespace LL; +using namespace std::literals::chrono_literals; // ms suffix +using namespace std::literals::string_literals; // s suffix + +/***************************************************************************** +* TUT +*****************************************************************************/ +namespace tut +{ + struct workqueue_data + { + WorkQueue queue{"queue"}; + }; + typedef test_group workqueue_group; + typedef workqueue_group::object object; + workqueue_group workqueuegrp("workqueue"); + + template<> template<> + void object::test<1>() + { + set_test_name("name"); + ensure_equals("didn't capture name", queue.getKey(), "queue"); + ensure("not findable", WorkQueue::getInstance("queue") == queue.getWeak().lock()); + WorkQueue q2; + ensure("has no name", LLStringUtil::startsWith(q2.getKey(), "WorkQueue")); + } + + template<> template<> + void object::test<2>() + { + set_test_name("post"); + bool wasRun{ false }; + // We only get away with binding a simple bool because we're running + // the work on the same thread. + queue.post([&wasRun](){ wasRun = true; }); + queue.close(); + ensure("ran too soon", ! wasRun); + queue.runUntilClose(); + ensure("didn't run", wasRun); + } + + template<> template<> + void object::test<3>() + { + set_test_name("postEvery"); + // record of runs + using Shared = std::deque; + // This is an example of how to share data between the originator of + // postEvery(work) and the work item itself, since usually a WorkQueue + // is used to dispatch work to a different thread. Neither of them + // should call any of LLCond's wait methods: you don't want to stall + // either the worker thread or the originating thread (conventionally + // main). Use LLCond or a subclass even if all you want to do is + // signal the work item that it can quit; consider LLOneShotCond. + LLCond data; + auto start = WorkQueue::TimePoint::clock::now(); + auto interval = 100ms; + queue.postEvery( + interval, + [&data, count = 0] + () mutable + { + // record the timestamp at which this instance is running + data.update_one( + [](Shared& data) + { + data.push_back(WorkQueue::TimePoint::clock::now()); + }); + // by the 3rd call, return false to stop + return (++count < 3); + }); + // no convenient way to close() our queue while we've got a + // postEvery() running, so run until we think we should have exhausted + // the iterations + queue.runFor(10*interval); + // Take a copy of the captured deque. + Shared result = data.get(); + ensure_equals("called wrong number of times", result.size(), 3); + // postEvery() assumes you want the first call to happen right away. + // Inject a fake start time that's (interval) earlier than that, to + // make our too early/too late tests uniform for all entries. + result.push_front(start - interval); + for (size_t i = 1; i < result.size(); ++i) + { + auto diff = (result[i] - result[i-1]); + try + { + ensure(STRINGIZE("call " << i << " too soon"), diff >= interval); + ensure(STRINGIZE("call " << i << " too late"), diff < interval*1.5); + } + catch (const tut::failure&) + { + auto interval_ms = interval / 1ms; + auto diff_ms = diff / 1ms; + std::cerr << "interval " << interval_ms + << "ms; diff " << diff_ms << "ms" << std::endl; + throw; + } + } + } + + template<> template<> + void object::test<4>() + { + set_test_name("postTo"); + WorkQueue main("main"); + auto qptr = WorkQueue::getInstance("queue"); + int result = 0; + main.postTo( + qptr, + [](){ return 17; }, + // Note that a postTo() *callback* can safely bind a reference to + // a variable on the invoking thread, because the callback is run + // on the invoking thread. + [&result](int i){ result = i; }); + // this should post the callback to main + qptr->runOne(); + // this should run the callback + main.runOne(); + ensure_equals("failed to run int callback", result, 17); + + std::string alpha; + // postTo() handles arbitrary return types + main.postTo( + qptr, + [](){ return "abc"s; }, + [&alpha](const std::string& s){ alpha = s; }); + qptr->runPending(); + main.runPending(); + ensure_equals("failed to run string callback", alpha, "abc"); + } +} // namespace tut diff --git a/indra/llcommon/threadsafeschedule.h b/indra/llcommon/threadsafeschedule.h index 0e70d30714..c8ad23532b 100644 --- a/indra/llcommon/threadsafeschedule.h +++ b/indra/llcommon/threadsafeschedule.h @@ -78,6 +78,7 @@ namespace LL enum pop_result { EMPTY=super::EMPTY, DONE=super::DONE, WAITING=super::WAITING, POPPED=super::POPPED }; public: + using Closed = LLThreadSafeQueueInterrupt; using TimePoint = ThreadSafeSchedulePrivate::TimePoint; using Clock = TimePoint::clock; diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp new file mode 100644 index 0000000000..15e292fb43 --- /dev/null +++ b/indra/llcommon/workqueue.cpp @@ -0,0 +1,114 @@ +/** + * @file workqueue.cpp + * @author Nat Goodspeed + * @date 2021-10-06 + * @brief Implementation for WorkQueue. + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +// Precompiled header +#include "linden_common.h" +// associated header +#include "workqueue.h" +// STL headers +// std headers +// external library headers +// other Linden headers +#include "llerror.h" +#include "llexception.h" +#include "stringize.h" + +LL::WorkQueue::WorkQueue(const std::string& name): + super(makeName(name)) +{ + // TODO: register for "LLApp" events so we can implicitly close() on + // viewer shutdown. +} + +void LL::WorkQueue::close() +{ + mQueue.close(); +} + +void LL::WorkQueue::runUntilClose() +{ + try + { + for (;;) + { + callWork(mQueue.pop()); + } + } + catch (const Queue::Closed&) + { + } +} + +bool LL::WorkQueue::runPending() +{ + for (Work work; mQueue.tryPop(work); ) + { + callWork(work); + } + return ! mQueue.done(); +} + +bool LL::WorkQueue::runOne() +{ + Work work; + if (mQueue.tryPop(work)) + { + callWork(work); + } + return ! mQueue.done(); +} + +bool LL::WorkQueue::runUntil(const TimePoint& until) +{ + // Should we subtract some slop to allow for typical Work execution time? + // How much slop? + Work work; + while (TimePoint::clock::now() < until && mQueue.tryPopUntil(until, work)) + { + callWork(work); + } + return ! mQueue.done(); +} + +std::string LL::WorkQueue::makeName(const std::string& name) +{ + if (! name.empty()) + return name; + + static thread_local U32 discriminator = 0; + return STRINGIZE("WorkQueue" << discriminator++); +} + +void LL::WorkQueue::callWork(const Queue::DataTuple& work) +{ + // ThreadSafeSchedule::pop() always delivers a tuple, even when + // there's only one data field per item, as for us. + callWork(std::get<0>(work)); +} + +void LL::WorkQueue::callWork(const Work& work) +{ + try + { + work(); + } + catch (...) + { + // No matter what goes wrong with any individual work item, the worker + // thread must go on! Log our own instance name with the exception. + LOG_UNHANDLED_EXCEPTION(getKey()); + } +} + +void LL::WorkQueue::error(const std::string& msg) +{ + LL_ERRS("WorkQueue") << msg << LL_ENDL; +} diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h new file mode 100644 index 0000000000..a52f7b0e26 --- /dev/null +++ b/indra/llcommon/workqueue.h @@ -0,0 +1,325 @@ +/** + * @file workqueue.h + * @author Nat Goodspeed + * @date 2021-09-30 + * @brief Queue used for inter-thread work passing. + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +#if ! defined(LL_WORKQUEUE_H) +#define LL_WORKQUEUE_H + +#include "llinstancetracker.h" +#include "threadsafeschedule.h" +#include +#include // std::function +#include +#include +#include // std::pair +#include + +namespace LL +{ + /** + * A typical WorkQueue has a string name that can be used to find it. + */ + class WorkQueue: public LLInstanceTracker + { + private: + using super = LLInstanceTracker; + + public: + using Work = std::function; + + private: + using Queue = ThreadSafeSchedule; + // helper for postEvery() + template + class BackJack; + + public: + using TimePoint = Queue::TimePoint; + using TimedWork = Queue::TimeTuple; + using Closed = Queue::Closed; + + /** + * You may omit the WorkQueue name, in which case a unique name is + * synthesized; for practical purposes that makes it anonymous. + */ + WorkQueue(const std::string& name = std::string()); + + /** + * Since the point of WorkQueue is to pass work to some other worker + * thread(s) asynchronously, it's important that the WorkQueue continue + * to exist until the worker thread(s) have drained it. To communicate + * that it's time for them to quit, close() the queue. + */ + void close(); + + /*---------------------- fire and forget API -----------------------*/ + + /// fire-and-forget, but at a particular (future?) time + template + void post(const TimePoint& time, CALLABLE&& callable) + { + // Defer reifying an arbitrary CALLABLE until we hit this method. + // All other methods should accept CALLABLEs of arbitrary type to + // avoid multiple levels of std::function indirection. + mQueue.push(TimedWork(time, std::move(callable))); + } + + /// fire-and-forget + template + void post(CALLABLE&& callable) + { + // We use TimePoint::clock::now() instead of TimePoint's + // representation of the epoch because this WorkQueue may contain + // a mix of past-due TimedWork items and TimedWork items scheduled + // for the future. Sift this new item into the correct place. + post(TimePoint::clock::now(), std::move(callable)); + } + + /** + * Launch a callable returning bool that will trigger repeatedly at + * specified interval, until the callable returns false. + * + * If you need to signal that callable from outside, DO NOT bind a + * reference to a simple bool! That's not thread-safe. Instead, bind + * an LLCond variant, e.g. LLOneShotCond or LLBoolCond. + */ + template + void postEvery(const std::chrono::duration& interval, + CALLABLE&& callable); + + /*------------------------- handshake API --------------------------*/ + + /** + * Post work to another WorkQueue to be run at a specified time, + * requesting a specific callback to be run on this WorkQueue on + * completion. + * + * Returns true if able to post, false if the other WorkQueue is + * inaccessible. + */ + template + bool postTo(std::weak_ptr target, + const TimePoint& time, CALLABLE&& callable, CALLBACK&& callback) + { + // We're being asked to post to the WorkQueue at target. + // target is a weak_ptr: have to lock it to check it. + auto tptr = target.lock(); + if (! tptr) + // can't post() if the target WorkQueue has been destroyed + return false; + + // Here we believe target WorkQueue still exists. Post to it a + // lambda that packages our callable, our callback and a weak_ptr + // to this originating WorkQueue. + tptr->post( + time, + [reply = super::getWeak(), + callable = std::move(callable), + callback = std::move(callback)] + () + { + // Call the callable in any case -- but to minimize + // copying the result, immediately bind it into a reply + // lambda. The reply lambda also binds the original + // callback, so that when we, the originating WorkQueue, + // finally receive and process the reply lambda, we'll + // call the bound callback with the bound result -- on the + // same thread that originally called postTo(). + auto rlambda = + [result = callable(), + callback = std::move(callback)] + () + { callback(std::move(result)); }; + // Check if this originating WorkQueue still exists. + // Remember, the outer lambda is now running on a thread + // servicing the target WorkQueue, and real time has + // elapsed since postTo()'s tptr->post() call. + // reply is a weak_ptr: have to lock it to check it. + auto rptr = reply.lock(); + if (rptr) + { + // Only post reply lambda if the originating WorkQueue + // still exists. If not -- who would we tell? Log it? + try + { + rptr->post(std::move(rlambda)); + } + catch (const Closed&) + { + // Originating WorkQueue might still exist, but + // might be Closed. Same thing: just discard the + // callback. + } + } + }); + // looks like we were able to post() + return true; + } + + /** + * Post work to another WorkQueue, requesting a specific callback to + * be run on this WorkQueue on completion. + * + * Returns true if able to post, false if the other WorkQueue is + * inaccessible. + */ + template + bool postTo(std::weak_ptr target, + CALLABLE&& callable, CALLBACK&& callback) + { + return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback)); + } + + /*--------------------------- worker API ---------------------------*/ + + /** + * runUntilClose() pulls TimedWork items off this WorkQueue until the + * queue is closed, at which point it returns. This would be the + * typical entry point for a simple worker thread. + */ + void runUntilClose(); + + /** + * runPending() runs all TimedWork items that are ready to run. It + * returns true if the queue remains open, false if the queue has been + * closed. This could be used by a thread whose primary purpose is to + * serve the queue, but also wants to do other things with its idle time. + */ + bool runPending(); + + /** + * runOne() runs at most one ready TimedWork item -- zero if none are + * ready. It returns true if the queue remains open, false if the + * queue has been closed. + */ + bool runOne(); + + /** + * runFor() runs a subset of ready TimedWork items, until the + * timeslice has been exceeded. It returns true if the queue remains + * open, false if the queue has been closed. This could be used by a + * busy main thread to lend a bounded few CPU cycles to this WorkQueue + * without risking the WorkQueue blowing out the length of any one + * frame. + */ + template + bool runFor(const std::chrono::duration& timeslice) + { + return runUntil(TimePoint::clock::now() + timeslice); + } + + /** + * runUntil() is just like runFor(), only with a specific end time + * instead of a timeslice duration. + */ + bool runUntil(const TimePoint& until); + + private: + static void error(const std::string& msg); + static std::string makeName(const std::string& name); + void callWork(const Queue::DataTuple& work); + void callWork(const Work& work); + Queue mQueue; + }; + + /** + * BackJack is, in effect, a hand-rolled lambda, binding a WorkQueue, a + * CALLABLE that returns bool, a TimePoint and an interval at which to + * relaunch it. As long as the callable continues returning true, BackJack + * keeps resubmitting it to the target WorkQueue. + */ + // Why is BackJack a class and not a lambda? Because, unlike a lambda, a + // class method gets its own 'this' pointer -- which we need to resubmit + // the whole BackJack callable. + template + class WorkQueue::BackJack + { + public: + // bind the desired data + BackJack(std::weak_ptr target, + const WorkQueue::TimePoint& start, + const std::chrono::duration& interval, + CALLABLE&& callable): + mTarget(target), + mStart(start), + mInterval(interval), + mCallable(std::move(callable)) + {} + + // Call by target WorkQueue -- note that although WE require a + // callable returning bool, WorkQueue wants a void callable. We + // consume the bool. + void operator()() + { + // If mCallable() throws an exception, don't catch it here: if it + // throws once, it's likely to throw every time, so it's a waste + // of time to arrange to call it again. + if (mCallable()) + { + // Modify mStart to the new start time we desire. If we simply + // added mInterval to now, we'd get actual timings of + // (mInterval + slop), where 'slop' is the latency between the + // previous mStart and the WorkQueue actually calling us. + // Instead, add mInterval to mStart so that at least we + // register our intent to fire at exact mIntervals. + mStart += mInterval; + + // We're being called at this moment by the target WorkQueue. + // Assume it still exists, rather than checking the result of + // lock(). + // Resubmit the whole *this callable: that's why we're a class + // rather than a lambda. Allow moving *this so we can carry a + // move-only callable; but naturally this statement must be + // the last time we reference this instance, which may become + // moved-from. + try + { + mTarget.lock()->post(mStart, std::move(*this)); + } + catch (const Closed&) + { + // Once this queue is closed, oh well, just stop + } + } + } + + private: + std::weak_ptr mTarget; + WorkQueue::TimePoint mStart; + std::chrono::duration mInterval; + CALLABLE mCallable; + }; + + template + void WorkQueue::postEvery(const std::chrono::duration& interval, + CALLABLE&& callable) + { + if (interval.count() <= 0) + { + // It's essential that postEvery() be called with a positive + // interval, since each call to BackJack posts another instance of + // itself at (start + interval) and we order by target time. A + // zero or negative interval would result in that BackJack + // instance going to the head of the queue every time, immediately + // ready to run. Effectively that would produce an infinite loop, + // a denial of service on this WorkQueue. + error("postEvery(interval) may not be 0"); + } + // Instantiate and post a suitable BackJack, binding a weak_ptr to + // self, the current time, the desired interval and the desired + // callable. + post( + BackJack( + getWeak(), TimePoint::clock::now(), interval, std::move(callable))); + } + +} // namespace LL + +#endif /* ! defined(LL_WORKQUEUE_H) */ -- cgit v1.3 From c585ddb75e383cdd994d0d99fed8f2de8f955e3c Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 7 Oct 2021 16:45:15 -0400 Subject: SL-16024: Defend against two threads making "anonymous" WorkQueues. Also make workqueue_test.cpp more robust. --- indra/llcommon/tests/workqueue_test.cpp | 11 ++++++----- indra/llcommon/workqueue.cpp | 18 ++++++++++++++++-- 2 files changed, 22 insertions(+), 7 deletions(-) (limited to 'indra/llcommon/tests') diff --git a/indra/llcommon/tests/workqueue_test.cpp b/indra/llcommon/tests/workqueue_test.cpp index ab1cae6c14..d5405400fd 100644 --- a/indra/llcommon/tests/workqueue_test.cpp +++ b/indra/llcommon/tests/workqueue_test.cpp @@ -103,12 +103,13 @@ namespace tut Shared result = data.get(); ensure_equals("called wrong number of times", result.size(), 3); // postEvery() assumes you want the first call to happen right away. - // Inject a fake start time that's (interval) earlier than that, to - // make our too early/too late tests uniform for all entries. - result.push_front(start - interval); - for (size_t i = 1; i < result.size(); ++i) + // Pretend our start time was (interval) earlier than that, to make + // our too early/too late tests uniform for all entries. + start -= interval; + for (size_t i = 0; i < result.size(); ++i) { - auto diff = (result[i] - result[i-1]); + auto diff = result[i] - start; + start += interval; try { ensure(STRINGIZE("call " << i << " too soon"), diff >= interval); diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index 15e292fb43..ffc9a97dc0 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -17,10 +17,15 @@ // std headers // external library headers // other Linden headers +#include "llcoros.h" +#include LLCOROS_MUTEX_HEADER #include "llerror.h" #include "llexception.h" #include "stringize.h" +using Mutex = LLCoros::Mutex; +using Lock = LLCoros::LockType; + LL::WorkQueue::WorkQueue(const std::string& name): super(makeName(name)) { @@ -83,8 +88,17 @@ std::string LL::WorkQueue::makeName(const std::string& name) if (! name.empty()) return name; - static thread_local U32 discriminator = 0; - return STRINGIZE("WorkQueue" << discriminator++); + static U32 discriminator = 0; + static Mutex mutex; + U32 num; + { + // Protect discriminator from concurrent access by different threads. + // It can't be thread_local, else two racing threads will come up with + // the same name. + Lock lk(mutex); + num = discriminator++; + } + return STRINGIZE("WorkQueue" << num); } void LL::WorkQueue::callWork(const Queue::DataTuple& work) -- cgit v1.3