From b7524918fae035911bc9cc394976bbd56ae85909 Mon Sep 17 00:00:00 2001 From: Dmytro Bogovych Date: Mon, 24 Jun 2019 18:45:52 +0300 Subject: [PATCH] - fix thread pool bug - cleanup byte buffer support --- src/CMakeLists.txt | 4 +- src/engine/helper/HL_ThreadPool.h | 14 +- src/libs/ice/ICEByteBuffer.cpp | 497 +++++++++++++++--------------- src/libs/ice/ICEByteBuffer.h | 2 +- 4 files changed, 260 insertions(+), 257 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 0a1cda18..4f511cb6 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -62,7 +62,7 @@ set (USE_EVS_CODEC OFF CACHE BOOL "Use EVS codec.") set(CMAKE_POSITION_INDEPENDENT_CODE ON) -set (OPENSSL_INCLUDE ${PLATFORM_LIBS}/openssl/1.0/include) +set (OPENSSL_INCLUDE ${LIB_PLATFORM}/openssl/1.0/include) message ("Using OpenSSL include files from ${OPENSSL_INCLUDE}") if (CMAKE_SYSTEM MATCHES "Windows*") @@ -176,7 +176,7 @@ target_include_directories(rtphone $ $ ${CMAKE_CURRENT_SOURCE_DIR}/libs - ${PLATFORM_LIBS}/opus/include + ${LIB_PLATFORM}/opus/include PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/libs/ ${CMAKE_CURRENT_SOURCE_DIR}/libs/speex/include diff --git a/src/engine/helper/HL_ThreadPool.h b/src/engine/helper/HL_ThreadPool.h index a374b5de..bec1be2c 100644 --- a/src/engine/helper/HL_ThreadPool.h +++ b/src/engine/helper/HL_ThreadPool.h @@ -10,6 +10,7 @@ #include #include #include +#include class ThreadPool { @@ -42,7 +43,7 @@ inline ThreadPool::ThreadPool(size_t threads) { for(;;) { - std::function task; + std::function task; bool task_assigned = false; { std::unique_lock lock(this->queue_mutex); @@ -54,11 +55,14 @@ inline ThreadPool::ThreadPool(size_t threads) return; if(this->pause) continue; - task = std::move(this->tasks.front()); - this->tasks.pop(); + if (this->tasks.size()) + { + task = std::move(this->tasks.front()); task_assigned = true; + this->tasks.pop(); + } } - - task(); + if (task_assigned) + task(); } } ); diff --git a/src/libs/ice/ICEByteBuffer.cpp b/src/libs/ice/ICEByteBuffer.cpp index f8ac74ab..0a6ff620 100644 --- a/src/libs/ice/ICEByteBuffer.cpp +++ b/src/libs/ice/ICEByteBuffer.cpp @@ -14,261 +14,260 @@ using namespace ice; ByteBuffer::ByteBuffer() { - initEmpty(); - mData.reserve(512); + initEmpty(); + mData.reserve(512); } ByteBuffer::ByteBuffer(size_t initialCapacity) { - initEmpty(); - mData.reserve(initialCapacity); + initEmpty(); + mData.reserve(initialCapacity); } ByteBuffer::ByteBuffer(const ByteBuffer& src) -:mData(src.mData), mComponent(src.mComponent), mTag(nullptr), - mRelayed(src.mRelayed), mCopyBehavior(src.mCopyBehavior), - mDataPtr(src.mDataPtr), mDataSize(src.mDataSize) + :mData(src.mData), mComponent(src.mComponent), mTag(nullptr), + mRelayed(src.mRelayed), mCopyBehavior(src.mCopyBehavior), + mDataPtr(src.mDataPtr), mDataSize(src.mDataSize) { - if (mCopyBehavior == CopyBehavior::CopyMemory && mData.size()) - mDataPtr = &mData[0]; + if (mCopyBehavior == CopyBehavior::CopyMemory && mData.size()) + mDataPtr = &mData[0]; } ByteBuffer::ByteBuffer(const void* packetPtr, size_t packetSize, CopyBehavior behavior) -:mComponent(-1), mTag(nullptr), mRelayed(false), mCopyBehavior(behavior), mDataPtr(nullptr), mDataSize(packetSize) + :mComponent(-1), mTag(nullptr), mRelayed(false), mCopyBehavior(behavior), mDataPtr(nullptr), mDataSize(packetSize) { - switch (behavior) - { - case CopyBehavior::CopyMemory: - mData.resize(packetSize); - memcpy(&mData[0], packetPtr, packetSize); - mDataPtr = &mData[0]; - break; + switch (behavior) + { + case CopyBehavior::CopyMemory: + mData.resize(packetSize); + memcpy(&mData[0], packetPtr, packetSize); + mDataPtr = &mData[0]; + break; - case CopyBehavior::UseExternal: - mDataPtr = (uint8_t*)packetPtr; - break; + case CopyBehavior::UseExternal: + mDataPtr = reinterpret_cast(const_cast(packetPtr)); + break; - default: - break; - } + default: + break; + } } ByteBuffer::~ByteBuffer() { - if (mCopyBehavior == CopyBehavior::CopyMemory) - memset(mDataPtr, 0, mDataSize); + if (mCopyBehavior == CopyBehavior::CopyMemory) + memset(mDataPtr, 0, mDataSize); } ByteBuffer& ByteBuffer::operator = (const ByteBuffer& src) { - mRelayed = src.mRelayed; - mComment = src.mComment; - mComponent = src.mComponent; - mRemoteAddress = src.mRemoteAddress; - mTag = src.mTag; - - if (src.mCopyBehavior == CopyBehavior::CopyMemory) - { - mData = src.mData; - mCopyBehavior = CopyBehavior::CopyMemory; - syncPointer(); - - } - else - { - mDataPtr = src.mDataPtr; - mDataSize = src.mDataSize; - mCopyBehavior = CopyBehavior::UseExternal; - } - - return *this; + mRelayed = src.mRelayed; + mComment = src.mComment; + mComponent = src.mComponent; + mRemoteAddress = src.mRemoteAddress; + mTag = src.mTag; + + if (src.mCopyBehavior == CopyBehavior::CopyMemory) + { + mData = src.mData; + mCopyBehavior = CopyBehavior::CopyMemory; + syncPointer(); + } + else + { + mDataPtr = src.mDataPtr; + mDataSize = src.mDataSize; + mCopyBehavior = CopyBehavior::UseExternal; + } + + return *this; } void ByteBuffer::clear() { - mData.resize(0); - mDataSize = 0; - mDataPtr = nullptr; + mData.resize(0); + mDataSize = 0; + mDataPtr = nullptr; } size_t ByteBuffer::size() const { - return mDataSize; + return mDataSize; } const uint8_t* ByteBuffer::data() const { - return (const uint8_t*)mDataPtr; + return reinterpret_cast(mDataPtr); } uint8_t* ByteBuffer::mutableData() { - return mDataPtr; + return mDataPtr; } NetworkAddress& ByteBuffer::remoteAddress() { - return mRemoteAddress; + return mRemoteAddress; } void ByteBuffer::setRemoteAddress(const NetworkAddress& addr) { - mRemoteAddress = addr; + mRemoteAddress = addr; } void ByteBuffer::setComment(std::string comment) { - mComment = comment; + mComment = comment; } std::string ByteBuffer::comment() { - return mComment; + return mComment; } std::string ByteBuffer::hexstring() { - std::string result; - result.resize(mDataSize * 2, (char)0xCC); - for (std::vector::size_type index = 0; index < mDataSize; index++) - { - char value[3]; - sprintf(value, "%02X", (unsigned char)mDataPtr[index]); - result[index*2] = value[0]; - result[index*2+1] = value[1]; - } - return result; + std::string result; + result.resize(mDataSize * 2, (char)0xCC); + for (std::vector::size_type index = 0; index < mDataSize; index++) + { + char value[3]; + sprintf(value, "%02X", (unsigned char)mDataPtr[index]); + result[index*2] = value[0]; + result[index*2+1] = value[1]; + } + return result; } void ByteBuffer::reserve(size_t capacity) { - mData.reserve(capacity); - syncPointer(); + mData.reserve(capacity); + syncPointer(); } void ByteBuffer::insertTurnPrefix(unsigned short prefix) { - assert(mCopyBehavior == CopyBehavior::CopyMemory); + assert(mCopyBehavior == CopyBehavior::CopyMemory); - mData.insert(mData.begin(), 2, 32); - unsigned short nprefix = htons(prefix); - mData[0] = nprefix & 0xFF; - mData[1] = (nprefix & 0xFF00) >> 8; - syncPointer(); + mData.insert(mData.begin(), 2, 32); + unsigned short nprefix = htons(prefix); + mData[0] = nprefix & 0xFF; + mData[1] = (nprefix & 0xFF00) >> 8; + syncPointer(); } int ByteBuffer::component() { - return mComponent; + return mComponent; } void ByteBuffer::setComponent(int component) { - mComponent = component; + mComponent = component; } void ByteBuffer::truncate(size_t newsize) { - assert (mCopyBehavior == CopyBehavior::CopyMemory); + assert (mCopyBehavior == CopyBehavior::CopyMemory); - mData.erase(mData.begin() + newsize, mData.end()); - syncPointer(); + mData.erase(mData.begin() + newsize, mData.end()); + syncPointer(); } void ByteBuffer::erase(size_t p, size_t l) { - assert (mCopyBehavior == CopyBehavior::CopyMemory); + assert (mCopyBehavior == CopyBehavior::CopyMemory); - mData.erase(mData.begin()+p, mData.begin()+p+l); - syncPointer(); + mData.erase(mData.begin() + p, mData.begin() + p + l); + syncPointer(); } void ByteBuffer::resize(size_t newsize) { - assert (mCopyBehavior == CopyBehavior::CopyMemory); + assert (mCopyBehavior == CopyBehavior::CopyMemory); - std::vector::size_type sz = mData.size(); - mData.resize(newsize); - if (newsize > sz) - memset(&mData[sz], 0, newsize - sz); + std::vector::size_type sz = mData.size(); + mData.resize(newsize); + if (newsize > sz) + memset(&mData[sz], 0, newsize - sz); - syncPointer(); + syncPointer(); } void ByteBuffer::appendBuffer(const void *data, size_t size) { - assert (mCopyBehavior == CopyBehavior::CopyMemory); + assert (mCopyBehavior == CopyBehavior::CopyMemory); - size_t len = mData.size(); - mData.resize(len + size); - memmove(mData.data() + len, data, size); - syncPointer(); + size_t len = mData.size(); + mData.resize(len + size); + memmove(mData.data() + len, data, size); + syncPointer(); } void* ByteBuffer::tag() { - return mTag; + return mTag; } void ByteBuffer::setTag(void* tag) { - mTag = tag; + mTag = tag; } bool ByteBuffer::relayed() { - return mRelayed; + return mRelayed; } void ByteBuffer::setRelayed(bool value) { - mRelayed = value; + mRelayed = value; } void ByteBuffer::initEmpty() { - mDataPtr = nullptr; - mDataSize = 0; - mCopyBehavior = CopyBehavior::CopyMemory; - mRelayed = false; - mComponent = -1; - mTag = nullptr; + mDataPtr = nullptr; + mDataSize = 0; + mCopyBehavior = CopyBehavior::CopyMemory; + mRelayed = false; + mComponent = -1; + mTag = nullptr; } void ByteBuffer::syncPointer() { - mDataPtr = mData.empty() ? nullptr : &mData[0]; - mDataSize = mData.size(); + mDataPtr = mData.empty() ? nullptr : mData.data(); + mDataSize = mData.size(); } uint8_t ByteBuffer::operator[](int index) const { - return mDataPtr[index]; + return mDataPtr[index]; } uint8_t& ByteBuffer::operator[](int index) { - return mDataPtr[index]; + return mDataPtr[index]; } // ----------------- BitReader ------------------- BitReader::BitReader(const ByteBuffer &buffer) - :mStream(buffer.data()), mStreamLen(buffer.size()), mStreamOffset(0), mCurrentBit(0) + :mStream(buffer.data()), mStreamLen(buffer.size()), mStreamOffset(0), mCurrentBit(0) { - init(); + init(); } BitReader::BitReader(const void *input, size_t bytes) - :mStream((const uint8_t*)input), mStreamLen(bytes), mStreamOffset(0), mCurrentBit(0) + :mStream(reinterpret_cast(input)), mStreamLen(bytes), mStreamOffset(0), mCurrentBit(0) { - init(); + init(); } void BitReader::init() { - mStreamOffset = mStreamLen << 3; - mCurrentPosition = 0;//mStreamOffset - 1; - mCurrentBit = 0; + mStreamOffset = mStreamLen << 3; + mCurrentPosition = 0;//mStreamOffset - 1; + mCurrentBit = 0; } BitReader::~BitReader() @@ -277,71 +276,71 @@ BitReader::~BitReader() // Check for valid position uint8_t BitReader::readBit() { - uint8_t value = 0x00; - if (mCurrentPosition < mStreamOffset) - { - // Read single BIT - size_t currentByte = mCurrentPosition >> 3; - uint8_t currentBit = (uint8_t)(mCurrentPosition % 8); - value = ((uint8_t)(mStream[currentByte] << currentBit) >> 7); - mCurrentPosition = std::max((size_t)0, std::min(mStreamOffset-1, mCurrentPosition+1)); - } + uint8_t value = 0x00; + if (mCurrentPosition < mStreamOffset) + { + // Read single BIT + size_t currentByte = mCurrentPosition >> 3; + uint8_t currentBit = (uint8_t)(mCurrentPosition % 8); + value = ((uint8_t)(mStream[currentByte] << currentBit) >> 7); + mCurrentPosition = std::max((size_t)0, std::min(mStreamOffset-1, mCurrentPosition+1)); + } - return value; + return value; } uint32_t BitReader::readBits(size_t nrOfBits) { - assert (nrOfBits <= 32); + assert (nrOfBits <= 32); - uint32_t result = 0; - BitWriter bw(&result); - for (int i=0; i<(int)nrOfBits; i++) - bw.writeBit(readBit()); + uint32_t result = 0; + BitWriter bw(&result); + for (int i=0; i(nrOfBits); i++) + bw.writeBit(readBit()); - return result; + return result; } size_t BitReader::readBits(void *output, size_t nrOfBits) { - // Check how much bits available - nrOfBits = std::min(nrOfBits, mStreamOffset - mCurrentPosition); + // Check how much bits available + nrOfBits = std::min(nrOfBits, mStreamOffset - mCurrentPosition); - BitWriter bw(output); - for (int i=0; i<(int)nrOfBits; i++) - bw.writeBit(readBit()); + BitWriter bw(output); + for (int i=0; i(nrOfBits); i++) + bw.writeBit(readBit()); - return nrOfBits; + return nrOfBits; } size_t BitReader::count() const { - return mStreamOffset; + return mStreamOffset; } size_t BitReader::position() const { - return mCurrentPosition; + return mCurrentPosition; } // ----------------- BitWriter ------------------- BitWriter::BitWriter(ByteBuffer &buffer) - :mStream(buffer.mutableData()), mStreamLen(0), mStreamOffset(0), mCurrentBit(0) + :mStream(buffer.mutableData()), mStreamLen(0), mStreamOffset(0), mCurrentBit(0) { - init(); + init(); } BitWriter::BitWriter(void *output) - :mStream((uint8_t*)output), mStreamLen(0), mStreamOffset(0), mCurrentBit(0) + :mStream((uint8_t*)output), mStreamLen(0), mStreamOffset(0), mCurrentBit(0) { - init(); + init(); } void BitWriter::init() { - mStreamOffset = mStreamLen << 3; - mCurrentPosition = 0;//mStreamOffset - 1; - mCurrentBit = 0; + mStreamOffset = mStreamLen << 3; + mCurrentPosition = 0;//mStreamOffset - 1; + mCurrentBit = 0; } BitWriter::~BitWriter() @@ -349,212 +348,212 @@ BitWriter::~BitWriter() BitWriter& BitWriter::writeBit(int bit) { - bit = bit ? 1 : 0; + bit = bit ? 1 : 0; - // Check for current bit offset - if ((mCurrentBit % 8) == 0) - { - // Write new zero byte to the end of stream - mCurrentBit = 0; - mStreamLen++; - mStream[mStreamLen-1] = 0; - } + // Check for current bit offset + if ((mCurrentBit % 8) == 0) + { + // Write new zero byte to the end of stream + mCurrentBit = 0; + mStreamLen++; + mStream[mStreamLen-1] = 0; + } - // Write single BIT - mStream[mStreamLen-1] <<= 1; - mStream[mStreamLen-1] |= bit; - mStreamOffset++; - mCurrentPosition = mStreamOffset - 1; - mCurrentBit++; + // Write single BIT + mStream[mStreamLen-1] <<= 1; + mStream[mStreamLen-1] |= bit; + mStreamOffset++; + mCurrentPosition = mStreamOffset - 1; + mCurrentBit++; - return *this; + return *this; } size_t BitWriter::count() const { - return mStreamOffset; + return mStreamOffset; } // ----------------- BufferReader ---------------- BufferReader::BufferReader(const ByteBuffer &buffer) - :mData(buffer.data()), mSize(buffer.size()), mIndex(0) + :mData(buffer.data()), mSize(buffer.size()), mIndex(0) {} BufferReader::BufferReader(const void *input, size_t bytes) - :mData((const uint8_t*)input), mSize(bytes), mIndex(0) + :mData(reinterpret_cast(input)), mSize(bytes), mIndex(0) {} uint32_t BufferReader::readUInt() { - uint32_t nresult = 0; - readBuffer(&nresult, 4); + uint32_t nresult = 0; + readBuffer(&nresult, 4); - return ntohl(nresult); + return ntohl(nresult); } uint32_t BufferReader::readNativeUInt() { - uint32_t nresult = 0; - readBuffer(&nresult, 4); + uint32_t nresult = 0; + readBuffer(&nresult, 4); - return nresult; + return nresult; } uint16_t BufferReader::readUShort() { - uint16_t result = 0; - readBuffer(&result, 2); + uint16_t result = 0; + readBuffer(&result, 2); - return ntohs(result); + return ntohs(result); } uint16_t BufferReader::readNativeUShort() { - uint16_t result = 0; - readBuffer(&result, 2); + uint16_t result = 0; + readBuffer(&result, 2); - return result; + return result; } uint8_t BufferReader::readUChar() { - uint8_t result = 0; - readBuffer(&result, 1); + uint8_t result = 0; + readBuffer(&result, 1); - return result; + return result; } NetworkAddress BufferReader::readIp(int family) { - if (family == AF_INET) - { - sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_port = 0; - readBuffer(&addr.sin_addr.s_addr, 4); + if (family == AF_INET) + { + sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_port = 0; + readBuffer(&addr.sin_addr.s_addr, 4); - return NetworkAddress((sockaddr&)addr, sizeof(addr)); - } - else - if (family == AF_INET6) - { - sockaddr_in6 addr; - memset(&addr, 0, sizeof(addr)); - addr.sin6_family = AF_INET6; - addr.sin6_port = 0; - readBuffer(&addr.sin6_addr, 16); + return NetworkAddress(reinterpret_cast(addr), sizeof(addr)); + } + else + if (family == AF_INET6) + { + sockaddr_in6 addr; + memset(&addr, 0, sizeof(addr)); + addr.sin6_family = AF_INET6; + addr.sin6_port = 0; + readBuffer(&addr.sin6_addr, 16); - return NetworkAddress((sockaddr&)addr, sizeof(addr)); - } - return NetworkAddress(); + return NetworkAddress(reinterpret_cast(addr), sizeof(addr)); + } + return NetworkAddress(); } size_t BufferReader::readBuffer(void* dataPtr, size_t dataSize) { - if (dataSize > 0) - { - size_t available = mSize - mIndex; - if (available < dataSize) - dataSize = available; - if (NULL != dataPtr) - memcpy(dataPtr, mData + mIndex, dataSize); + if (dataSize > 0) + { + size_t available = mSize - mIndex; + if (available < dataSize) + dataSize = available; + if (nullptr != dataPtr) + memcpy(dataPtr, mData + mIndex, dataSize); - mIndex += dataSize; - return dataSize; - } - return 0; + mIndex += dataSize; + return dataSize; + } + return 0; } size_t BufferReader::readBuffer(ByteBuffer& bb, size_t dataSize) { - if (dataSize > 0) - { - // Find how much data are available in fact - size_t available = mSize - mIndex; - if (available < dataSize) - dataSize = available; + if (dataSize > 0) + { + // Find how much data are available in fact + size_t available = mSize - mIndex; + if (available < dataSize) + dataSize = available; - // Extend byte buffer - size_t startIndex = bb.size(); - bb.resize(bb.size() + dataSize); - memcpy(bb.mutableData() + startIndex, mData + mIndex, dataSize); - mIndex += dataSize; - return dataSize; - } - return 0; + // Extend byte buffer + size_t startIndex = bb.size(); + bb.resize(bb.size() + dataSize); + memcpy(bb.mutableData() + startIndex, mData + mIndex, dataSize); + mIndex += dataSize; + return dataSize; + } + return 0; } size_t BufferReader::count() const { - return mIndex; + return mIndex; } // -------------- BufferWriter ---------------------- BufferWriter::BufferWriter(ByteBuffer &buffer) - :mData(buffer.mutableData()), mIndex(0) + :mData(buffer.mutableData()), mIndex(0) {} BufferWriter::BufferWriter(void *output) - :mData((uint8_t*)output), mIndex(0) + :mData(reinterpret_cast(output)), mIndex(0) {} void BufferWriter::writeUInt(uint32_t value) { - // Convert to network order bytes - uint32_t nvalue = htonl(value); + // Convert to network order bytes + uint32_t nvalue = htonl(value); - writeBuffer(&nvalue, 4); + writeBuffer(&nvalue, 4); } void BufferWriter::writeUShort(uint16_t value) { - uint16_t nvalue = htons(value); - writeBuffer(&nvalue, 2); + uint16_t nvalue = htons(value); + writeBuffer(&nvalue, 2); } void BufferWriter::writeUChar(uint8_t value) { - writeBuffer(&value, 1); + writeBuffer(&value, 1); } void BufferWriter::writeIp(const NetworkAddress& ip) { - switch (ip.stunType()) - { - case 1/*IPv4*/: - writeBuffer(&ip.sockaddr4()->sin_addr, 4); - break; + switch (ip.stunType()) + { + case 1/*IPv4*/: + writeBuffer(&ip.sockaddr4()->sin_addr, 4); + break; - case 2/*IPv6*/: - writeBuffer(&ip.sockaddr6()->sin6_addr, 16); - break; + case 2/*IPv6*/: + writeBuffer(&ip.sockaddr6()->sin6_addr, 16); + break; - default: - assert(0); - } + default: + assert(0); + } } void BufferWriter::writeBuffer(const void* dataPtr, size_t dataSize) { - memmove(mData + mIndex, dataPtr, dataSize); - mIndex += dataSize; + memmove(mData + mIndex, dataPtr, dataSize); + mIndex += dataSize; } void BufferWriter::rewind() { - mIndex = 0; + mIndex = 0; } void BufferWriter::skip(int count) { - mIndex += count; + mIndex += count; } size_t BufferWriter::offset() const { - return mIndex; + return mIndex; } diff --git a/src/libs/ice/ICEByteBuffer.h b/src/libs/ice/ICEByteBuffer.h index 2c219b2b..33f17dd5 100644 --- a/src/libs/ice/ICEByteBuffer.h +++ b/src/libs/ice/ICEByteBuffer.h @@ -68,7 +68,7 @@ namespace ice protected: std::vector mData; // Internal storage uint8_t* mDataPtr; // Pointer to internal storage or external data - uint32_t mDataSize; // Used only for mCopyBehavior == UseExternal + size_t mDataSize; // Used only for mCopyBehavior == UseExternal NetworkAddress mRemoteAddress; // Associates buffer with IP address int mComponent; // Associates buffer with component ID std::string mComment; // Comment's for this buffer - useful in debugging