summaryrefslogtreecommitdiff
path: root/indra/llcommon/workqueue.h
diff options
context:
space:
mode:
authorNat Goodspeed <nat@lindenlab.com>2021-11-24 10:47:54 -0500
committerNat Goodspeed <nat@lindenlab.com>2021-11-24 10:47:54 -0500
commit0b066539fe68dc5750900c3452189645c40adb45 (patch)
treefd01b6eaf174f7744fd82a28b0b948d301fd4d2b /indra/llcommon/workqueue.h
parent78d837789a3741c65c3334934d96a505a522ee43 (diff)
DRTVWR-546, SL-16220, SL-16094: Undo previous glthread branch revert.
Reverting a merge is sticky: it tells git you never want to see that branch again. Merging the DRTVWR-546 branch, which contained the revert, into the glthread branch undid much of the development work on that branch. To restore it we must revert the revert. This reverts commit 029b41c0419e975bbb28454538b46dc69ce5d2ba.
Diffstat (limited to 'indra/llcommon/workqueue.h')
-rw-r--r--indra/llcommon/workqueue.h197
1 files changed, 126 insertions, 71 deletions
diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h
index 8e4b38c2f3..96574a18b9 100644
--- a/indra/llcommon/workqueue.h
+++ b/indra/llcommon/workqueue.h
@@ -12,14 +12,14 @@
#if ! defined(LL_WORKQUEUE_H)
#define LL_WORKQUEUE_H
+#include "llcoros.h"
+#include "llexception.h"
#include "llinstancetracker.h"
#include "threadsafeschedule.h"
#include <chrono>
+#include <exception> // std::current_exception
#include <functional> // std::function
-#include <queue>
#include <string>
-#include <utility> // std::pair
-#include <vector>
namespace LL
{
@@ -45,11 +45,16 @@ namespace LL
using TimedWork = Queue::TimeTuple;
using Closed = Queue::Closed;
+ struct Error: public LLException
+ {
+ Error(const std::string& what): LLException(what) {}
+ };
+
/**
* You may omit the WorkQueue name, in which case a unique name is
* synthesized; for practical purposes that makes it anonymous.
*/
- WorkQueue(const std::string& name = std::string());
+ WorkQueue(const std::string& name = std::string(), size_t capacity=1024);
/**
* Since the point of WorkQueue is to pass work to some other worker
@@ -59,15 +64,36 @@ namespace LL
*/
void close();
+ /**
+ * WorkQueue supports multiple producers and multiple consumers. In
+ * the general case it's misleading to test size(), since any other
+ * thread might change it the nanosecond the lock is released. On that
+ * basis, some might argue against publishing a size() method at all.
+ *
+ * But there are two specific cases in which a test based on size()
+ * might be reasonable:
+ *
+ * * If you're the only producer, noticing that size() == 0 is
+ * meaningful.
+ * * If you're the only consumer, noticing that size() > 0 is
+ * meaningful.
+ */
+ size_t size();
+ /// producer end: are we prevented from pushing any additional items?
+ bool isClosed();
+ /// consumer end: are we done, is the queue entirely drained?
+ bool done();
+
/*---------------------- fire and forget API -----------------------*/
/// fire-and-forget, but at a particular (future?) time
template <typename CALLABLE>
void post(const TimePoint& time, CALLABLE&& callable)
{
- // Defer reifying an arbitrary CALLABLE until we hit this method.
- // All other methods should accept CALLABLEs of arbitrary type to
- // avoid multiple levels of std::function indirection.
+ // Defer reifying an arbitrary CALLABLE until we hit this or
+ // postIfOpen(). All other methods should accept CALLABLEs of
+ // arbitrary type to avoid multiple levels of std::function
+ // indirection.
mQueue.push(TimedWork(time, std::move(callable)));
}
@@ -83,6 +109,47 @@ namespace LL
}
/**
+ * post work for a particular time, unless the queue is closed before
+ * we can post
+ */
+ template <typename CALLABLE>
+ bool postIfOpen(const TimePoint& time, CALLABLE&& callable)
+ {
+ // Defer reifying an arbitrary CALLABLE until we hit this or
+ // post(). All other methods should accept CALLABLEs of arbitrary
+ // type to avoid multiple levels of std::function indirection.
+ return mQueue.pushIfOpen(TimedWork(time, std::move(callable)));
+ }
+
+ /**
+ * post work, unless the queue is closed before we can post
+ */
+ template <typename CALLABLE>
+ bool postIfOpen(CALLABLE&& callable)
+ {
+ return postIfOpen(TimePoint::clock::now(), std::move(callable));
+ }
+
+ /**
+ * Post work to be run at a specified time to another WorkQueue, which
+ * may or may not still exist and be open. Return true if we were able
+ * to post.
+ */
+ template <typename CALLABLE>
+ static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable);
+
+ /**
+ * Post work to another WorkQueue, which may or may not still exist
+ * and be open. Return true if we were able to post.
+ */
+ template <typename CALLABLE>
+ static bool postMaybe(weak_t target, CALLABLE&& callable)
+ {
+ return postMaybe(target, TimePoint::clock::now(),
+ std::forward<CALLABLE>(callable));
+ }
+
+ /**
* Launch a callable returning bool that will trigger repeatedly at
* specified interval, until the callable returns false.
*
@@ -115,63 +182,8 @@ namespace LL
// Studio compile errors that seem utterly unrelated to this source
// code.
template <typename CALLABLE, typename FOLLOWUP>
- bool postTo(WorkQueue::weak_t target,
- const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback)
- {
- // We're being asked to post to the WorkQueue at target.
- // target is a weak_ptr: have to lock it to check it.
- auto tptr = target.lock();
- if (! tptr)
- // can't post() if the target WorkQueue has been destroyed
- return false;
-
- // Here we believe target WorkQueue still exists. Post to it a
- // lambda that packages our callable, our callback and a weak_ptr
- // to this originating WorkQueue.
- tptr->post(
- time,
- [reply = super::getWeak(),
- callable = std::move(callable),
- callback = std::move(callback)]
- ()
- {
- // Call the callable in any case -- but to minimize
- // copying the result, immediately bind it into a reply
- // lambda. The reply lambda also binds the original
- // callback, so that when we, the originating WorkQueue,
- // finally receive and process the reply lambda, we'll
- // call the bound callback with the bound result -- on the
- // same thread that originally called postTo().
- auto rlambda =
- [result = callable(),
- callback = std::move(callback)]
- ()
- { callback(std::move(result)); };
- // Check if this originating WorkQueue still exists.
- // Remember, the outer lambda is now running on a thread
- // servicing the target WorkQueue, and real time has
- // elapsed since postTo()'s tptr->post() call.
- // reply is a weak_ptr: have to lock it to check it.
- auto rptr = reply.lock();
- if (rptr)
- {
- // Only post reply lambda if the originating WorkQueue
- // still exists. If not -- who would we tell? Log it?
- try
- {
- rptr->post(std::move(rlambda));
- }
- catch (const Closed&)
- {
- // Originating WorkQueue might still exist, but
- // might be Closed. Same thing: just discard the
- // callback.
- }
- }
- });
- // looks like we were able to post()
- return true;
- }
+ bool postTo(weak_t target,
+ const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback);
/**
* Post work to another WorkQueue, requesting a specific callback to
@@ -181,10 +193,36 @@ namespace LL
* inaccessible.
*/
template <typename CALLABLE, typename FOLLOWUP>
- bool postTo(WorkQueue::weak_t target,
- CALLABLE&& callable, FOLLOWUP&& callback)
+ bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback)
+ {
+ return postTo(target, TimePoint::clock::now(),
+ std::move(callable), std::move(callback));
+ }
+
+ /**
+ * Post work to another WorkQueue to be run at a specified time,
+ * blocking the calling coroutine until then, returning the result to
+ * caller on completion.
+ *
+ * In general, we assume that each thread's default coroutine is busy
+ * servicing its WorkQueue or whatever. To try to prevent mistakes, we
+ * forbid calling waitForResult() from a thread's default coroutine.
+ */
+ template <typename CALLABLE>
+ auto waitForResult(const TimePoint& time, CALLABLE&& callable);
+
+ /**
+ * Post work to another WorkQueue, blocking the calling coroutine
+ * until then, returning the result to caller on completion.
+ *
+ * In general, we assume that each thread's default coroutine is busy
+ * servicing its WorkQueue or whatever. To try to prevent mistakes, we
+ * forbid calling waitForResult() from a thread's default coroutine.
+ */
+ template <typename CALLABLE>
+ auto waitForResult(CALLABLE&& callable)
{
- return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback));
+ return waitForResult(TimePoint::clock::now(), std::move(callable));
}
/*--------------------------- worker API ---------------------------*/
@@ -233,6 +271,23 @@ namespace LL
bool runUntil(const TimePoint& until);
private:
+ template <typename CALLABLE, typename FOLLOWUP>
+ static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback);
+ /// general case: arbitrary C++ return type
+ template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE>
+ struct MakeReplyLambda;
+ /// specialize for CALLABLE returning void
+ template <typename CALLABLE, typename FOLLOWUP>
+ struct MakeReplyLambda<CALLABLE, FOLLOWUP, void>;
+
+ /// general case: arbitrary C++ return type
+ template <typename CALLABLE, typename RETURNTYPE>
+ struct WaitForResult;
+ /// specialize for CALLABLE returning void
+ template <typename CALLABLE>
+ struct WaitForResult<CALLABLE, void>;
+
+ static void checkCoroutine(const std::string& method);
static void error(const std::string& msg);
static std::string makeName(const std::string& name);
void callWork(const Queue::DataTuple& work);
@@ -254,8 +309,8 @@ namespace LL
{
public:
// bind the desired data
- BackJack(WorkQueue::weak_t target,
- const WorkQueue::TimePoint& start,
+ BackJack(weak_t target,
+ const TimePoint& start,
const std::chrono::duration<Rep, Period>& interval,
CALLABLE&& callable):
mTarget(target),
@@ -302,8 +357,8 @@ namespace LL
}
private:
- WorkQueue::weak_t mTarget;
- WorkQueue::TimePoint mStart;
+ weak_t mTarget;
+ TimePoint mStart;
std::chrono::duration<Rep, Period> mInterval;
CALLABLE mCallable;
};