From 7af4a49835880fc92461882d5354c0ff12a9672a Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 23 Mar 2017 22:39:31 -0400 Subject: MAINT-6789: Add LLEventBatch, LLEventThrottle, LLEventBatchThrottle. These classes are as yet untested: they are straw people for API review, based on email conversations with Caladbolg and Rider. --- indra/llcommon/lleventfilter.cpp | 115 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) (limited to 'indra/llcommon/lleventfilter.cpp') diff --git a/indra/llcommon/lleventfilter.cpp b/indra/llcommon/lleventfilter.cpp index 64ab58adcd..1fd29ea9e5 100644 --- a/indra/llcommon/lleventfilter.cpp +++ b/indra/llcommon/lleventfilter.cpp @@ -148,6 +148,11 @@ bool LLEventTimeoutBase::tick(const LLSD&) return false; // show event to other listeners } +bool LLEventTimeoutBase::running() const +{ + return mMainloop.connected(); +} + LLEventTimeout::LLEventTimeout() {} LLEventTimeout::LLEventTimeout(LLEventPump& source): @@ -164,3 +169,113 @@ bool LLEventTimeout::countdownElapsed() const { return mTimer.hasExpired(); } + +LLEventBatch::LLEventBatch(std::size_t size): + LLEventFilter("batch"), + mBatchSize(size) +{} + +LLEventBatch::LLEventBatch(LLEventPump& source, std::size_t size): + LLEventFilter(source, "batch"), + mBatchSize(size) +{} + +void LLEventBatch::flush() +{ + // copy and clear mBatch BEFORE posting to avoid weird circularity effects + LLSD batch(mBatch); + mBatch.clear(); + LLEventStream::post(batch); +} + +bool LLEventBatch::post(const LLSD& event) +{ + mBatch.append(event); + if (mBatch.size() >= mBatchSize) + { + flush(); + } + return false; +} + +LLEventThrottle::LLEventThrottle(F32 interval): + LLEventFilter("throttle"), + mInterval(interval), + mPosts(0) +{} + +LLEventThrottle::LLEventThrottle(LLEventPump& source, F32 interval): + LLEventFilter(source, "throttle"), + mInterval(interval), + mPosts(0) +{} + +void LLEventThrottle::flush() +{ + // flush() is a no-op unless there's something pending. + // Don't test mPending because there's no requirement that the consumer + // post() anything but an isUndefined(). This is what mPosts is for. + if (mPosts) + { + mPosts = 0; + mAlarm.cancel(); + // This is not to set our alarm; we are not yet requesting + // any notification. This is just to track whether subsequent post() + // calls fall within this mInterval or not. + mTimer.setTimerExpirySec(mInterval); + // copy and clear mPending BEFORE posting to avoid weird circularity + // effects + LLSD pending = mPending; + mPending.clear(); + LLEventStream::post(pending); + } +} + +LLSD LLEventThrottle::pending() const +{ + return mPending; +} + +bool LLEventThrottle::post(const LLSD& event) +{ + // Always capture most recent post() event data. If caller wants to + // aggregate multiple events, let them retrieve pending() and modify + // before calling post(). + mPending = event; + // Always increment mPosts. Unless we count this call, flush() does + // nothing. + ++mPosts; + // We reset mTimer on every flush() call to let us know if we're still + // within the same mInterval. So -- are we? + F32 timeRemaining = mTimer.getRemainingTimeF32(); + if (! timeRemaining) + { + // more than enough time has elapsed, immediately flush() + flush(); + } + else + { + // still within mInterval of the last flush() call: have to defer + if (! mAlarm.running()) + { + // timeRemaining tells us how much longer it will be until + // mInterval seconds since the last flush() call. At that time, + // flush() deferred events. + mAlarm.actionAfter(timeRemaining, boost::bind(&LLEventThrottle::flush, this)); + } + } +} + +LLEventBatchThrottle::LLEventBatchThrottle(F32 interval): + LLEventThrottle(interval) +{} + +LLEventBatchThrottle::LLEventBatchThrottle(LLEventPump& source, F32 interval): + LLEventThrottle(source, interval) +{} + +bool LLEventBatchThrottle::post(const LLSD& event) +{ + // simply retrieve pending value and append the new event to it + return LLEventThrottle::post(pending().append(event)); +} -- cgit v1.2.3 From 2fe4f6b9187d9153e335bf54ea43fc89a7987459 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Fri, 24 Mar 2017 18:25:16 -0400 Subject: MAINT-6789: Add LLEventThrottle::getInterval(), setInterval() plus LLEventBatch::getSize(), setSize() plus LLEventThrottle::getPostCount() and getDelay(). The interesting thing about LLEventThrottle::setInterval() and LLEventBatch::setSize() is that either might cause an immediate flush(). --- indra/llcommon/lleventfilter.cpp | 49 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) (limited to 'indra/llcommon/lleventfilter.cpp') diff --git a/indra/llcommon/lleventfilter.cpp b/indra/llcommon/lleventfilter.cpp index 1fd29ea9e5..87eb583cb6 100644 --- a/indra/llcommon/lleventfilter.cpp +++ b/indra/llcommon/lleventfilter.cpp @@ -198,6 +198,16 @@ bool LLEventBatch::post(const LLSD& event) return false; } +void LLEventBatch::setSize(std::size_t size) +{ + mBatchSize = size; + // changing the size might mean that we have to flush NOW + if (mBatch.size() >= mBatchSize) + { + flush(); + } +} + LLEventThrottle::LLEventThrottle(F32 interval): LLEventFilter("throttle"), mInterval(interval), @@ -264,6 +274,45 @@ bool LLEventThrottle::post(const LLSD& event) mAlarm.actionAfter(timeRemaining, boost::bind(&LLEventThrottle::flush, this)); } } + return false; +} + +void LLEventThrottle::setInterval(F32 interval) +{ + F32 oldInterval = mInterval; + mInterval = interval; + // If we are not now within oldInterval of the last flush(), we're done: + // this will only affect behavior starting with the next flush(). + F32 timeRemaining = mTimer.getRemainingTimeF32(); + if (timeRemaining) + { + // We are currently within oldInterval of the last flush(). Figure out + // how much time remains until (the new) mInterval of the last + // flush(). Bt we don't actually store a timestamp for the last + // flush(); it's implicit. There are timeRemaining seconds until what + // used to be the end of the interval. Move that endpoint by the + // difference between the new interval and the old. + timeRemaining += (mInterval - oldInterval); + // If we're called with a larger interval, the difference is positive + // and timeRemaining increases. + // If we're called with a smaller interval, the difference is negative + // and timeRemaining decreases. The interesting case is when it goes + // nonpositive: when the new interval means we can flush immediately. + if (timeRemaining <= 0.0f) + { + flush(); + } + else + { + // immediately reset mTimer + mTimer.setTimerExpirySec(timeRemaining); + // and if mAlarm is running, reset that too + if (mAlarm.running()) + { + mAlarm.actionAfter(timeRemaining, boost::bind(&LLEventThrottle::flush, this)); + } + } + } } LLEventBatchThrottle::LLEventBatchThrottle(F32 interval): -- cgit v1.2.3 From 9c66072cacae0b3b86a277780e3a19e94accd6bc Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Wed, 10 May 2017 15:04:18 -0400 Subject: Add LLEventThrottle tests; actually *all* lleventfilter.cpp tests. For some reason there wasn't an entry in indra/llcommon/CMakeLists.txt to run the tests in indra/llcommon/tests/lleventfilter_test.cpp. It seems likely that at some point it existed, since all previous tests built and ran successfully. In any case, (re-)add lleventfilter_test.cpp to the set of llcommon tests. Also alphabetize them to make it easier to find a particular test invocation. Also add new tests for LLEventThrottle. To support this, refactor the concrete LLEventThrottle class into LLEventThrottleBase containing all the tricky logic, with pure virtual methods for access to LLTimer and LLEventTimeout, and an LLEventThrottle subclass containing the LLTimer and LLEventTimeout instances and corresponding implementations of the new pure virtual methods. That permits us to introduce TestEventThrottle, an alternate subclass with dummy implementations of the methods related to LLTimer and LLEventTimeout. In particular, we can explicitly advance simulated realtime to simulate particular LLTimer and LLEventTimeout behaviors. Finally, introduce Concat, a test LLEventPump listener class whose function is to concatenate received string event data into a composite string so we can readily test for particular sequences of events. --- indra/llcommon/lleventfilter.cpp | 96 +++++++++++++++++++++++++++++++++------- 1 file changed, 80 insertions(+), 16 deletions(-) (limited to 'indra/llcommon/lleventfilter.cpp') diff --git a/indra/llcommon/lleventfilter.cpp b/indra/llcommon/lleventfilter.cpp index 87eb583cb6..ddd7d5547a 100644 --- a/indra/llcommon/lleventfilter.cpp +++ b/indra/llcommon/lleventfilter.cpp @@ -38,12 +38,18 @@ #include "llerror.h" // LL_ERRS #include "llsdutil.h" // llsd_matches() +/***************************************************************************** +* LLEventFilter +*****************************************************************************/ LLEventFilter::LLEventFilter(LLEventPump& source, const std::string& name, bool tweak): LLEventStream(name, tweak), mSource(source.listen(getName(), boost::bind(&LLEventFilter::post, this, _1))) { } +/***************************************************************************** +* LLEventMatching +*****************************************************************************/ LLEventMatching::LLEventMatching(const LLSD& pattern): LLEventFilter("matching"), mPattern(pattern) @@ -64,6 +70,9 @@ bool LLEventMatching::post(const LLSD& event) return LLEventStream::post(event); } +/***************************************************************************** +* LLEventTimeoutBase +*****************************************************************************/ LLEventTimeoutBase::LLEventTimeoutBase(): LLEventFilter("timeout") { @@ -153,6 +162,9 @@ bool LLEventTimeoutBase::running() const return mMainloop.connected(); } +/***************************************************************************** +* LLEventTimeout +*****************************************************************************/ LLEventTimeout::LLEventTimeout() {} LLEventTimeout::LLEventTimeout(LLEventPump& source): @@ -170,6 +182,9 @@ bool LLEventTimeout::countdownElapsed() const return mTimer.hasExpired(); } +/***************************************************************************** +* LLEventBatch +*****************************************************************************/ LLEventBatch::LLEventBatch(std::size_t size): LLEventFilter("batch"), mBatchSize(size) @@ -208,19 +223,22 @@ void LLEventBatch::setSize(std::size_t size) } } -LLEventThrottle::LLEventThrottle(F32 interval): +/***************************************************************************** +* LLEventThrottleBase +*****************************************************************************/ +LLEventThrottleBase::LLEventThrottleBase(F32 interval): LLEventFilter("throttle"), mInterval(interval), mPosts(0) {} -LLEventThrottle::LLEventThrottle(LLEventPump& source, F32 interval): +LLEventThrottleBase::LLEventThrottleBase(LLEventPump& source, F32 interval): LLEventFilter(source, "throttle"), mInterval(interval), mPosts(0) {} -void LLEventThrottle::flush() +void LLEventThrottleBase::flush() { // flush() is a no-op unless there's something pending. // Don't test mPending because there's no requirement that the consumer @@ -228,11 +246,11 @@ void LLEventThrottle::flush() if (mPosts) { mPosts = 0; - mAlarm.cancel(); + alarmCancel(); // This is not to set our alarm; we are not yet requesting // any notification. This is just to track whether subsequent post() // calls fall within this mInterval or not. - mTimer.setTimerExpirySec(mInterval); + timerSet(mInterval); // copy and clear mPending BEFORE posting to avoid weird circularity // effects LLSD pending = mPending; @@ -241,12 +259,12 @@ void LLEventThrottle::flush() } } -LLSD LLEventThrottle::pending() const +LLSD LLEventThrottleBase::pending() const { return mPending; } -bool LLEventThrottle::post(const LLSD& event) +bool LLEventThrottleBase::post(const LLSD& event) { // Always capture most recent post() event data. If caller wants to // aggregate multiple events, let them retrieve pending() and modify @@ -257,7 +275,7 @@ bool LLEventThrottle::post(const LLSD& event) ++mPosts; // We reset mTimer on every flush() call to let us know if we're still // within the same mInterval. So -- are we? - F32 timeRemaining = mTimer.getRemainingTimeF32(); + F32 timeRemaining = timerGetRemaining(); if (! timeRemaining) { // more than enough time has elapsed, immediately flush() @@ -266,24 +284,24 @@ bool LLEventThrottle::post(const LLSD& event) else { // still within mInterval of the last flush() call: have to defer - if (! mAlarm.running()) + if (! alarmRunning()) { // timeRemaining tells us how much longer it will be until // mInterval seconds since the last flush() call. At that time, // flush() deferred events. - mAlarm.actionAfter(timeRemaining, boost::bind(&LLEventThrottle::flush, this)); + alarmActionAfter(timeRemaining, boost::bind(&LLEventThrottleBase::flush, this)); } } return false; } -void LLEventThrottle::setInterval(F32 interval) +void LLEventThrottleBase::setInterval(F32 interval) { F32 oldInterval = mInterval; mInterval = interval; // If we are not now within oldInterval of the last flush(), we're done: // this will only affect behavior starting with the next flush(). - F32 timeRemaining = mTimer.getRemainingTimeF32(); + F32 timeRemaining = timerGetRemaining(); if (timeRemaining) { // We are currently within oldInterval of the last flush(). Figure out @@ -305,16 +323,60 @@ void LLEventThrottle::setInterval(F32 interval) else { // immediately reset mTimer - mTimer.setTimerExpirySec(timeRemaining); + timerSet(timeRemaining); // and if mAlarm is running, reset that too - if (mAlarm.running()) + if (alarmRunning()) { - mAlarm.actionAfter(timeRemaining, boost::bind(&LLEventThrottle::flush, this)); + alarmActionAfter(timeRemaining, boost::bind(&LLEventThrottleBase::flush, this)); } } } } +F32 LLEventThrottleBase::getDelay() const +{ + return timerGetRemaining(); +} + +/***************************************************************************** +* LLEventThrottle implementation +*****************************************************************************/ +LLEventThrottle::LLEventThrottle(F32 interval): + LLEventThrottleBase(interval) +{} + +LLEventThrottle::LLEventThrottle(LLEventPump& source, F32 interval): + LLEventThrottleBase(source, interval) +{} + +void LLEventThrottle::alarmActionAfter(F32 interval, const LLEventTimeoutBase::Action& action) +{ + mAlarm.actionAfter(interval, action); +} + +bool LLEventThrottle::alarmRunning() const +{ + return mAlarm.running(); +} + +void LLEventThrottle::alarmCancel() +{ + return mAlarm.cancel(); +} + +void LLEventThrottle::timerSet(F32 interval) +{ + mTimer.setTimerExpirySec(interval); +} + +F32 LLEventThrottle::timerGetRemaining() const +{ + return mTimer.getRemainingTimeF32(); +} + +/***************************************************************************** +* LLEventBatchThrottle +*****************************************************************************/ LLEventBatchThrottle::LLEventBatchThrottle(F32 interval): LLEventThrottle(interval) {} @@ -326,5 +388,7 @@ LLEventBatchThrottle::LLEventBatchThrottle(LLEventPump& source, F32 interval): bool LLEventBatchThrottle::post(const LLSD& event) { // simply retrieve pending value and append the new event to it - return LLEventThrottle::post(pending().append(event)); + LLSD partial = pending(); + partial.append(event); + return LLEventThrottle::post(partial); } -- cgit v1.2.3 From 4d87ded8862f032682a66704b9c68ef5626779ea Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Wed, 10 May 2017 17:37:06 -0400 Subject: Add size limit to LLEventBatchThrottle like LLEventBatch. The new behavior is that it will flush when either the pending batch has grown to the specified size, or the time interval has expired. --- indra/llcommon/lleventfilter.cpp | 35 ++++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 9 deletions(-) (limited to 'indra/llcommon/lleventfilter.cpp') diff --git a/indra/llcommon/lleventfilter.cpp b/indra/llcommon/lleventfilter.cpp index ddd7d5547a..9fb18dc67d 100644 --- a/indra/llcommon/lleventfilter.cpp +++ b/indra/llcommon/lleventfilter.cpp @@ -206,10 +206,8 @@ void LLEventBatch::flush() bool LLEventBatch::post(const LLSD& event) { mBatch.append(event); - if (mBatch.size() >= mBatchSize) - { - flush(); - } + // calling setSize(same) performs the very check we want + setSize(mBatchSize); return false; } @@ -377,12 +375,14 @@ F32 LLEventThrottle::timerGetRemaining() const /***************************************************************************** * LLEventBatchThrottle *****************************************************************************/ -LLEventBatchThrottle::LLEventBatchThrottle(F32 interval): - LLEventThrottle(interval) +LLEventBatchThrottle::LLEventBatchThrottle(F32 interval, std::size_t size): + LLEventThrottle(interval), + mBatchSize(size) {} -LLEventBatchThrottle::LLEventBatchThrottle(LLEventPump& source, F32 interval): - LLEventThrottle(source, interval) +LLEventBatchThrottle::LLEventBatchThrottle(LLEventPump& source, F32 interval, std::size_t size): + LLEventThrottle(source, interval), + mBatchSize(size) {} bool LLEventBatchThrottle::post(const LLSD& event) @@ -390,5 +390,22 @@ bool LLEventBatchThrottle::post(const LLSD& event) // simply retrieve pending value and append the new event to it LLSD partial = pending(); partial.append(event); - return LLEventThrottle::post(partial); + bool ret = LLEventThrottle::post(partial); + // The post() call above MIGHT have called flush() already. If it did, + // then pending() was reset to empty. If it did not, though, but the batch + // size has grown to the limit, flush() anyway. If there's a limit at all, + // of course. Calling setSize(same) performs the very check we want. + setSize(mBatchSize); + return ret; +} + +void LLEventBatchThrottle::setSize(std::size_t size) +{ + mBatchSize = size; + // Changing the size might mean that we have to flush NOW. Don't forget + // that 0 means unlimited. + if (mBatchSize && pending().size() >= mBatchSize) + { + flush(); + } } -- cgit v1.2.3