- bugs fixed

This commit is contained in:
Dmytro Bogovych 2018-07-22 22:19:04 +03:00
parent 9d31979db3
commit f94960ebbb
2 changed files with 183 additions and 179 deletions

View File

@ -33,20 +33,20 @@
SocketHeap::SocketHeap(unsigned short start, unsigned short finish) SocketHeap::SocketHeap(unsigned short start, unsigned short finish)
{ {
mStart = start; mStart = start;
mFinish = finish; mFinish = finish;
} }
SocketHeap::~SocketHeap() SocketHeap::~SocketHeap()
{ {
stop(); stop();
} }
void SocketHeap::start() void SocketHeap::start()
{ {
#if defined(USE_RESIP_INTEGRATION) #if defined(USE_RESIP_INTEGRATION)
if (!mId) if (!mId)
run(); run();
#else #else
#endif #endif
} }
@ -54,243 +54,248 @@ void SocketHeap::start()
void SocketHeap::stop() void SocketHeap::stop()
{ {
#if defined(USE_RESIP_INTEGRATION) #if defined(USE_RESIP_INTEGRATION)
if (mId) if (mId)
{ {
shutdown(); shutdown();
// Wait for worker thread // Wait for worker thread
join(); join();
} }
#endif #endif
} }
void SocketHeap::setRange(unsigned short start, unsigned short finish) void SocketHeap::setRange(unsigned short start, unsigned short finish)
{ {
assert(mStart <= mFinish); assert(mStart <= mFinish);
Lock l(mGuard); Lock l(mGuard);
mStart = start; mStart = start;
mFinish = finish; mFinish = finish;
} }
void SocketHeap::range(unsigned short &start, unsigned short &finish) void SocketHeap::range(unsigned short &start, unsigned short &finish)
{ {
Lock l(mGuard); Lock l(mGuard);
start = mStart; start = mStart;
finish = mFinish; finish = mFinish;
} }
RtpPair<PDatagramSocket> SocketHeap::allocSocketPair(int family, SocketSink *sink, Multiplex m) RtpPair<PDatagramSocket> SocketHeap::allocSocketPair(int family, SocketSink *sink, Multiplex m)
{ {
PDatagramSocket rtp, rtcp; PDatagramSocket rtp, rtcp;
for (int attempt=0; (!rtp || !rtcp) && attempt < (mFinish - mStart)/2; attempt++) for (int attempt=0; (!rtp || !rtcp) && attempt < (mFinish - mStart)/2; attempt++)
{
// Allocate RTP
try
{ {
rtp = allocSocket(family, sink); // Allocate RTP
if (m == DoMultiplexing) try
rtcp = rtp; {
else rtp = allocSocket(family, sink);
rtcp = allocSocket(family, sink, rtp->localport() + 1); if (m == DoMultiplexing)
rtcp = rtp;
else
rtcp = allocSocket(family, sink, rtp->localport() + 1);
}
catch(...)
{}
} }
catch(...)
{}
}
if (!rtp || !rtcp) if (!rtp || !rtcp)
{ {
if (rtp) if (rtp)
freeSocket(rtp); freeSocket(rtp);
if (rtcp) if (rtcp)
freeSocket(rtcp); freeSocket(rtcp);
throw Exception(ERR_NET_FAILED); throw Exception(ERR_NET_FAILED);
} }
ICELogInfo(<< "Allocated socket pair " << (family == AF_INET ? "AF_INET" : "AF_INET6") << " " ICELogInfo(<< "Allocated socket pair " << (family == AF_INET ? "AF_INET" : "AF_INET6") << " "
<< rtp->socket() << ":" << rtcp->socket() << rtp->socket() << ":" << rtcp->socket()
<< " at ports " << rtp->localport() << ":"<< rtcp->localport()); << " at ports " << rtp->localport() << ":"<< rtcp->localport());
return RtpPair<PDatagramSocket>(rtp, rtcp); return RtpPair<PDatagramSocket>(rtp, rtcp);
} }
void SocketHeap::freeSocketPair(const RtpPair<PDatagramSocket> &p) void SocketHeap::freeSocketPair(const RtpPair<PDatagramSocket> &p)
{ {
freeSocket(p.mRtp); freeSocket(p.mRtp);
freeSocket(p.mRtcp); freeSocket(p.mRtcp);
} }
PDatagramSocket SocketHeap::allocSocket(int family, SocketSink* sink, int port) PDatagramSocket SocketHeap::allocSocket(int family, SocketSink* sink, int port)
{ {
Lock l(mGuard); Lock l(mGuard);
SOCKET sock = ::socket(family, SOCK_DGRAM, IPPROTO_UDP); SOCKET sock = ::socket(family, SOCK_DGRAM, IPPROTO_UDP);
if (sock == INVALID_SOCKET) if (sock == INVALID_SOCKET)
{
// Return null socket
PDatagramSocket result(new DatagramSocket());
result->mLocalPort = port;
result->mFamily = family;
return result;
}
// Obtain port number
sockaddr_in addr;
sockaddr_in6 addr6;
int result;
int testport;
do
{
testport = port ? port : rand() % ((mFinish - mStart) / 2) * 2 + mStart;
switch (family)
{ {
case AF_INET: // Return null socket
memset(&addr, 0, sizeof addr); PDatagramSocket result(new DatagramSocket());
addr.sin_family = AF_INET; result->mLocalPort = port;
addr.sin_port = htons(testport); result->mFamily = family;
result = ::bind(sock, (const sockaddr*)&addr, sizeof addr); return result;
if (result)
result = WSAGetLastError();
break;
case AF_INET6:
memset(&addr6, 0, sizeof addr6);
addr6.sin6_family = AF_INET6;
addr6.sin6_port = htons(testport);
result = ::bind(sock, (const sockaddr*)&addr6, sizeof addr6);
if (result)
result = WSAGetLastError();
break;
} }
} while (result == WSAEADDRINUSE); // Obtain port number
sockaddr_in addr;
sockaddr_in6 addr6;
int result;
int testport;
do
{
testport = port ? port : rand() % ((mFinish - mStart) / 2) * 2 + mStart;
if (result) switch (family)
{ {
closesocket(sock); case AF_INET:
throw Exception(ERR_NET_FAILED, WSAGetLastError()); memset(&addr, 0, sizeof addr);
} addr.sin_family = AF_INET;
PDatagramSocket resultObject(new DatagramSocket()); addr.sin_port = htons(testport);
resultObject->mLocalPort = testport; result = ::bind(sock, (const sockaddr*)&addr, sizeof addr);
resultObject->mHandle = sock; if (result)
if (!resultObject->setBlocking(false)) result = WSAGetLastError();
{ break;
resultObject->closeSocket();
throw Exception(ERR_NET_FAILED, WSAGetLastError());
}
// Put socket object to the map case AF_INET6:
mSocketMap[sock].mSink = sink; memset(&addr6, 0, sizeof addr6);
mSocketMap[sock].mSocket = resultObject; addr6.sin6_family = AF_INET6;
addr6.sin6_port = htons(testport);
return resultObject; result = ::bind(sock, (const sockaddr*)&addr6, sizeof addr6);
if (result)
result = WSAGetLastError();
break;
}
} while (result == WSAEADDRINUSE);
if (result)
{
closesocket(sock);
throw Exception(ERR_NET_FAILED, WSAGetLastError());
}
PDatagramSocket resultObject(new DatagramSocket());
resultObject->mLocalPort = testport;
resultObject->mHandle = sock;
if (!resultObject->setBlocking(false))
{
resultObject->closeSocket();
throw Exception(ERR_NET_FAILED, WSAGetLastError());
}
// Put socket object to the map
mSocketMap[sock].mSink = sink;
mSocketMap[sock].mSocket = resultObject;
return resultObject;
} }
void SocketHeap::freeSocket(PDatagramSocket socket) void SocketHeap::freeSocket(PDatagramSocket socket)
{ {
if (!socket) if (!socket)
return; return;
Lock l(mDeleteGuard); Lock l(mDeleteGuard);
mDeleteVector.push_back(socket); mDeleteVector.push_back(socket);
} }
void SocketHeap::processDeleted() void SocketHeap::processDeleted()
{ {
Lock l(mDeleteGuard); Lock l(mDeleteGuard);
SocketVector::iterator socketIter = mDeleteVector.begin(); SocketVector::iterator socketIter = mDeleteVector.begin();
while (socketIter != mDeleteVector.end()) while (socketIter != mDeleteVector.end())
{
// Find socket to delete in main socket map
SocketMap::iterator itemIter = mSocketMap.find((*socketIter)->mHandle);
if (itemIter != mSocketMap.end())
{ {
// If found - delete socket object from map // Find socket to delete in main socket map
mSocketMap.erase(itemIter); SocketMap::iterator itemIter = mSocketMap.find((*socketIter)->mHandle);
}
socketIter++;
}
mDeleteVector.clear(); if (itemIter != mSocketMap.end())
{
// If found - delete socket object from map
mSocketMap.erase(itemIter);
}
socketIter++;
}
mDeleteVector.clear();
} }
void SocketHeap::thread() void SocketHeap::thread()
{ {
/*#ifdef __linux__ /*#ifdef __linux__
// TODO: make epoll implementation for massive polling // TODO: make epoll implementation for massive polling
#else*/ #else*/
while (!isShutdown()) mId = ThreadIf::selfId();
{
// Define socket agreggator
DatagramAgreggator agreggator;
// Make a protected copy of sockets while (!isShutdown())
{ {
Lock l(mGuard); // Define socket agreggator
DatagramAgreggator agreggator;
// Remove deleted sockets from map and close them // Make a protected copy of sockets
{ {
processDeleted(); Lock l(mGuard);
}
// Remove deleted sockets from map and close them
// Update socket set {
for (SocketMap::iterator socketIter = mSocketMap.begin(); socketIter != mSocketMap.end(); ++socketIter) processDeleted();
}
// Update socket set
for (auto& socketIter: mSocketMap)
agreggator.addSocket(socketIter.second.mSocket);
/* for (SocketMap::iterator socketIter = mSocketMap.begin(); socketIter != mSocketMap.end(); ++socketIter)
{ {
// Add handle to set // Add handle to set
agreggator.addSocket(socketIter->second.mSocket); agreggator.addSocket(socketIter->second.mSocket);
} } */
} }
// If set is not empty
if (agreggator.count() > 0)
{
if (agreggator.waitForData(10))
{
ICELogMedia(<< "There is data on UDP sockets");
Lock l(mGuard);
// Remove deleted sockets to avoid call non-existant sinks // If set is not empty
processDeleted(); if (agreggator.count() > 0)
for (unsigned i=0; i<agreggator.count(); i++)
{ {
if (agreggator.hasDataAtIndex(i)) if (agreggator.waitForData(10))
{
//ICELogInfo(<<"Got incoming UDP packet at index " << (const int)i);
PDatagramSocket sock = agreggator.socketAt(i);
// Find corresponding data sink
SocketMap::iterator socketItemIter = mSocketMap.find(sock->mHandle);
if (socketItemIter != mSocketMap.end())
{ {
InternetAddress src; ICELogMedia(<< "There is data on UDP sockets");
unsigned received = sock->recvDatagram(src, mTempPacket, sizeof mTempPacket); Lock l(mGuard);
if ( received > 0 && received <= MAX_VALID_UDPPACKET_SIZE) // Remove deleted sockets to avoid call non-existant sinks
socketItemIter->second.mSink->onReceivedData(sock, src, mTempPacket, received); processDeleted();
for (unsigned i=0; i<agreggator.count(); i++)
{
if (agreggator.hasDataAtIndex(i))
{
//ICELogInfo(<<"Got incoming UDP packet at index " << (const int)i);
PDatagramSocket sock = agreggator.socketAt(i);
// Find corresponding data sink
SocketMap::iterator socketItemIter = mSocketMap.find(sock->mHandle);
if (socketItemIter != mSocketMap.end())
{
InternetAddress src;
unsigned received = sock->recvDatagram(src, mTempPacket, sizeof mTempPacket);
if ( received > 0 && received <= MAX_VALID_UDPPACKET_SIZE)
socketItemIter->second.mSink->onReceivedData(sock, src, mTempPacket, received);
}
// There is a call to ProcessDeleted() as OnReceivedData() could delete sockets
processDeleted();
}
} //of for
} }
}
// There is a call to ProcessDeleted() as OnReceivedData() could delete sockets else
processDeleted(); std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
} //of for
}
} }
else mId = 0;
SyncHelper::delay(1000); // Delay for 1 millisecond mShutdown = false;
} //#endif
mId = 0;
mShutdown = false;
//#endif
} }
static SocketHeap GRTPSocketHeap(20002, 25100); static SocketHeap GRTPSocketHeap(20002, 25100);
SocketHeap& SocketHeap::instance() SocketHeap& SocketHeap::instance()
{ {
return GRTPSocketHeap; return GRTPSocketHeap;
} }

View File

@ -103,10 +103,9 @@ protected:
Mutex mDeleteGuard; Mutex mDeleteGuard;
char mTempPacket[MAX_UDPPACKET_SIZE]; char mTempPacket[MAX_UDPPACKET_SIZE];
volatile bool mShutdown = false;
int mId = 0; int mId = 0;
bool isShutdown() const { return mShutdown; } //bool isShutdown() const { return mShutdown; }
virtual void thread(); virtual void thread();
// Processes mDeleteVector -> updates mSocketMap, removes socket items and closes sockets specified in mDeleteVector // Processes mDeleteVector -> updates mSocketMap, removes socket items and closes sockets specified in mDeleteVector