From 4331c112aba074562e9a8826fe6d271a94f790f0 Mon Sep 17 00:00:00 2001 From: Dave Parks Date: Fri, 14 Oct 2011 11:52:40 -0500 Subject: Backed out changeset b782a75c99e6 --- indra/llmessage/lliosocket.cpp | 99 +++++++++++++++++++++++++++--------------- 1 file changed, 63 insertions(+), 36 deletions(-) (limited to 'indra/llmessage/lliosocket.cpp') diff --git a/indra/llmessage/lliosocket.cpp b/indra/llmessage/lliosocket.cpp index e802d9b3a6..ca84fa8bb8 100644 --- a/indra/llmessage/lliosocket.cpp +++ b/indra/llmessage/lliosocket.cpp @@ -35,7 +35,6 @@ #include "llhost.h" #include "llmemtype.h" #include "llpumpio.h" -#include "llthread.h" // // constants @@ -99,31 +98,51 @@ void ll_debug_socket(const char* msg, apr_socket_t* apr_sock) /// // static -LLSocket::ptr_t LLSocket::create(EType type, U16 port) +LLSocket::ptr_t LLSocket::create(apr_pool_t* pool, EType type, U16 port) { LLMemType m1(LLMemType::MTYPE_IO_TCP); + LLSocket::ptr_t rv; + apr_socket_t* socket = NULL; + apr_pool_t* new_pool = NULL; apr_status_t status = APR_EGENERAL; - LLSocket::ptr_t rv(new LLSocket); + + // create a pool for the socket + status = apr_pool_create(&new_pool, pool); + if(ll_apr_warn_status(status)) + { + if(new_pool) apr_pool_destroy(new_pool); + return rv; + } if(STREAM_TCP == type) { - status = apr_socket_create(&rv->mSocket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, rv->mPool()); + status = apr_socket_create( + &socket, + APR_INET, + SOCK_STREAM, + APR_PROTO_TCP, + new_pool); } else if(DATAGRAM_UDP == type) { - status = apr_socket_create(&rv->mSocket, APR_INET, SOCK_DGRAM, APR_PROTO_UDP, rv->mPool()); + status = apr_socket_create( + &socket, + APR_INET, + SOCK_DGRAM, + APR_PROTO_UDP, + new_pool); } else { - rv.reset(); + if(new_pool) apr_pool_destroy(new_pool); return rv; } if(ll_apr_warn_status(status)) { - rv->mSocket = NULL; - rv.reset(); + if(new_pool) apr_pool_destroy(new_pool); return rv; } + rv = ptr_t(new LLSocket(socket, new_pool)); if(port > 0) { apr_sockaddr_t* sa = NULL; @@ -133,7 +152,7 @@ LLSocket::ptr_t LLSocket::create(EType type, U16 port) APR_UNSPEC, port, 0, - rv->mPool()); + new_pool); if(ll_apr_warn_status(status)) { rv.reset(); @@ -141,8 +160,8 @@ LLSocket::ptr_t LLSocket::create(EType type, U16 port) } // This allows us to reuse the address on quick down/up. This // is unlikely to create problems. - ll_apr_warn_status(apr_socket_opt_set(rv->mSocket, APR_SO_REUSEADDR, 1)); - status = apr_socket_bind(rv->mSocket, sa); + ll_apr_warn_status(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); + status = apr_socket_bind(socket, sa); if(ll_apr_warn_status(status)) { rv.reset(); @@ -156,7 +175,7 @@ LLSocket::ptr_t LLSocket::create(EType type, U16 port) // to keep a queue of incoming connections for ACCEPT. lldebugs << "Setting listen state for socket." << llendl; status = apr_socket_listen( - rv->mSocket, + socket, LL_DEFAULT_LISTEN_BACKLOG); if(ll_apr_warn_status(status)) { @@ -177,28 +196,21 @@ LLSocket::ptr_t LLSocket::create(EType type, U16 port) } // static -LLSocket::ptr_t LLSocket::create(apr_status_t& status, LLSocket::ptr_t& listen_socket) +LLSocket::ptr_t LLSocket::create(apr_socket_t* socket, apr_pool_t* pool) { LLMemType m1(LLMemType::MTYPE_IO_TCP); - if (!listen_socket->getSocket()) - { - status = APR_ENOSOCKET; - return LLSocket::ptr_t(); - } - LLSocket::ptr_t rv(new LLSocket); - lldebugs << "accepting socket" << llendl; - status = apr_socket_accept(&rv->mSocket, listen_socket->getSocket(), rv->mPool()); - if (status != APR_SUCCESS) + LLSocket::ptr_t rv; + if(!socket) { - rv->mSocket = NULL; - rv.reset(); return rv; } + rv = ptr_t(new LLSocket(socket, pool)); rv->mPort = PORT_EPHEMERAL; rv->setOptions(); return rv; } + bool LLSocket::blockingConnect(const LLHost& host) { if(!mSocket) return false; @@ -211,7 +223,7 @@ bool LLSocket::blockingConnect(const LLHost& host) APR_UNSPEC, host.getPort(), 0, - mPool()))) + mPool))) { return false; } @@ -222,11 +234,13 @@ bool LLSocket::blockingConnect(const LLHost& host) return true; } -LLSocket::LLSocket() : - mSocket(NULL), - mPool(LLThread::tldata().mRootPool), +LLSocket::LLSocket(apr_socket_t* socket, apr_pool_t* pool) : + mSocket(socket), + mPool(pool), mPort(PORT_INVALID) { + ll_debug_socket("Constructing wholely formed socket", mSocket); + LLMemType m1(LLMemType::MTYPE_IO_TCP); } LLSocket::~LLSocket() @@ -238,6 +252,10 @@ LLSocket::~LLSocket() ll_debug_socket("Destroying socket", mSocket); apr_socket_close(mSocket); } + if(mPool) + { + apr_pool_destroy(mPool); + } } void LLSocket::setOptions() @@ -498,8 +516,10 @@ LLIOPipe::EStatus LLIOSocketWriter::process_impl( /// LLIOServerSocket::LLIOServerSocket( + apr_pool_t* pool, LLIOServerSocket::socket_t listener, factory_t factory) : + mPool(pool), mListenSocket(listener), mReactor(factory), mInitialized(false), @@ -559,15 +579,21 @@ LLIOPipe::EStatus LLIOServerSocket::process_impl( lldebugs << "accepting socket" << llendl; PUMP_DEBUG; - apr_status_t status; - LLSocket::ptr_t llsocket(LLSocket::create(status, mListenSocket)); + apr_pool_t* new_pool = NULL; + apr_status_t status = apr_pool_create(&new_pool, mPool); + apr_socket_t* socket = NULL; + status = apr_socket_accept( + &socket, + mListenSocket->getSocket(), + new_pool); + LLSocket::ptr_t llsocket(LLSocket::create(socket, new_pool)); //EStatus rv = STATUS_ERROR; - if(llsocket && status == APR_SUCCESS) + if(llsocket) { PUMP_DEBUG; apr_sockaddr_t* remote_addr; - apr_socket_addr_get(&remote_addr, APR_REMOTE, llsocket->getSocket()); + apr_socket_addr_get(&remote_addr, APR_REMOTE, socket); char* remote_host_string; apr_sockaddr_ip_get(&remote_host_string, remote_addr); @@ -582,6 +608,7 @@ LLIOPipe::EStatus LLIOServerSocket::process_impl( { chain.push_back(LLIOPipe::ptr_t(new LLIOSocketWriter(llsocket))); pump->addChain(chain, mResponseTimeout); + status = STATUS_OK; } else { @@ -590,8 +617,7 @@ LLIOPipe::EStatus LLIOServerSocket::process_impl( } else { - char buf[256]; - llwarns << "Unable to accept linden socket: " << apr_strerror(status, buf, sizeof(buf)) << llendl; + llwarns << "Unable to create linden socket." << llendl; } PUMP_DEBUG; @@ -604,10 +630,11 @@ LLIOPipe::EStatus LLIOServerSocket::process_impl( #if 0 LLIODataSocket::LLIODataSocket( U16 suggested_port, - U16 start_discovery_port) : + U16 start_discovery_port, + apr_pool_t* pool) : mSocket(NULL) { - if(PORT_INVALID == suggested_port) return; + if(!pool || (PORT_INVALID == suggested_port)) return; if(ll_apr_warn_status(apr_socket_create(&mSocket, APR_INET, SOCK_DGRAM, APR_PROTO_UDP, pool))) return; apr_sockaddr_t* sa = NULL; if(ll_apr_warn_status(apr_sockaddr_info_get(&sa, APR_ANYADDR, APR_UNSPEC, suggested_port, 0, pool))) return; -- cgit v1.3 From 9e6a5d721193f181c39e58fe00073bece74b081a Mon Sep 17 00:00:00 2001 From: Xiaohong Bao Date: Fri, 20 Jan 2012 11:55:15 -0700 Subject: fix for SH-2823 and SH-2824: LLCurl crash inside LLBufferArray::countAfter() and LLBufferArray::copyIntoBuffers --- indra/llcommon/llthread.cpp | 15 ++++--- indra/llcommon/llthread.h | 1 + indra/llmessage/llbuffer.cpp | 90 +++++++++++++++++++++++++++++++++++++- indra/llmessage/llbuffer.h | 25 +++++++++++ indra/llmessage/llbufferstream.cpp | 8 ++++ indra/llmessage/llcurl.cpp | 3 ++ indra/llmessage/lliohttpserver.cpp | 4 +- indra/llmessage/lliosocket.cpp | 3 ++ indra/llmessage/llpumpio.cpp | 1 + 9 files changed, 142 insertions(+), 8 deletions(-) (limited to 'indra/llmessage/lliosocket.cpp') diff --git a/indra/llcommon/llthread.cpp b/indra/llcommon/llthread.cpp index 4063cc730b..a6ad6b125c 100644 --- a/indra/llcommon/llthread.cpp +++ b/indra/llcommon/llthread.cpp @@ -337,11 +337,7 @@ LLMutex::~LLMutex() void LLMutex::lock() { -#if LL_DARWIN - if (mLockingThread == LLThread::currentID()) -#else - if (mLockingThread == sThreadID) -#endif + if(isSelfLocked()) { //redundant lock mCount++; return; @@ -398,6 +394,15 @@ bool LLMutex::isLocked() } } +bool LLMutex::isSelfLocked() +{ +#if LL_DARWIN + return mLockingThread == LLThread::currentID(); +#else + return mLockingThread == sThreadID; +#endif +} + U32 LLMutex::lockingThread() const { return mLockingThread; diff --git a/indra/llcommon/llthread.h b/indra/llcommon/llthread.h index f0e0de6173..b52e70ab2e 100644 --- a/indra/llcommon/llthread.h +++ b/indra/llcommon/llthread.h @@ -151,6 +151,7 @@ public: void lock(); // blocks void unlock(); bool isLocked(); // non-blocking, but does do a lock/unlock so not free + bool isSelfLocked(); //return true if locked in a same thread U32 lockingThread() const; //get ID of locking thread protected: diff --git a/indra/llmessage/llbuffer.cpp b/indra/llmessage/llbuffer.cpp index 0316797f00..250cace6e9 100644 --- a/indra/llmessage/llbuffer.cpp +++ b/indra/llmessage/llbuffer.cpp @@ -32,6 +32,9 @@ #include "llmath.h" #include "llmemtype.h" #include "llstl.h" +#include "llthread.h" + +#define ASSERT_LLBUFFERARRAY_MUTEX_LOCKED llassert(!mMutexp || mMutexp->isSelfLocked()); /** * LLSegment @@ -224,7 +227,8 @@ void LLHeapBuffer::allocate(S32 size) * LLBufferArray */ LLBufferArray::LLBufferArray() : - mNextBaseChannel(0) + mNextBaseChannel(0), + mMutexp(NULL) { LLMemType m1(LLMemType::MTYPE_IO_BUFFER); } @@ -233,6 +237,8 @@ LLBufferArray::~LLBufferArray() { LLMemType m1(LLMemType::MTYPE_IO_BUFFER); std::for_each(mBuffers.begin(), mBuffers.end(), DeletePointer()); + + delete mMutexp; } // static @@ -243,14 +249,57 @@ LLChannelDescriptors LLBufferArray::makeChannelConsumer( return rv; } +void LLBufferArray::lock() +{ + if(mMutexp) + { + mMutexp->lock() ; + } +} + +void LLBufferArray::unlock() +{ + if(mMutexp) + { + mMutexp->unlock() ; + } +} + +LLMutex* LLBufferArray::getMutex() +{ + return mMutexp ; +} + +void LLBufferArray::setThreaded(bool threaded) +{ + if(threaded) + { + if(!mMutexp) + { + mMutexp = new LLMutex(NULL); + } + } + else + { + if(mMutexp) + { + delete mMutexp ; + mMutexp = NULL ; + } + } +} + LLChannelDescriptors LLBufferArray::nextChannel() { LLChannelDescriptors rv(mNextBaseChannel++); return rv; } +//mMutexp should be locked before calling this. S32 LLBufferArray::capacity() const { + ASSERT_LLBUFFERARRAY_MUTEX_LOCKED + S32 total = 0; const_buffer_iterator_t iter = mBuffers.begin(); const_buffer_iterator_t end = mBuffers.end(); @@ -263,6 +312,8 @@ S32 LLBufferArray::capacity() const bool LLBufferArray::append(S32 channel, const U8* src, S32 len) { + LLMutexLock lock(mMutexp) ; + LLMemType m1(LLMemType::MTYPE_IO_BUFFER); std::vector segments; if(copyIntoBuffers(channel, src, len, segments)) @@ -273,8 +324,11 @@ bool LLBufferArray::append(S32 channel, const U8* src, S32 len) return false; } +//mMutexp should be locked before calling this. bool LLBufferArray::prepend(S32 channel, const U8* src, S32 len) { + ASSERT_LLBUFFERARRAY_MUTEX_LOCKED + LLMemType m1(LLMemType::MTYPE_IO_BUFFER); std::vector segments; if(copyIntoBuffers(channel, src, len, segments)) @@ -293,6 +347,8 @@ bool LLBufferArray::insertAfter( { LLMemType m1(LLMemType::MTYPE_IO_BUFFER); std::vector segments; + + LLMutexLock lock(mMutexp) ; if(mSegments.end() != segment) { ++segment; @@ -305,8 +361,11 @@ bool LLBufferArray::insertAfter( return false; } +//mMutexp should be locked before calling this. LLBufferArray::segment_iterator_t LLBufferArray::splitAfter(U8* address) { + ASSERT_LLBUFFERARRAY_MUTEX_LOCKED + LLMemType m1(LLMemType::MTYPE_IO_BUFFER); segment_iterator_t end = mSegments.end(); segment_iterator_t it = getSegment(address); @@ -335,20 +394,26 @@ LLBufferArray::segment_iterator_t LLBufferArray::splitAfter(U8* address) return rv; } +//mMutexp should be locked before calling this. LLBufferArray::segment_iterator_t LLBufferArray::beginSegment() { + ASSERT_LLBUFFERARRAY_MUTEX_LOCKED return mSegments.begin(); } +//mMutexp should be locked before calling this. LLBufferArray::segment_iterator_t LLBufferArray::endSegment() { + ASSERT_LLBUFFERARRAY_MUTEX_LOCKED return mSegments.end(); } +//mMutexp should be locked before calling this. LLBufferArray::segment_iterator_t LLBufferArray::constructSegmentAfter( U8* address, LLSegment& segment) { + ASSERT_LLBUFFERARRAY_MUTEX_LOCKED LLMemType m1(LLMemType::MTYPE_IO_BUFFER); segment_iterator_t rv = mSegments.begin(); segment_iterator_t end = mSegments.end(); @@ -395,8 +460,10 @@ LLBufferArray::segment_iterator_t LLBufferArray::constructSegmentAfter( return rv; } +//mMutexp should be locked before calling this. LLBufferArray::segment_iterator_t LLBufferArray::getSegment(U8* address) { + ASSERT_LLBUFFERARRAY_MUTEX_LOCKED segment_iterator_t end = mSegments.end(); if(!address) { @@ -414,9 +481,11 @@ LLBufferArray::segment_iterator_t LLBufferArray::getSegment(U8* address) return end; } +//mMutexp should be locked before calling this. LLBufferArray::const_segment_iterator_t LLBufferArray::getSegment( U8* address) const { + ASSERT_LLBUFFERARRAY_MUTEX_LOCKED const_segment_iterator_t end = mSegments.end(); if(!address) { @@ -466,6 +535,8 @@ S32 LLBufferArray::countAfter(S32 channel, U8* start) const S32 count = 0; S32 offset = 0; const_segment_iterator_t it; + + LLMutexLock lock(mMutexp) ; const_segment_iterator_t end = mSegments.end(); if(start) { @@ -517,6 +588,8 @@ U8* LLBufferArray::readAfter( len = 0; S32 bytes_to_copy = 0; const_segment_iterator_t it; + + LLMutexLock lock(mMutexp) ; const_segment_iterator_t end = mSegments.end(); if(start) { @@ -568,6 +641,7 @@ U8* LLBufferArray::seek( U8* start, S32 delta) const { + ASSERT_LLBUFFERARRAY_MUTEX_LOCKED LLMemType m1(LLMemType::MTYPE_IO_BUFFER); const_segment_iterator_t it; const_segment_iterator_t end = mSegments.end(); @@ -709,9 +783,14 @@ U8* LLBufferArray::seek( return rv; } +//test use only bool LLBufferArray::takeContents(LLBufferArray& source) { LLMemType m1(LLMemType::MTYPE_IO_BUFFER); + + LLMutexLock lock(mMutexp); + source.lock(); + std::copy( source.mBuffers.begin(), source.mBuffers.end(), @@ -723,13 +802,17 @@ bool LLBufferArray::takeContents(LLBufferArray& source) std::back_insert_iterator(mSegments)); source.mSegments.clear(); source.mNextBaseChannel = 0; + source.unlock(); + return true; } +//mMutexp should be locked before calling this. LLBufferArray::segment_iterator_t LLBufferArray::makeSegment( S32 channel, S32 len) { + ASSERT_LLBUFFERARRAY_MUTEX_LOCKED LLMemType m1(LLMemType::MTYPE_IO_BUFFER); // start at the end of the buffers, because it is the most likely // to have free space. @@ -765,8 +848,10 @@ LLBufferArray::segment_iterator_t LLBufferArray::makeSegment( return send; } +//mMutexp should be locked before calling this. bool LLBufferArray::eraseSegment(const segment_iterator_t& erase_iter) { + ASSERT_LLBUFFERARRAY_MUTEX_LOCKED LLMemType m1(LLMemType::MTYPE_IO_BUFFER); // Find out which buffer contains the segment, and if it is found, @@ -792,13 +877,14 @@ bool LLBufferArray::eraseSegment(const segment_iterator_t& erase_iter) return rv; } - +//mMutexp should be locked before calling this. bool LLBufferArray::copyIntoBuffers( S32 channel, const U8* src, S32 len, std::vector& segments) { + ASSERT_LLBUFFERARRAY_MUTEX_LOCKED LLMemType m1(LLMemType::MTYPE_IO_BUFFER); if(!src || !len) return false; S32 copied = 0; diff --git a/indra/llmessage/llbuffer.h b/indra/llmessage/llbuffer.h index 1c42b6fbc6..ccdb9fa7ee 100644 --- a/indra/llmessage/llbuffer.h +++ b/indra/llmessage/llbuffer.h @@ -39,6 +39,7 @@ #include #include +class LLMutex; /** * @class LLChannelDescriptors * @brief A way simple interface to accesss channels inside a buffer @@ -564,6 +565,29 @@ public: * @return Returns true on success. */ bool eraseSegment(const segment_iterator_t& iter); + + /** + * @brief Lock the mutex if it exists + * This method locks mMutexp to make accessing LLBufferArray thread-safe + */ + void lock(); + + /** + * @brief Unlock the mutex if it exists + */ + void unlock(); + + /** + * @brief Return mMutexp + */ + LLMutex* getMutex(); + + /** + * @brief Set LLBufferArray to be shared across threads or not + * This method is to create mMutexp if is threaded. + * @param threaded Indicates this LLBufferArray instance is shared across threads if true. + */ + void setThreaded(bool threaded); //@} protected: @@ -595,6 +619,7 @@ protected: S32 mNextBaseChannel; buffer_list_t mBuffers; segment_list_t mSegments; + LLMutex* mMutexp; }; #endif // LL_LLBUFFER_H diff --git a/indra/llmessage/llbufferstream.cpp b/indra/llmessage/llbufferstream.cpp index 6257983c43..8d8ad05ad5 100644 --- a/indra/llmessage/llbufferstream.cpp +++ b/indra/llmessage/llbufferstream.cpp @@ -31,6 +31,7 @@ #include "llbuffer.h" #include "llmemtype.h" +#include "llthread.h" static const S32 DEFAULT_OUTPUT_SEGMENT_SIZE = 1024 * 4; @@ -62,6 +63,7 @@ int LLBufferStreamBuf::underflow() return EOF; } + LLMutexLock lock(mBuffer->getMutex()); LLBufferArray::segment_iterator_t iter; LLBufferArray::segment_iterator_t end = mBuffer->endSegment(); U8* last_pos = (U8*)gptr(); @@ -149,6 +151,7 @@ int LLBufferStreamBuf::overflow(int c) // since we got here, we have a buffer, and we have a character to // put on it. LLBufferArray::segment_iterator_t it; + LLMutexLock lock(mBuffer->getMutex()); it = mBuffer->makeSegment(mChannels.out(), DEFAULT_OUTPUT_SEGMENT_SIZE); if(it != mBuffer->endSegment()) { @@ -210,6 +213,7 @@ int LLBufferStreamBuf::sync() // *NOTE: I bet we could just --address if address is not NULL. // Need to think about that. + LLMutexLock lock(mBuffer->getMutex()); address = mBuffer->seek(mChannels.out(), address, -1); if(address) { @@ -273,6 +277,8 @@ streampos LLBufferStreamBuf::seekoff( // NULL is fine break; } + + LLMutexLock lock(mBuffer->getMutex()); address = mBuffer->seek(mChannels.in(), base_addr, off); if(address) { @@ -304,6 +310,8 @@ streampos LLBufferStreamBuf::seekoff( // NULL is fine break; } + + LLMutexLock lock(mBuffer->getMutex()); address = mBuffer->seek(mChannels.out(), base_addr, off); if(address) { diff --git a/indra/llmessage/llcurl.cpp b/indra/llmessage/llcurl.cpp index 5edf0dc8c0..1ab82a273b 100644 --- a/indra/llmessage/llcurl.cpp +++ b/indra/llmessage/llcurl.cpp @@ -228,6 +228,8 @@ LLMutex* LLCurl::Easy::sHandleMutexp = NULL ; //static CURL* LLCurl::Easy::allocEasyHandle() { + llassert(LLCurl::getCurlThread()) ; + CURL* ret = NULL; LLMutexLock lock(sHandleMutexp) ; @@ -489,6 +491,7 @@ void LLCurl::Easy::prepRequest(const std::string& url, LLProxy::getInstance()->applyProxySettings(this); mOutput.reset(new LLBufferArray); + mOutput->setThreaded(true); setopt(CURLOPT_WRITEFUNCTION, (void*)&curlWriteCallback); setopt(CURLOPT_WRITEDATA, (void*)this); diff --git a/indra/llmessage/lliohttpserver.cpp b/indra/llmessage/lliohttpserver.cpp index 73e8a69085..987f386aa3 100644 --- a/indra/llmessage/lliohttpserver.cpp +++ b/indra/llmessage/lliohttpserver.cpp @@ -818,6 +818,8 @@ LLIOPipe::EStatus LLHTTPResponder::process_impl( // Copy everything after mLast read to the out. LLBufferArray::segment_iterator_t seg_iter; + + buffer->lock(); seg_iter = buffer->splitAfter(mLastRead); if(seg_iter != buffer->endSegment()) { @@ -838,7 +840,7 @@ LLIOPipe::EStatus LLHTTPResponder::process_impl( } #endif } - + buffer->unlock(); // // *FIX: get rid of extra bytes off the end // diff --git a/indra/llmessage/lliosocket.cpp b/indra/llmessage/lliosocket.cpp index 54ceab3422..d5b4d45821 100644 --- a/indra/llmessage/lliosocket.cpp +++ b/indra/llmessage/lliosocket.cpp @@ -445,6 +445,7 @@ LLIOPipe::EStatus LLIOSocketWriter::process_impl( // efficient - not only because writev() is better, but also // because we won't have to do as much work to find the start // address. + buffer->lock(); LLBufferArray::segment_iterator_t it; LLBufferArray::segment_iterator_t end = buffer->endSegment(); LLSegment segment; @@ -524,6 +525,8 @@ LLIOPipe::EStatus LLIOSocketWriter::process_impl( } } + buffer->unlock(); + PUMP_DEBUG; if(done && eos) { diff --git a/indra/llmessage/llpumpio.cpp b/indra/llmessage/llpumpio.cpp index 0ff300efd0..f3ef4f2684 100644 --- a/indra/llmessage/llpumpio.cpp +++ b/indra/llmessage/llpumpio.cpp @@ -207,6 +207,7 @@ bool LLPumpIO::addChain(const chain_t& chain, F32 timeout, bool has_curl_request info.mHasCurlRequest = has_curl_request; info.setTimeoutSeconds(timeout); info.mData = LLIOPipe::buffer_ptr_t(new LLBufferArray); + info.mData->setThreaded(has_curl_request); LLLinkInfo link; #if LL_DEBUG_PIPE_TYPE_IN_PUMP lldebugs << "LLPumpIO::addChain() " << chain[0] << " '" -- cgit v1.3