From f94960ebbb135516a1ad1ccaa75d4b9109cd294d Mon Sep 17 00:00:00 2001 From: Dmytro Bogovych Date: Sun, 22 Jul 2018 22:19:04 +0300 Subject: [PATCH] - bugs fixed --- src/engine/helper/HL_SocketHeap.cpp | 359 ++++++++++++++-------------- src/engine/helper/HL_SocketHeap.h | 3 +- 2 files changed, 183 insertions(+), 179 deletions(-) diff --git a/src/engine/helper/HL_SocketHeap.cpp b/src/engine/helper/HL_SocketHeap.cpp index 4ca30919..172fcb45 100644 --- a/src/engine/helper/HL_SocketHeap.cpp +++ b/src/engine/helper/HL_SocketHeap.cpp @@ -33,20 +33,20 @@ SocketHeap::SocketHeap(unsigned short start, unsigned short finish) { - mStart = start; - mFinish = finish; + mStart = start; + mFinish = finish; } SocketHeap::~SocketHeap() { - stop(); + stop(); } void SocketHeap::start() { #if defined(USE_RESIP_INTEGRATION) - if (!mId) - run(); + if (!mId) + run(); #else #endif } @@ -54,243 +54,248 @@ void SocketHeap::start() void SocketHeap::stop() { #if defined(USE_RESIP_INTEGRATION) - if (mId) - { - shutdown(); - // Wait for worker thread - join(); - } + if (mId) + { + shutdown(); + // Wait for worker thread + join(); + } #endif } void SocketHeap::setRange(unsigned short start, unsigned short finish) { - assert(mStart <= mFinish); - - Lock l(mGuard); - mStart = start; - mFinish = finish; + assert(mStart <= mFinish); + + Lock l(mGuard); + mStart = start; + mFinish = finish; } void SocketHeap::range(unsigned short &start, unsigned short &finish) { - Lock l(mGuard); + Lock l(mGuard); - start = mStart; - finish = mFinish; + start = mStart; + finish = mFinish; } RtpPair SocketHeap::allocSocketPair(int family, SocketSink *sink, Multiplex m) { - PDatagramSocket rtp, rtcp; - for (int attempt=0; (!rtp || !rtcp) && attempt < (mFinish - mStart)/2; attempt++) - { - // Allocate RTP - try + PDatagramSocket rtp, rtcp; + for (int attempt=0; (!rtp || !rtcp) && attempt < (mFinish - mStart)/2; attempt++) { - rtp = allocSocket(family, sink); - if (m == DoMultiplexing) - rtcp = rtp; - else - rtcp = allocSocket(family, sink, rtp->localport() + 1); + // Allocate RTP + try + { + rtp = allocSocket(family, sink); + if (m == DoMultiplexing) + rtcp = rtp; + else + rtcp = allocSocket(family, sink, rtp->localport() + 1); + } + catch(...) + {} } - catch(...) - {} - } - if (!rtp || !rtcp) - { - if (rtp) - freeSocket(rtp); - if (rtcp) - freeSocket(rtcp); - throw Exception(ERR_NET_FAILED); - } - ICELogInfo(<< "Allocated socket pair " << (family == AF_INET ? "AF_INET" : "AF_INET6") << " " - << rtp->socket() << ":" << rtcp->socket() - << " at ports " << rtp->localport() << ":"<< rtcp->localport()); + if (!rtp || !rtcp) + { + if (rtp) + freeSocket(rtp); + if (rtcp) + freeSocket(rtcp); + throw Exception(ERR_NET_FAILED); + } + ICELogInfo(<< "Allocated socket pair " << (family == AF_INET ? "AF_INET" : "AF_INET6") << " " + << rtp->socket() << ":" << rtcp->socket() + << " at ports " << rtp->localport() << ":"<< rtcp->localport()); - return RtpPair(rtp, rtcp); + return RtpPair(rtp, rtcp); } void SocketHeap::freeSocketPair(const RtpPair &p) { - freeSocket(p.mRtp); - freeSocket(p.mRtcp); + freeSocket(p.mRtp); + freeSocket(p.mRtcp); } PDatagramSocket SocketHeap::allocSocket(int family, SocketSink* sink, int port) { - Lock l(mGuard); - SOCKET sock = ::socket(family, SOCK_DGRAM, IPPROTO_UDP); - if (sock == INVALID_SOCKET) - { - // Return null socket - PDatagramSocket result(new DatagramSocket()); - result->mLocalPort = port; - result->mFamily = family; - return result; - } - - // Obtain port number - sockaddr_in addr; - sockaddr_in6 addr6; - int result; - int testport; - do - { - testport = port ? port : rand() % ((mFinish - mStart) / 2) * 2 + mStart; - - switch (family) + Lock l(mGuard); + SOCKET sock = ::socket(family, SOCK_DGRAM, IPPROTO_UDP); + if (sock == INVALID_SOCKET) { - case AF_INET: - memset(&addr, 0, sizeof addr); - addr.sin_family = AF_INET; - addr.sin_port = htons(testport); - result = ::bind(sock, (const sockaddr*)&addr, sizeof addr); - if (result) - result = WSAGetLastError(); - break; - - case AF_INET6: - memset(&addr6, 0, sizeof addr6); - addr6.sin6_family = AF_INET6; - addr6.sin6_port = htons(testport); - result = ::bind(sock, (const sockaddr*)&addr6, sizeof addr6); - if (result) - result = WSAGetLastError(); - break; + // Return null socket + PDatagramSocket result(new DatagramSocket()); + result->mLocalPort = port; + result->mFamily = family; + return result; } - } while (result == WSAEADDRINUSE); + // Obtain port number + sockaddr_in addr; + sockaddr_in6 addr6; + int result; + int testport; + do + { + testport = port ? port : rand() % ((mFinish - mStart) / 2) * 2 + mStart; - if (result) - { - closesocket(sock); - throw Exception(ERR_NET_FAILED, WSAGetLastError()); - } - PDatagramSocket resultObject(new DatagramSocket()); - resultObject->mLocalPort = testport; - resultObject->mHandle = sock; - if (!resultObject->setBlocking(false)) - { - resultObject->closeSocket(); - throw Exception(ERR_NET_FAILED, WSAGetLastError()); - } + switch (family) + { + case AF_INET: + memset(&addr, 0, sizeof addr); + addr.sin_family = AF_INET; + addr.sin_port = htons(testport); + result = ::bind(sock, (const sockaddr*)&addr, sizeof addr); + if (result) + result = WSAGetLastError(); + break; - // Put socket object to the map - mSocketMap[sock].mSink = sink; - mSocketMap[sock].mSocket = resultObject; - - return resultObject; + case AF_INET6: + memset(&addr6, 0, sizeof addr6); + addr6.sin6_family = AF_INET6; + addr6.sin6_port = htons(testport); + result = ::bind(sock, (const sockaddr*)&addr6, sizeof addr6); + if (result) + result = WSAGetLastError(); + break; + } + + } while (result == WSAEADDRINUSE); + + if (result) + { + closesocket(sock); + throw Exception(ERR_NET_FAILED, WSAGetLastError()); + } + PDatagramSocket resultObject(new DatagramSocket()); + resultObject->mLocalPort = testport; + resultObject->mHandle = sock; + if (!resultObject->setBlocking(false)) + { + resultObject->closeSocket(); + throw Exception(ERR_NET_FAILED, WSAGetLastError()); + } + + // Put socket object to the map + mSocketMap[sock].mSink = sink; + mSocketMap[sock].mSocket = resultObject; + + return resultObject; } - + void SocketHeap::freeSocket(PDatagramSocket socket) { - if (!socket) - return; + if (!socket) + return; - Lock l(mDeleteGuard); - mDeleteVector.push_back(socket); + Lock l(mDeleteGuard); + mDeleteVector.push_back(socket); } void SocketHeap::processDeleted() { - Lock l(mDeleteGuard); + Lock l(mDeleteGuard); - SocketVector::iterator socketIter = mDeleteVector.begin(); - while (socketIter != mDeleteVector.end()) - { - // Find socket to delete in main socket map - SocketMap::iterator itemIter = mSocketMap.find((*socketIter)->mHandle); - - if (itemIter != mSocketMap.end()) + SocketVector::iterator socketIter = mDeleteVector.begin(); + while (socketIter != mDeleteVector.end()) { - // If found - delete socket object from map - mSocketMap.erase(itemIter); - } - - socketIter++; - } + // Find socket to delete in main socket map + SocketMap::iterator itemIter = mSocketMap.find((*socketIter)->mHandle); - mDeleteVector.clear(); + if (itemIter != mSocketMap.end()) + { + // If found - delete socket object from map + mSocketMap.erase(itemIter); + } + + socketIter++; + } + + mDeleteVector.clear(); } void SocketHeap::thread() { -/*#ifdef __linux__ + /*#ifdef __linux__ // TODO: make epoll implementation for massive polling #else*/ - while (!isShutdown()) - { - // Define socket agreggator - DatagramAgreggator agreggator; + mId = ThreadIf::selfId(); - // Make a protected copy of sockets + while (!isShutdown()) { - Lock l(mGuard); + // Define socket agreggator + DatagramAgreggator agreggator; - // Remove deleted sockets from map and close them - { - processDeleted(); - } - - // Update socket set - for (SocketMap::iterator socketIter = mSocketMap.begin(); socketIter != mSocketMap.end(); ++socketIter) + // Make a protected copy of sockets + { + Lock l(mGuard); + + // Remove deleted sockets from map and close them + { + processDeleted(); + } + + // Update socket set + for (auto& socketIter: mSocketMap) + agreggator.addSocket(socketIter.second.mSocket); + + /* for (SocketMap::iterator socketIter = mSocketMap.begin(); socketIter != mSocketMap.end(); ++socketIter) { // Add handle to set agreggator.addSocket(socketIter->second.mSocket); - } - } - - // If set is not empty - if (agreggator.count() > 0) - { - if (agreggator.waitForData(10)) - { - ICELogMedia(<< "There is data on UDP sockets"); - Lock l(mGuard); + } */ + } - // Remove deleted sockets to avoid call non-existant sinks - processDeleted(); - - for (unsigned i=0; i 0) { - if (agreggator.hasDataAtIndex(i)) - { - //ICELogInfo(<<"Got incoming UDP packet at index " << (const int)i); - PDatagramSocket sock = agreggator.socketAt(i); - - // Find corresponding data sink - SocketMap::iterator socketItemIter = mSocketMap.find(sock->mHandle); - - if (socketItemIter != mSocketMap.end()) + if (agreggator.waitForData(10)) { - InternetAddress src; - unsigned received = sock->recvDatagram(src, mTempPacket, sizeof mTempPacket); + ICELogMedia(<< "There is data on UDP sockets"); + Lock l(mGuard); - if ( received > 0 && received <= MAX_VALID_UDPPACKET_SIZE) - socketItemIter->second.mSink->onReceivedData(sock, src, mTempPacket, received); + // Remove deleted sockets to avoid call non-existant sinks + processDeleted(); + + for (unsigned i=0; imHandle); + + if (socketItemIter != mSocketMap.end()) + { + InternetAddress src; + unsigned received = sock->recvDatagram(src, mTempPacket, sizeof mTempPacket); + + if ( received > 0 && received <= MAX_VALID_UDPPACKET_SIZE) + socketItemIter->second.mSink->onReceivedData(sock, src, mTempPacket, received); + } + + // There is a call to ProcessDeleted() as OnReceivedData() could delete sockets + processDeleted(); + } + } //of for } - - // There is a call to ProcessDeleted() as OnReceivedData() could delete sockets - processDeleted(); - } - } //of for - } + } + else + std::this_thread::sleep_for(std::chrono::milliseconds(1)); } - else - SyncHelper::delay(1000); // Delay for 1 millisecond - } - mId = 0; - mShutdown = false; -//#endif + mId = 0; + mShutdown = false; + //#endif } static SocketHeap GRTPSocketHeap(20002, 25100); SocketHeap& SocketHeap::instance() { - return GRTPSocketHeap; + return GRTPSocketHeap; } diff --git a/src/engine/helper/HL_SocketHeap.h b/src/engine/helper/HL_SocketHeap.h index 63694254..bdd6d53e 100644 --- a/src/engine/helper/HL_SocketHeap.h +++ b/src/engine/helper/HL_SocketHeap.h @@ -103,10 +103,9 @@ protected: Mutex mDeleteGuard; char mTempPacket[MAX_UDPPACKET_SIZE]; - volatile bool mShutdown = false; int mId = 0; - bool isShutdown() const { return mShutdown; } + //bool isShutdown() const { return mShutdown; } virtual void thread(); // Processes mDeleteVector -> updates mSocketMap, removes socket items and closes sockets specified in mDeleteVector