From 61b5f3143e4ea53c9f64e5a1a5ad19f2edf3e776 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 5 Jan 2012 15:43:23 -0500 Subject: Introduce LLStreamQueue to buffer nonblocking I/O. Add unit tests to verify basic functionality. --- indra/llcommon/llstreamqueue.h | 229 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 229 insertions(+) create mode 100644 indra/llcommon/llstreamqueue.h (limited to 'indra/llcommon/llstreamqueue.h') diff --git a/indra/llcommon/llstreamqueue.h b/indra/llcommon/llstreamqueue.h new file mode 100644 index 0000000000..2fbc2067d2 --- /dev/null +++ b/indra/llcommon/llstreamqueue.h @@ -0,0 +1,229 @@ +/** + * @file llstreamqueue.h + * @author Nat Goodspeed + * @date 2012-01-04 + * @brief Definition of LLStreamQueue + * + * $LicenseInfo:firstyear=2012&license=viewerlgpl$ + * Copyright (c) 2012, Linden Research, Inc. + * $/LicenseInfo$ + */ + +#if ! defined(LL_LLSTREAMQUEUE_H) +#define LL_LLSTREAMQUEUE_H + +#include +#include +#include // std::streamsize +#include + +/** + * This class is a growable buffer between a producer and consumer. It serves + * as a queue usable with Boost.Iostreams -- hence, a "stream queue." + * + * This is especially useful for buffering nonblocking I/O. For instance, we + * want application logic to be able to serialize LLSD to a std::ostream. We + * may write more data than the destination pipe can handle all at once, but + * it's imperative NOT to block the application-level serialization call. So + * we buffer it instead. Successive frames can try nonblocking writes to the + * destination pipe until all buffered data has been sent. + * + * Similarly, we want application logic be able to deserialize LLSD from a + * std::istream. Again, we must not block that deserialize call waiting for + * more data to arrive from the input pipe! Instead we build up a buffer over + * a number of frames, using successive nonblocking reads, until we have + * "enough" data to be able to present it through a std::istream. + * + * @note The use cases for this class overlap somewhat with those for the + * LLIOPipe/LLPumpIO hierarchies, and indeed we considered using those. This + * class has two virtues over the older machinery: + * + * # It's vastly simpler -- way fewer concepts. It's not clear to me whether + * there were ever LLIOPipe/etc. use cases that demanded all the fanciness + * rolled in, or whether they were simply overdesigned. In any case, no + * remaining Lindens will admit to familiarity with those classes -- and + * they're sufficiently obtuse that it would take considerable learning + * curve to figure out how to use them properly. The bottom line is that + * current management is not keen on any more engineers climbing that curve. + * # This class is designed around available components such as std::string, + * std::list, Boost.Iostreams. There's less proprietary code. + */ +template +class LLGenericStreamQueue +{ +public: + LLGenericStreamQueue(): + mClosed(false) + {} + + /** + * Boost.Iostreams Source Device facade for use with other Boost.Iostreams + * functionality. LLGenericStreamQueue doesn't quite fit any of the Boost + * 1.48 Iostreams concepts; instead it behaves as both a Sink and a + * Source. This is its Source facade. + */ + struct Source + { + typedef Ch char_type; + typedef boost::iostreams::source_tag category; + + /// Bind the underlying LLGenericStreamQueue + Source(LLGenericStreamQueue& sq): + mStreamQueue(sq) + {} + + // Read up to n characters from the underlying data source into the + // buffer s, returning the number of characters read; return -1 to + // indicate EOF + std::streamsize read(Ch* s, std::streamsize n) + { + return mStreamQueue.read(s, n); + } + + LLGenericStreamQueue& mStreamQueue; + }; + + /** + * Boost.Iostreams Sink Device facade for use with other Boost.Iostreams + * functionality. LLGenericStreamQueue doesn't quite fit any of the Boost + * 1.48 Iostreams concepts; instead it behaves as both a Sink and a + * Source. This is its Sink facade. + */ + struct Sink + { + typedef Ch char_type; + typedef boost::iostreams::sink_tag category; + + /// Bind the underlying LLGenericStreamQueue + Sink(LLGenericStreamQueue& sq): + mStreamQueue(sq) + {} + + /// Write up to n characters from the buffer s to the output sequence, + /// returning the number of characters written + std::streamsize write(const Ch* s, std::streamsize n) + { + return mStreamQueue.write(s, n); + } + + /// Send EOF to consumer + void close() + { + mStreamQueue.close(); + } + + LLGenericStreamQueue& mStreamQueue; + }; + + /// Present Boost.Iostreams Source facade + Source asSource() { return Source(*this); } + /// Present Boost.Iostreams Sink facade + Sink asSink() { return Sink(*this); } + + /// append data to buffer + std::streamsize write(const Ch* s, std::streamsize n) + { + // Unclear how often we might be asked to write 0 bytes -- perhaps a + // naive caller responding to an unready nonblocking read. But if we + // do get such a call, don't add a completely empty BufferList entry. + if (n == 0) + return n; + // We could implement this using a single std::string object, a la + // ostringstream. But the trouble with appending to a string is that + // you might have to recopy all previous contents to grow its size. If + // we want this to scale to large data volumes, better to allocate + // individual pieces. + mBuffer.push_back(string(s, n)); + return n; + } + + /** + * Inform this LLGenericStreamQueue that no further data are forthcoming. + * For our purposes, close() is strictly a producer-side operation; + * there's little point in closing the consumer side. + */ + void close() + { + mClosed = true; + } + + /// consume data from buffer + std::streamsize read(Ch* s, std::streamsize n) + { + // read() is actually a convenience method for peek() followed by + // skip(). + std::streamsize got(peek(s, n)); + // We can only skip() as many characters as we can peek(); ignore + // skip() return here. + skip(n); + return got; + } + + /// Retrieve data from buffer without consuming. Like read(), return -1 on + /// EOF. + std::streamsize peek(Ch* s, std::streamsize n) const; + + /// Consume data from buffer without retrieving. Unlike read() and peek(), + /// at EOF we simply skip 0 characters. + std::streamsize skip(std::streamsize n); + +private: + typedef std::basic_string string; + typedef std::list BufferList; + BufferList mBuffer; + bool mClosed; +}; + +template +std::streamsize LLGenericStreamQueue::peek(Ch* s, std::streamsize n) const +{ + // Here we may have to build up 'n' characters from an arbitrary + // number of individual BufferList entries. + typename BufferList::const_iterator bli(mBuffer.begin()), blend(mBuffer.end()); + // Indicate EOF if producer has closed the pipe AND we've exhausted + // all previously-buffered data. + if (mClosed && bli == blend) + { + return -1; + } + // Here either producer hasn't yet closed, or we haven't yet exhausted + // remaining data. + std::streamsize needed(n), got(0); + // Loop until either we run out of BufferList entries or we've + // completely satisfied the request. + for ( ; bli != blend && needed; ++bli) + { + std::streamsize chunk(std::min(needed, std::streamsize(bli->length()))); + std::copy(bli->begin(), bli->begin() + chunk, s); + needed -= chunk; + s += chunk; + got += chunk; + } + return got; +} + +template +std::streamsize LLGenericStreamQueue::skip(std::streamsize n) +{ + typename BufferList::iterator bli(mBuffer.begin()), blend(mBuffer.end()); + std::streamsize toskip(n), skipped(0); + while (bli != blend && toskip >= bli->length()) + { + std::streamsize chunk(bli->length()); + typename BufferList::iterator zap(bli++); + mBuffer.erase(zap); + toskip -= chunk; + skipped += chunk; + } + if (bli != blend && toskip) + { + bli->erase(bli->begin(), bli->begin() + toskip); + skipped += toskip; + } + return skipped; +} + +typedef LLGenericStreamQueue LLStreamQueue; +typedef LLGenericStreamQueue LLWStreamQueue; + +#endif /* ! defined(LL_LLSTREAMQUEUE_H) */ -- cgit v1.3 From 39a86eda8d6d810bd7f4dd6b96f022548a496ba1 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 12 Jan 2012 12:19:27 -0500 Subject: Add LLStreamQueue::size() and tests to exercise it. --- indra/llcommon/llstreamqueue.h | 11 +++++++++++ indra/llcommon/tests/llstreamqueue_test.cpp | 30 ++++++++++++++++++++++++----- 2 files changed, 36 insertions(+), 5 deletions(-) (limited to 'indra/llcommon/llstreamqueue.h') diff --git a/indra/llcommon/llstreamqueue.h b/indra/llcommon/llstreamqueue.h index 2fbc2067d2..0726bad175 100644 --- a/indra/llcommon/llstreamqueue.h +++ b/indra/llcommon/llstreamqueue.h @@ -53,6 +53,7 @@ class LLGenericStreamQueue { public: LLGenericStreamQueue(): + mSize(0), mClosed(false) {} @@ -134,6 +135,7 @@ public: // we want this to scale to large data volumes, better to allocate // individual pieces. mBuffer.push_back(string(s, n)); + mSize += n; return n; } @@ -167,10 +169,17 @@ public: /// at EOF we simply skip 0 characters. std::streamsize skip(std::streamsize n); + /// How many characters do we currently have buffered? + std::streamsize size() const + { + return mSize; + } + private: typedef std::basic_string string; typedef std::list BufferList; BufferList mBuffer; + std::streamsize mSize; bool mClosed; }; @@ -212,12 +221,14 @@ std::streamsize LLGenericStreamQueue::skip(std::streamsize n) std::streamsize chunk(bli->length()); typename BufferList::iterator zap(bli++); mBuffer.erase(zap); + mSize -= chunk; toskip -= chunk; skipped += chunk; } if (bli != blend && toskip) { bli->erase(bli->begin(), bli->begin() + toskip); + mSize -= toskip; skipped += toskip; } return skipped; diff --git a/indra/llcommon/tests/llstreamqueue_test.cpp b/indra/llcommon/tests/llstreamqueue_test.cpp index e88c37d5be..050ad5c5bf 100644 --- a/indra/llcommon/tests/llstreamqueue_test.cpp +++ b/indra/llcommon/tests/llstreamqueue_test.cpp @@ -50,6 +50,8 @@ namespace tut { set_test_name("empty LLStreamQueue"); ensure_equals("brand-new LLStreamQueue isn't empty", + strq.size(), 0); + ensure_equals("brand-new LLStreamQueue returns data", strq.asSource().read(&buffer[0], buffer.size()), 0); strq.asSink().close(); ensure_equals("closed empty LLStreamQueue not at EOF", @@ -62,18 +64,22 @@ namespace tut set_test_name("one internal block, one buffer"); LLStreamQueue::Sink sink(strq.asSink()); ensure_equals("write(\"\")", sink.write("", 0), 0); - ensure_equals("0 write should leave LLStreamQueue empty", + ensure_equals("0 write should leave LLStreamQueue empty (size())", + strq.size(), 0); + ensure_equals("0 write should leave LLStreamQueue empty (peek())", strq.peek(&buffer[0], buffer.size()), 0); // The meaning of "atomic" is that it must be smaller than our buffer. std::string atomic("atomic"); ensure("test data exceeds buffer", atomic.length() < buffer.size()); ensure_equals(STRINGIZE("write(\"" << atomic << "\")"), sink.write(&atomic[0], atomic.length()), atomic.length()); + ensure_equals("size() after write()", strq.size(), atomic.length()); size_t peeklen(strq.peek(&buffer[0], buffer.size())); ensure_equals(STRINGIZE("peek(\"" << atomic << "\")"), peeklen, atomic.length()); ensure_equals(STRINGIZE("peek(\"" << atomic << "\") result"), std::string(buffer.begin(), buffer.begin() + peeklen), atomic); + ensure_equals("size() after peek()", strq.size(), atomic.length()); // peek() should not consume. Use a different buffer to prove it isn't // just leftover data from the first peek(). std::vector again(buffer.size()); @@ -90,6 +96,7 @@ namespace tut ensure_equals(STRINGIZE("read(\"" << atomic << "\") result"), std::string(third.begin(), third.begin() + readlen), atomic); ensure_equals("peek() after read()", strq.peek(&buffer[0], buffer.size()), 0); + ensure_equals("size() after read()", strq.size(), 0); } template<> template<> @@ -107,6 +114,7 @@ namespace tut std::string(buffer.begin(), buffer.begin() + peeklen), lovecraft); std::streamsize skip1(4); ensure_equals(STRINGIZE("skip(" << skip1 << ")"), strq.skip(skip1), skip1); + ensure_equals("size() after skip()", strq.size(), lovecraft.length() - skip1); size_t readlen(strq.read(&buffer[0], buffer.size())); ensure_equals(STRINGIZE("read(\"" << lovecraft.substr(skip1) << "\")"), readlen, lovecraft.length() - skip1); @@ -121,15 +129,20 @@ namespace tut { set_test_name("skip() multiple blocks"); std::string blocks[] = { "books of ", "H.P. ", "Lovecraft" }; - std::streamsize skip(blocks[0].length() + blocks[1].length() + 4); + std::streamsize total(blocks[0].length() + blocks[1].length() + blocks[2].length()); + std::streamsize leave(5); // len("craft") above + std::streamsize skip(total - leave); + std::streamsize written(0); BOOST_FOREACH(const std::string& block, blocks) { - strq.write(&block[0], block.length()); + written += strq.write(&block[0], block.length()); + ensure_equals("size() after write()", strq.size(), written); } std::streamsize skiplen(strq.skip(skip)); ensure_equals(STRINGIZE("skip(" << skip << ")"), skiplen, skip); + ensure_equals("size() after skip()", strq.size(), leave); size_t readlen(strq.read(&buffer[0], buffer.size())); - ensure_equals("read(\"craft\")", readlen, 5); + ensure_equals("read(\"craft\")", readlen, leave); ensure_equals("read(\"craft\") result", std::string(buffer.begin(), buffer.begin() + readlen), "craft"); } @@ -162,14 +175,21 @@ namespace tut strq.write(&block[0], block.length()); } strq.close(); + // We've already verified what strq.size() should be at this point; + // see above test named "skip() multiple blocks" + std::streamsize chksize(strq.size()); std::streamsize readlen(strq.read(&buffer[0], buffer.size())); ensure_equals("read() 0", readlen, buffer.size()); ensure_equals("read() 0 result", std::string(buffer.begin(), buffer.end()), "abcdefghij"); + chksize -= readlen; + ensure_equals("size() after read() 0", strq.size(), chksize); readlen = strq.read(&buffer[0], buffer.size()); ensure_equals("read() 1", readlen, buffer.size()); ensure_equals("read() 1 result", std::string(buffer.begin(), buffer.end()), "klmnopqrst"); + chksize -= readlen; + ensure_equals("size() after read() 1", strq.size(), chksize); readlen = strq.read(&buffer[0], buffer.size()); - ensure_equals("read() 2", readlen, 6); + ensure_equals("read() 2", readlen, chksize); ensure_equals("read() 2 result", std::string(buffer.begin(), buffer.begin() + readlen), "uvwxyz"); ensure_equals("read() 3", strq.read(&buffer[0], buffer.size()), -1); -- cgit v1.3