Compare commits

..

8 Commits

42 changed files with 856 additions and 494 deletions
+10 -2
View File
@@ -360,13 +360,21 @@ endif()
target_compile_definitions(rtphone PUBLIC ${DEFINES} ) target_compile_definitions(rtphone PUBLIC ${DEFINES} )
if (TARGET_LINUX) if (TARGET_LINUX)
target_link_options(rtphone PUBLIC -Wl,-Bstatic) # PRIVATE, not PUBLIC: rtphone is a STATIC library, so these link options are
# never used to build rtphone itself and must not propagate to consumers.
# As PUBLIC they leaked into every consumer's LINK_FLAGS as an adjacent
# "-Wl,-Bstatic -Wl,-Bdynamic" pair (the wrapped libraries land in a separate
# LINK_LIBRARIES section, so nothing is actually wrapped). The trailing
# -Bdynamic forced the linker back into dynamic-search mode, which broke
# fully-static consumers (e.g. vq-core built with SERVER_STATIC_LINKING=ON:
# "attempted static link of dynamic object libz.so").
target_link_options(rtphone PRIVATE -Wl,-Bstatic)
target_compile_options(rtphone PUBLIC -Wno-deprecated -Wno-deprecated-declarations) target_compile_options(rtphone PUBLIC -Wno-deprecated -Wno-deprecated-declarations)
endif() endif()
target_link_libraries(rtphone PUBLIC ${LIBS_STATIC}) target_link_libraries(rtphone PUBLIC ${LIBS_STATIC})
if (TARGET_LINUX) if (TARGET_LINUX)
target_link_options(rtphone PUBLIC -Wl,-Bdynamic) target_link_options(rtphone PRIVATE -Wl,-Bdynamic)
endif() endif()
target_include_directories(rtphone target_include_directories(rtphone
+9 -11
View File
@@ -181,9 +181,6 @@ void AgentImpl::processConfig(JsonCpp::Value &d, JsonCpp::Value &answer)
config()[CONFIG_IPV4] = d["ipv4"].asBool(); config()[CONFIG_IPV4] = d["ipv4"].asBool();
config()[CONFIG_IPV6] = d["ipv6"].asBool(); config()[CONFIG_IPV6] = d["ipv6"].asBool();
if (transport == "tls")
config()[CONFIG_SIPS] = true;
// Log file // Log file
std::string logfile = d["logfile"].asString(); std::string logfile = d["logfile"].asString();
ice::Logger& logger = ice::GLogger; ice::Logger& logger = ice::GLogger;
@@ -195,7 +192,7 @@ void AgentImpl::processConfig(JsonCpp::Value &d, JsonCpp::Value &answer)
mUseNativeAudio = d["nativeaudio"].asBool(); mUseNativeAudio = d["nativeaudio"].asBool();
config()[CONFIG_OWN_DNS] = d["dns_servers"].asString(); config()[CONFIG_OWN_DNS] = d["dns_servers"].asString();
config()[CONFIG_SIPS] = d["secure"].asBool(); config()[CONFIG_SIPS] = d["secure"].asBool() || transport == "tls";
config()[CONFIG_STUNSERVER_IP] = d["stun_server"].asString(); config()[CONFIG_STUNSERVER_IP] = d["stun_server"].asString();
answer["status"] = Status_Ok; answer["status"] = Status_Ok;
@@ -483,18 +480,18 @@ void AgentImpl::processDestroySession(JsonCpp::Value& request, JsonCpp::Value& a
void AgentImpl::processWaitForEvent(JsonCpp::Value &request, JsonCpp::Value &answer) void AgentImpl::processWaitForEvent(JsonCpp::Value &request, JsonCpp::Value &answer)
{ {
std::unique_lock<std::recursive_mutex> l(mAgentMutex); // Deliberately does NOT take mAgentMutex: events are produced by the worker
// thread inside process(), which needs mAgentMutex. Holding it here would
//int x = 0; // stall all SIP/media processing for the whole timeout and guarantee that
//int y = 1/x; // the awaited event can never arrive during the wait.
int timeout = 0; int timeout = 0;
if (request.isMember("timeout")) if (request.isMember("timeout"))
timeout = request["timeout"].asInt(); timeout = request["timeout"].asInt();
std::unique_lock<std::mutex> eventLock(mEventListMutex); std::unique_lock<std::mutex> eventLock(mEventListMutex);
if (mEventList.empty()) mEventListChangeCondVar.wait_for(eventLock, chrono::milliseconds(timeout),
mEventListChangeCondVar.wait_for(eventLock, chrono::milliseconds(timeout)); [this]() { return !mEventList.empty(); });
if (!mEventList.empty()) if (!mEventList.empty())
{ {
@@ -521,7 +518,7 @@ void AgentImpl::processGetMediaStats(JsonCpp::Value& request, JsonCpp::Value& an
answer["codec"] = result[SessionInfo_AudioCodec].asStdString(); answer["codec"] = result[SessionInfo_AudioCodec].asStdString();
if (result.exists(SessionInfo_NetworkMos)) if (result.exists(SessionInfo_NetworkMos))
answer["network_mos"] = result[SessionInfo_NetworkMos].asFloat(); answer["network_mos"] = result[SessionInfo_NetworkMos].asFloat();
if (result.exists(SessionInfo_PacketLoss)) if (result.exists(SessionInfo_LostRtp))
answer["rtp_lost"] = result[SessionInfo_LostRtp].asInt(); answer["rtp_lost"] = result[SessionInfo_LostRtp].asInt();
if (result.exists(SessionInfo_DroppedRtp)) if (result.exists(SessionInfo_DroppedRtp))
answer["rtp_dropped"] = result[SessionInfo_DroppedRtp].asInt(); answer["rtp_dropped"] = result[SessionInfo_DroppedRtp].asInt();
@@ -749,6 +746,7 @@ void AgentImpl::onSessionTerminated(PSession s, int responsecode, int reason)
if (mOutgoingAudioDump) if (mOutgoingAudioDump)
mOutgoingAudioDump->close(); mOutgoingAudioDump->close();
*/ */
if (mAudioManager)
mAudioManager->stop(mUseNativeAudio ? AudioManager::atReceiver : AudioManager::atNull); mAudioManager->stop(mUseNativeAudio ? AudioManager::atReceiver : AudioManager::atNull);
// Gather statistics before // Gather statistics before
EVENT_WITH_NAME("session_terminated"); EVENT_WITH_NAME("session_terminated");
+2 -1
View File
@@ -13,6 +13,7 @@
#include "Agent_AudioManager.h" #include "Agent_AudioManager.h"
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <atomic>
class AgentImpl: public UserAgent, public MT::Stream::MediaObserver class AgentImpl: public UserAgent, public MT::Stream::MediaObserver
@@ -32,7 +33,7 @@ protected:
std::shared_ptr<std::thread> mThread; std::shared_ptr<std::thread> mThread;
volatile bool mShutdown; std::atomic<bool> mShutdown;
std::shared_ptr<MT::Terminal> mTerminal; std::shared_ptr<MT::Terminal> mTerminal;
std::shared_ptr<AudioManager> mAudioManager; std::shared_ptr<AudioManager> mAudioManager;
Audio::DataConnection* mAudioMonitoring = nullptr; Audio::DataConnection* mAudioMonitoring = nullptr;
+22 -19
View File
@@ -9,11 +9,7 @@
using namespace Audio; using namespace Audio;
DataWindow::DataWindow() DataWindow::DataWindow()
{ {}
mFilled = 0;
mData = nullptr;
mCapacity = 0;
}
DataWindow::~DataWindow() DataWindow::~DataWindow()
{ {
@@ -24,10 +20,15 @@ DataWindow::~DataWindow()
} }
} }
void DataWindow::setCapacity(int capacity) void DataWindow::setCapacity(size_t capacity)
{ {
Lock l(mMutex); Lock l(mMutex);
int tail = capacity - mCapacity;
// The window only ever grows; a smaller request keeps the current buffer.
if (capacity <= mCapacity)
return;
size_t tail = capacity - mCapacity;
char* buffer = mData; char* buffer = mData;
mData = (char*)realloc(mData, capacity); mData = (char*)realloc(mData, capacity);
if (!mData) if (!mData)
@@ -41,7 +42,7 @@ void DataWindow::setCapacity(int capacity)
mCapacity = capacity; mCapacity = capacity;
} }
void DataWindow::addZero(int length) void DataWindow::addZero(size_t length)
{ {
Lock l(mMutex); Lock l(mMutex);
@@ -60,7 +61,7 @@ void DataWindow::addZero(int length)
} }
void DataWindow::add(const void* data, int length) void DataWindow::add(const void* data, size_t length)
{ {
Lock l(mMutex); Lock l(mMutex);
@@ -94,7 +95,7 @@ void DataWindow::add(short sample)
add(&sample, sizeof sample); add(&sample, sizeof sample);
} }
void DataWindow::erase(int length) void DataWindow::erase(size_t length)
{ {
Lock l(mMutex); Lock l(mMutex);
if (length > mFilled) if (length > mFilled)
@@ -120,21 +121,21 @@ void DataWindow::clear()
mFilled = 0; mFilled = 0;
} }
short DataWindow::shortAt(int index) const short DataWindow::shortAt(size_t index) const
{ {
Lock l(mMutex); Lock l(mMutex);
assert(index < mFilled / 2); assert(index < mFilled / 2);
return ((short*)mData)[index]; return ((short*)mData)[index];
} }
void DataWindow::setShortAt(short value, int index) void DataWindow::setShortAt(short value, size_t index)
{ {
Lock l(mMutex); Lock l(mMutex);
assert(index < mFilled / 2); assert(index < mFilled / 2);
((short*)mData)[index] = value; ((short*)mData)[index] = value;
} }
int DataWindow::read(void* buffer, int length) size_t DataWindow::read(void* buffer, size_t length)
{ {
Lock l(mMutex); Lock l(mMutex);
if (length > mFilled) if (length > mFilled)
@@ -150,25 +151,27 @@ int DataWindow::read(void* buffer, int length)
return length; return length;
} }
int DataWindow::filled() const size_t DataWindow::filled() const
{ {
Lock l(mMutex); Lock l(mMutex);
return mFilled; return mFilled;
} }
void DataWindow::setFilled(int filled) void DataWindow::setFilled(size_t filled)
{ {
Lock l(mMutex); Lock l(mMutex);
if (filled > mCapacity)
throw std::bad_alloc();
mFilled = filled; mFilled = filled;
} }
int DataWindow::capacity() const size_t DataWindow::capacity() const
{ {
Lock l(mMutex); Lock l(mMutex);
return mCapacity; return mCapacity;
} }
void DataWindow::zero(int length) void DataWindow::zero(size_t length)
{ {
Lock l(mMutex); Lock l(mMutex);
assert(length <= mCapacity); assert(length <= mCapacity);
@@ -189,10 +192,10 @@ size_t DataWindow::moveTo(DataWindow& dst, size_t size)
return avail; return avail;
} }
std::chrono::milliseconds DataWindow::getTimeLength(int samplerate, int channels) const std::chrono::milliseconds DataWindow::getTimeLength(const Audio::Format& fmt) const
{ {
Lock l(mMutex); Lock l(mMutex);
return std::chrono::milliseconds(mFilled / sizeof(short) / channels / (samplerate / 1000)); return std::chrono::milliseconds(mFilled / sizeof(short) / fmt.channels() / (fmt.rate()/ 1000));
} }
void DataWindow::makeStereoFromMono(DataWindow& dst, DataWindow& src) void DataWindow::makeStereoFromMono(DataWindow& dst, DataWindow& src)
+17 -16
View File
@@ -8,6 +8,7 @@
#include "../helper/HL_ByteBuffer.h" #include "../helper/HL_ByteBuffer.h"
#include "../helper/HL_Sync.h" #include "../helper/HL_Sync.h"
#include "Audio_Interface.h"
namespace Audio namespace Audio
{ {
@@ -17,34 +18,34 @@ public:
DataWindow(); DataWindow();
~DataWindow(); ~DataWindow();
void setCapacity(int capacity); void setCapacity(size_t capacity);
int capacity() const; size_t capacity() const;
void addZero(int length); void addZero(size_t length);
void add(const void* data, int length); void add(const void* data, size_t length);
void add(short sample); void add(short sample);
int read(void* buffer, int length); size_t read(void* buffer, size_t length);
void erase(int length = -1); void erase(size_t length);
const char* data() const; const char* data() const;
char* mutableData(); char* mutableData();
int filled() const; size_t filled() const;
void setFilled(int filled); void setFilled(size_t filled);
void clear(); void clear();
short shortAt(int index) const; short shortAt(size_t index) const;
void setShortAt(short value, int index); void setShortAt(short value, size_t index);
void zero(int length); void zero(size_t length);
size_t moveTo(DataWindow& dst, size_t size); size_t moveTo(DataWindow& dst, size_t size /* in bytes*/ );
std::chrono::milliseconds getTimeLength(int samplerate, int channels) const; std::chrono::milliseconds getTimeLength(const Format& fmt) const;
static void makeStereoFromMono(DataWindow& dst, DataWindow& src); static void makeStereoFromMono(DataWindow& dst, DataWindow& src);
protected: protected:
mutable Mutex mMutex; mutable Mutex mMutex;
char* mData; char* mData = nullptr;
int mFilled; size_t mFilled = 0;
int mCapacity; size_t mCapacity = 0;
}; };
} }
#endif #endif
+24 -4
View File
@@ -94,6 +94,7 @@ Mixer::~Mixer()
void Mixer::unregisterChannel(void* channel) void Mixer::unregisterChannel(void* channel)
{ {
Lock l(mMutex);
for (int i=0; i<AUDIO_MIX_CHANNEL_COUNT; i++) for (int i=0; i<AUDIO_MIX_CHANNEL_COUNT; i++)
{ {
Stream& c = mChannelList[i]; Stream& c = mChannelList[i];
@@ -108,6 +109,7 @@ void Mixer::unregisterChannel(void* channel)
void Mixer::clear(void* context, unsigned ssrc) void Mixer::clear(void* context, unsigned ssrc)
{ {
Lock l(mMutex);
for (int i=0; i<AUDIO_MIX_CHANNEL_COUNT; i++) for (int i=0; i<AUDIO_MIX_CHANNEL_COUNT; i++)
{ {
Stream& c = mChannelList[i]; Stream& c = mChannelList[i];
@@ -147,6 +149,7 @@ void Mixer::addPcm(void* context, unsigned ssrc,
{ {
assert(inputRate == 8000 || inputRate == 16000 || inputRate == 32000); assert(inputRate == 8000 || inputRate == 16000 || inputRate == 32000);
Lock l(mMutex);
int i; int i;
// Locate a channel // Locate a channel
@@ -172,6 +175,7 @@ void Mixer::addPcm(void* context, unsigned ssrc, Audio::DataWindow& w, int rate,
{ {
assert(rate == 8000 || rate == 16000 || rate == 32000 || rate == 48000); assert(rate == 8000 || rate == 16000 || rate == 32000 || rate == 48000);
Lock l(mMutex);
int i; int i;
// Locate a channel // Locate a channel
@@ -196,6 +200,8 @@ void Mixer::addPcm(void* context, unsigned ssrc, Audio::DataWindow& w, int rate,
void Mixer::mix() void Mixer::mix()
{ {
Lock l(mMutex);
// Current sample // Current sample
int sample = 0; int sample = 0;
@@ -310,6 +316,8 @@ void Mixer::mix()
int Mixer::getPcm(void* outputData, int outputLength) int Mixer::getPcm(void* outputData, int outputLength)
{ {
Lock l(mMutex);
if (mOutput.filled() < outputLength) if (mOutput.filled() < outputLength)
mix(); mix();
@@ -320,14 +328,26 @@ int Mixer::getPcm(void* outputData, int outputLength)
int Mixer::mixAndGetPcm(Audio::DataWindow& output) int Mixer::mixAndGetPcm(Audio::DataWindow& output)
{ {
Lock l(mMutex);
// Mix // Mix
mix(); mix();
// Set output space size_t avail = mOutput.filled();
output.setCapacity(mOutput.filled()); if (!avail)
{
output.setFilled(0);
return 0;
}
// Read mixed data to output // Make sure output has enough space (setCapacity only ever grows the window)
return mOutput.read(output.mutableData(), output.capacity()); if (output.capacity() < avail)
output.setCapacity(avail);
// Read mixed data to output and publish the real byte count
size_t got = mOutput.read(output.mutableData(), avail);
output.setFilled(got);
return static_cast<int>(got);
} }
int Mixer::available() int Mixer::available()
+10 -7
View File
@@ -18,9 +18,7 @@ namespace Audio
SpeexResampler::SpeexResampler() SpeexResampler::SpeexResampler()
:mContext(NULL), mErrorCode(0), mSourceRate(0), mDestRate(0), mLastSample(0), mChannels(0) {}
{
}
void SpeexResampler::start(int channels, int sourceRate, int destRate) void SpeexResampler::start(int channels, int sourceRate, int destRate)
{ {
@@ -51,6 +49,11 @@ void SpeexResampler::stop()
} }
} }
bool SpeexResampler::isOpened() const
{
return mContext != nullptr;
}
SpeexResampler::~SpeexResampler() SpeexResampler::~SpeexResampler()
{ {
stop(); stop();
@@ -113,22 +116,22 @@ size_t SpeexResampler::processBuffer(const void* src, size_t sourceLength, size_
return outLen * sizeof(short) * mChannels; return outLen * sizeof(short) * mChannels;
} }
int SpeexResampler::sourceRate() int SpeexResampler::sourceRate() const
{ {
return mSourceRate; return mSourceRate;
} }
int SpeexResampler::destRate() int SpeexResampler::destRate() const
{ {
return mDestRate; return mDestRate;
} }
size_t SpeexResampler::getDestLength(size_t sourceLen) size_t SpeexResampler::getDestLength(size_t sourceLen) const
{ {
return size_t(sourceLen * (float(mDestRate) / mSourceRate) + 0.5f); return size_t(sourceLen * (float(mDestRate) / mSourceRate) + 0.5f);
} }
size_t SpeexResampler::getSourceLength(size_t destLen) size_t SpeexResampler::getSourceLength(size_t destLen) const
{ {
// Here we want to get 'destLen' number of samples // Here we want to get 'destLen' number of samples
return size_t(destLen * (float(mSourceRate) / mDestRate) + 0.5f); return size_t(destLen * (float(mSourceRate) / mDestRate) + 0.5f);
+12 -10
View File
@@ -24,23 +24,25 @@ namespace Audio
void start(int channels, int sourceRate, int destRate); void start(int channels, int sourceRate, int destRate);
void stop(); void stop();
bool isOpened() const;
size_t processBuffer(const void* source, size_t sourceLength, size_t& sourceProcessed, size_t processBuffer(const void* source, size_t sourceLength, size_t& sourceProcessed,
void* dest, size_t destCapacity); void* dest, size_t destCapacity);
int sourceRate(); int sourceRate() const;
int destRate(); int destRate() const;
size_t getDestLength(size_t sourceLen); size_t getDestLength(size_t sourceLen) const;
size_t getSourceLength(size_t destLen); size_t getSourceLength(size_t destLen) const;
// Returns instance + speex encoder size in bytes // Returns instance + speex encoder size in bytes
size_t getSize() const; size_t getSize() const;
protected: protected:
void* mContext; void* mContext = nullptr;
int mErrorCode; int mErrorCode = 0;
int mSourceRate, int mSourceRate = 0,
mDestRate, mDestRate = 0,
mChannels; mChannels = 0;
short mLastSample; short mLastSample = 0;
}; };
typedef SpeexResampler Resampler; typedef SpeexResampler Resampler;
+15 -16
View File
@@ -60,17 +60,11 @@ std::string WavFileReader::readChunk()
uint32_t size = 0; uint32_t size = 0;
readBuffer(&size, 4); readBuffer(&size, 4);
if (result == "fact") if (result == "data")
{
uint32_t dataLength = 0;
readBuffer(&dataLength, sizeof dataLength);
mDataLength = dataLength;
}
else
if (result != "data")
mInput->seekg(size, std::ios_base::beg);
else
mDataLength = size; mDataLength = size;
else
// Skip the chunk body; RIFF chunks are word-aligned, so odd sizes carry a pad byte
mInput->seekg(std::streamoff(size + (size & 1)), std::ios_base::cur);
return result; return result;
} }
@@ -151,7 +145,9 @@ bool WavFileReader::open(const std::filesystem::path& p)
mBits = 0; mBits = 0;
readBuffer(&mBits, sizeof(mBits)); readBuffer(&mBits, sizeof(mBits));
if (mBits !=8 && mBits != 16) // Only 16-bit PCM is supported: the read path feeds the data
// directly into a 16-bit resampler.
if (mBits != 16)
THROW_READERROR; THROW_READERROR;
// Look for the chunk 'data' // Look for the chunk 'data'
@@ -222,7 +218,8 @@ size_t WavFileReader::read(short* buffer, size_t samples)
auto filePosition = mInput->tellg(); auto filePosition = mInput->tellg();
// Check how much data we can read // Check how much data we can read
size_t fileAvailable = mDataLength + mDataOffset - filePosition; std::streamoff dataEnd = std::streamoff(mDataLength) + mDataOffset;
size_t fileAvailable = filePosition < dataEnd ? size_t(dataEnd - filePosition) : 0;
requiredBytes = fileAvailable < requiredBytes ? fileAvailable : requiredBytes; requiredBytes = fileAvailable < requiredBytes ? fileAvailable : requiredBytes;
} }
@@ -254,8 +251,9 @@ size_t WavFileReader::readRaw(short* buffer, size_t samples)
auto filePosition = mInput->tellg(); auto filePosition = mInput->tellg();
// Check how much data we can read // Check how much data we can read
size_t fileAvailable = mDataLength + mDataOffset - filePosition; std::streamoff dataEnd = std::streamoff(mDataLength) + mDataOffset;
requiredBytes = (int)fileAvailable < requiredBytes ? (int)fileAvailable : requiredBytes; size_t fileAvailable = filePosition < dataEnd ? size_t(dataEnd - filePosition) : 0;
requiredBytes = fileAvailable < requiredBytes ? fileAvailable : requiredBytes;
} }
size_t readBytes = tryReadBuffer(buffer, requiredBytes); size_t readBytes = tryReadBuffer(buffer, requiredBytes);
@@ -332,10 +330,11 @@ bool WavFileWriter::open(const std::filesystem::path& p, int samplerate, int cha
mChannels = channels; mChannels = channels;
mOutput = std::make_unique<std::ofstream>(p, std::ios::binary | std::ios::trunc); mOutput = std::make_unique<std::ofstream>(p, std::ios::binary | std::ios::trunc);
if (!mOutput) if (!mOutput->is_open())
{ {
int errorcode = errno; int errorcode = errno;
ICELogError(<< "Failed to create .wav file: filename = " << p << " , error = " << errorcode); ICELogError(<< "Failed to create .wav file: filename = " << p << " , error = " << errorcode);
mOutput.reset();
return false; return false;
} }
@@ -420,7 +419,7 @@ size_t WavFileWriter::write(const void* buffer, size_t bytes)
bool WavFileWriter::isOpened() const bool WavFileWriter::isOpened() const
{ {
LOCK; LOCK;
return mOutput.get(); return mOutput && mOutput->is_open();
} }
std::filesystem::path WavFileWriter::path() const std::filesystem::path WavFileWriter::path() const
+15 -7
View File
@@ -93,11 +93,13 @@ void AudioProvider::updateSdpOffer(resip::SdpContents::Session::Medium& sdp, Sdp
// Check if SRTP suite is found already or not // Check if SRTP suite is found already or not
if (mSrtpSuite == SRTP_NONE) if (mSrtpSuite == SRTP_NONE)
{ {
// RFC 4568 requires a unique tag per crypto attribute; use the suite id.
for (int suite = SRTP_AES_128_AUTH_80; suite <= SRTP_LAST; suite++) for (int suite = SRTP_AES_128_AUTH_80; suite <= SRTP_LAST; suite++)
sdp.addAttribute("crypto", resip::Data(createCryptoAttribute((SrtpSuite)suite))); sdp.addAttribute("crypto", resip::Data(createCryptoAttribute((SrtpSuite)suite, suite)));
} }
else else
sdp.addAttribute("crypto", resip::Data(createCryptoAttribute(mSrtpSuite))); // Answer/re-offer: echo the tag of the negotiated attribute.
sdp.addAttribute("crypto", resip::Data(createCryptoAttribute(mSrtpSuite, mSrtpTag)));
} }
// Use CodecListPriority mCodecPriority adapter to work with codec priorities // Use CodecListPriority mCodecPriority adapter to work with codec priorities
@@ -246,11 +248,13 @@ bool AudioProvider::processSdpOffer(const resip::SdpContents::Session::Medium& m
{ {
const resip::Data& attr = *attrIter; const resip::Data& attr = *attrIter;
ByteBuffer tempkey; ByteBuffer tempkey;
SrtpSuite suite = processCryptoAttribute(attr, tempkey); int tag = 1;
if (suite > ss) SrtpSuite suite = processCryptoAttribute(attr, tempkey, &tag);
if (srtpSuiteStrength(suite) > srtpSuiteStrength(ss))
{ {
ss = suite; ss = suite;
mSrtpSuite = suite; mSrtpSuite = suite;
mSrtpTag = tag;
key = tempkey; key = tempkey;
} }
} }
@@ -295,26 +299,30 @@ MT::PStream AudioProvider::activeStream()
return mActiveStream; return mActiveStream;
} }
std::string AudioProvider::createCryptoAttribute(SrtpSuite suite) std::string AudioProvider::createCryptoAttribute(SrtpSuite suite, int tag)
{ {
if (!mActiveStream) if (!mActiveStream)
return ""; return "";
// Print key to base64 string // Print key to base64 string
PByteBuffer keyBuffer = mActiveStream->srtp().outgoingKey(suite).first; PByteBuffer keyBuffer = mActiveStream->srtp().outgoingKey(suite).first;
if (!keyBuffer)
return "";
resip::Data d(keyBuffer->data(), keyBuffer->size()); resip::Data d(keyBuffer->data(), keyBuffer->size());
resip::Data keyText = d.base64encode(); resip::Data keyText = d.base64encode();
return std::format("{} {} inline:{}", 1, toString(suite), keyText.c_str()); return std::format("{} {} inline:{}", tag, toString(suite), keyText.c_str());
} }
SrtpSuite AudioProvider::processCryptoAttribute(const resip::Data& value, ByteBuffer& key) SrtpSuite AudioProvider::processCryptoAttribute(const resip::Data& value, ByteBuffer& key, int* tag)
{ {
int srtpTag = 0; int srtpTag = 0;
char suite[64], keyChunk[256]; char suite[64], keyChunk[256];
int components = sscanf(value.c_str(), "%d %63s inline: %255s", &srtpTag, suite, keyChunk); int components = sscanf(value.c_str(), "%d %63s inline: %255s", &srtpTag, suite, keyChunk);
if (components != 3) if (components != 3)
return SRTP_NONE; return SRTP_NONE;
if (tag)
*tag = srtpTag;
const char* delimiter = strchr(keyChunk, '|'); const char* delimiter = strchr(keyChunk, '|');
resip::Data keyText; resip::Data keyText;
+3 -2
View File
@@ -74,7 +74,7 @@ public:
void setupMirror(bool enable); void setupMirror(bool enable);
void configureMediaObserver(MT::Stream::MediaObserver* observer, void* userTag); void configureMediaObserver(MT::Stream::MediaObserver* observer, void* userTag);
static SrtpSuite processCryptoAttribute(const resip::Data& value, ByteBuffer& key); static SrtpSuite processCryptoAttribute(const resip::Data& value, ByteBuffer& key, int* tag = nullptr);
protected: protected:
// SDP's stream name // SDP's stream name
@@ -93,6 +93,7 @@ protected:
unsigned mState; unsigned mState;
SrtpSuite mSrtpSuite; SrtpSuite mSrtpSuite;
int mSrtpTag = 1; // RFC 4568 tag of the negotiated crypto attribute
struct RemoteCodec struct RemoteCodec
{ {
RemoteCodec(MT::Codec::Factory* factory, int payloadType) RemoteCodec(MT::Codec::Factory* factory, int payloadType)
@@ -109,7 +110,7 @@ protected:
MT::Stream::MediaObserver* mMediaObserver = nullptr; MT::Stream::MediaObserver* mMediaObserver = nullptr;
void* mMediaObserverTag = nullptr; void* mMediaObserverTag = nullptr;
std::string createCryptoAttribute(SrtpSuite suite); std::string createCryptoAttribute(SrtpSuite suite, int tag);
void findRfc2833(const resip::SdpContents::Session::Medium::CodecContainer& codecs); void findRfc2833(const resip::SdpContents::Session::Medium::CodecContainer& codecs);
// Implements setState() logic. This allows to be called from constructor (it is not virtual function) // Implements setState() logic. This allows to be called from constructor (it is not virtual function)
+23 -6
View File
@@ -470,7 +470,9 @@ void UserAgent::process()
// Send generated packet via provider's method to allow custom scheme of encryption // Send generated packet via provider's method to allow custom scheme of encryption
ICELogDebug(<<"Sending ICE packet to " << buffer->remoteAddress().toStdString() << " with " << buffer->comment()); ICELogDebug(<<"Sending ICE packet to " << buffer->remoteAddress().toStdString() << " with " << buffer->comment());
PDatagramSocket s = iceComponentId == ICE_RTP_ID ? stream.socket4().mRtp : stream.socket4().mRtcp; RtpPair<PDatagramSocket>& pair = buffer->remoteAddress().family() == AF_INET6 ? stream.socket6() : stream.socket4();
PDatagramSocket s = iceComponentId == ICE_RTP_ID ? pair.mRtp : pair.mRtcp;
if (s)
stream.provider()->sendData(s, buffer->remoteAddress(), buffer->data(), buffer->size()); stream.provider()->sendData(s, buffer->remoteAddress(), buffer->data(), buffer->size());
break; break;
} }
@@ -805,7 +807,10 @@ void UserAgent::onEarlyMedia(resip::ClientInviteSessionHandle h, const resip::Si
/// called when dialog enters the Early state - typically after getting 18x /// called when dialog enters the Early state - typically after getting 18x
void UserAgent::onProvisional(resip::ClientInviteSessionHandle h, const resip::SipMessage& msg) void UserAgent::onProvisional(resip::ClientInviteSessionHandle h, const resip::SipMessage& msg)
{ {
PSession s = getUserSession(CAST2RESIPSESSION(h)->mSessionId); ResipSession* rs = CAST2RESIPSESSION(h);
if (!rs)
return;
PSession s = getUserSession(rs->mSessionId);
if (!s) if (!s)
return; return;
@@ -821,7 +826,10 @@ void UserAgent::onProvisional(resip::ClientInviteSessionHandle h, const resip::S
/// called when a dialog initiated as a UAC enters the connected state /// called when a dialog initiated as a UAC enters the connected state
void UserAgent::onConnected(resip::ClientInviteSessionHandle h, const resip::SipMessage& msg) void UserAgent::onConnected(resip::ClientInviteSessionHandle h, const resip::SipMessage& msg)
{ {
PSession s = getUserSession(CAST2RESIPSESSION(h)->mSessionId); ResipSession* rs = CAST2RESIPSESSION(h);
if (!rs)
return;
PSession s = getUserSession(rs->mSessionId);
if (!s) if (!s)
return; return;
@@ -874,7 +882,10 @@ void UserAgent::onConnected(resip::InviteSessionHandle h, const resip::SipMessag
void UserAgent::onTerminated(resip::InviteSessionHandle h, resip::InviteSessionHandler::TerminatedReason reason, const resip::SipMessage* related) void UserAgent::onTerminated(resip::InviteSessionHandle h, resip::InviteSessionHandler::TerminatedReason reason, const resip::SipMessage* related)
{ {
PSession s = getUserSession(CAST2RESIPSESSION(h)->mSessionId); ResipSession* rs = CAST2RESIPSESSION(h);
if (!rs)
return;
PSession s = getUserSession(rs->mSessionId);
if (!s) if (!s)
return; return;
@@ -920,6 +931,8 @@ void UserAgent::onAnswer(resip::InviteSessionHandle h, const resip::SipMessage&
if (!resipSession) if (!resipSession)
return; return;
Session* s = resipSession->session(); Session* s = resipSession->session();
if (!s)
return;
bool iceAvailable = true; bool iceAvailable = true;
@@ -1069,7 +1082,8 @@ void UserAgent::onAnswer(resip::InviteSessionHandle h, const resip::SipMessage&
/// Called when an SDP offer is received - must send an answer soon after this /// Called when an SDP offer is received - must send an answer soon after this
void UserAgent::onOffer(resip::InviteSessionHandle h, const resip::SipMessage& msg, const resip::SdpContents& sdp) void UserAgent::onOffer(resip::InviteSessionHandle h, const resip::SipMessage& msg, const resip::SdpContents& sdp)
{ {
PSession s = getUserSession(CAST2RESIPSESSION(h)->mSessionId); ResipSession* rs = CAST2RESIPSESSION(h);
PSession s = rs ? getUserSession(rs->mSessionId) : PSession();
if (!s) if (!s)
{ {
h->reject(488); h->reject(488);
@@ -1091,7 +1105,8 @@ void UserAgent::onOffer(resip::InviteSessionHandle h, const resip::SipMessage& m
uint64_t version = sdp.session().origin().getVersion(); uint64_t version = sdp.session().origin().getVersion();
std::string remoteIp = sdp.session().connection().getAddress().c_str(); std::string remoteIp = sdp.session().connection().getAddress().c_str();
int code; // Default to 200: a retransmitted offer (same origin version) keeps the session.
int code = 200;
if ((uint64_t)-1 == s->mRemoteOriginVersion) if ((uint64_t)-1 == s->mRemoteOriginVersion)
{ {
code = s->processSdp(version, iceAvailable, icePwd, iceUfrag, remoteIp, sdp.session().media()); code = s->processSdp(version, iceAvailable, icePwd, iceUfrag, remoteIp, sdp.session().media());
@@ -1299,6 +1314,8 @@ void UserAgent::onPresenceUpdate(PClientObserver observer, const std::string& pe
void UserAgent::onNewSubscription(resip::ServerSubscriptionHandle h, const resip::SipMessage& sub) void UserAgent::onNewSubscription(resip::ServerSubscriptionHandle h, const resip::SipMessage& sub)
{ {
ResipSession* s = CAST2RESIPSESSION(h); ResipSession* s = CAST2RESIPSESSION(h);
if (!s)
return;
// Get the event package name // Get the event package name
const char* event = sub.header(resip::h_Event).value().c_str(); const char* event = sub.header(resip::h_Event).value().c_str();
+12 -4
View File
@@ -346,6 +346,10 @@ void Session::stop()
// Free socket // Free socket
SocketHeap::instance().freeSocketPair( dataStream.socket4() ); SocketHeap::instance().freeSocketPair( dataStream.socket4() );
SocketHeap::instance().freeSocketPair( dataStream.socket6() ); SocketHeap::instance().freeSocketPair( dataStream.socket6() );
// Drop the references so the destructor's cleanup does not free them again
dataStream.setSocket4(RtpPair<PDatagramSocket>());
dataStream.setSocket6(RtpPair<PDatagramSocket>());
} }
} }
@@ -475,7 +479,7 @@ void Session::getSessionInfo(Session::InfoOptions options, VariantMap& info)
if (stat.mReceivedRtp) if (stat.mReceivedRtp)
info[SessionInfo_PacketLoss] = static_cast<int>((stat.mPacketLoss * 1000) / stat.mReceivedRtp); info[SessionInfo_PacketLoss] = static_cast<int>((stat.mPacketLoss * 1000) / stat.mReceivedRtp);
if (media) if (media && mIceStack)
info[SessionInfo_AudioPeer] = mIceStack->remoteAddress(media->iceInfo().mStreamId, media->iceInfo().mComponentId.mRtp).toStdString(); info[SessionInfo_AudioPeer] = mIceStack->remoteAddress(media->iceInfo().mStreamId, media->iceInfo().mComponentId.mRtp).toStdString();
info[SessionInfo_Jitter] = stat.mJitter; info[SessionInfo_Jitter] = stat.mJitter;
@@ -485,7 +489,8 @@ void Session::getSessionInfo(Session::InfoOptions options, VariantMap& info)
info[SessionInfo_BitrateSwitchCounter] = stat.mBitrateSwitchCounter; info[SessionInfo_BitrateSwitchCounter] = stat.mBitrateSwitchCounter;
info[SessionInfo_CngCounter] = stat.mCng; info[SessionInfo_CngCounter] = stat.mCng;
#endif #endif
info[SessionInfo_SSRC] = stat.mSsrc; // Variant stores VTYPE_INT here; keep the 32 bits (consumers read it back with asInt()).
info[SessionInfo_SSRC] = static_cast<int>(stat.mSsrc);
info[SessionInfo_RemotePeer] = stat.mRemotePeer.toStdString(); info[SessionInfo_RemotePeer] = stat.mRemotePeer.toStdString();
} }
@@ -741,9 +746,12 @@ PDataProvider Session::findProviderByPort(int family, unsigned short port)
{ {
Stream& s = mStreamList[i]; Stream& s = mStreamList[i];
if ((s.socket4().mRtp->localport() == port || s.socket4().mRtcp->localport() == port) && family == AF_INET) // Sockets may not be allocated yet (stream created from SDP, sockets follow later)
if (family == AF_INET && s.socket4().mRtp && s.socket4().mRtcp &&
(s.socket4().mRtp->localport() == port || s.socket4().mRtcp->localport() == port))
return s.provider(); return s.provider();
if ((s.socket6().mRtp->localport() == port || s.socket6().mRtcp->localport() == port) && family == AF_INET6) if (family == AF_INET6 && s.socket6().mRtp && s.socket6().mRtcp &&
(s.socket6().mRtp->localport() == port || s.socket6().mRtcp->localport() == port))
return s.provider(); return s.provider();
} }
+11
View File
@@ -5,6 +5,10 @@ bool IuUP::TwoBytePseudoheader = false;
bool IuUP::parse(const uint8_t *packet, int size, IuUP::Frame &result) bool IuUP::parse(const uint8_t *packet, int size, IuUP::Frame &result)
{ {
// Data-with-CRC frames carry a 4 byte header
if (size < 4)
return false;
// Wrap incoming packet in byte buffer // Wrap incoming packet in byte buffer
BitReader reader(packet, size); BitReader reader(packet, size);
@@ -45,6 +49,10 @@ bool IuUP::parse2(const uint8_t* packet, int size, Frame& result)
size -= 2; size -= 2;
} }
// Frame header is 3 bytes (no CRC) or 4 bytes (with CRC)
if (size < 3)
return false;
BitReader reader(packet, size); BitReader reader(packet, size);
result.mPduType = (PduType)reader.readBits(4); result.mPduType = (PduType)reader.readBits(4);
@@ -52,6 +60,9 @@ bool IuUP::parse2(const uint8_t* packet, int size, Frame& result)
if (result.mPduType != PduType::DataNoCrc && result.mPduType != PduType::DataWithCrc) if (result.mPduType != PduType::DataNoCrc && result.mPduType != PduType::DataWithCrc)
return false; return false;
if (result.mPduType == PduType::DataWithCrc && size < 4)
return false;
result.mFrameNumber = reader.readBits(4); result.mFrameNumber = reader.readBits(4);
result.mFqc = reader.readBits(2); result.mFqc = reader.readBits(2);
result.mRfci = reader.readBits(6); result.mRfci = reader.readBits(6);
+4 -2
View File
@@ -93,7 +93,8 @@ bool RtpHelper::isRtp(const void* buffer, size_t length)
bool RtpHelper::isRtpOrRtcp(const void* buffer, size_t length) bool RtpHelper::isRtpOrRtcp(const void* buffer, size_t length)
{ {
if (length < 12) // A minimal RTCP packet (e.g. an empty receiver report) is 8 bytes
if (length < 8)
return false; return false;
const RtcpHeader* h = reinterpret_cast<const RtcpHeader*>(buffer); const RtcpHeader* h = reinterpret_cast<const RtcpHeader*>(buffer);
return h->version == 2; return h->version == 2;
@@ -390,7 +391,8 @@ void RtpDump::add(const void* buffer, size_t len, uint32_t offsetMs)
if (!buffer || len == 0) if (!buffer || len == 0)
return; return;
if (len > MAX_RTP_PACKET_SIZE) // The record length field is 16-bit and covers payload + 8 byte header
if (len > MAX_RTP_PACKET_SIZE - 8)
throw std::runtime_error("Packet too large: " + std::to_string(len)); throw std::runtime_error("Packet too large: " + std::to_string(len));
RtpData entry; RtpData entry;
+18 -2
View File
@@ -97,7 +97,20 @@ RtpPair<PDatagramSocket> SocketHeap::allocSocketPair(int family, SocketSink *sin
rtcp = allocSocket(family, sink, rtp->localport() + 1); rtcp = allocSocket(family, sink, rtp->localport() + 1);
} }
catch(...) catch(...)
{} {
// Release a partially allocated pair before retrying - otherwise
// the RTP socket from this attempt leaks into the socket map.
if (rtp)
{
freeSocket(rtp);
rtp.reset();
}
if (rtcp)
{
freeSocket(rtcp);
rtcp.reset();
}
}
} }
if (!rtp || !rtcp) if (!rtp || !rtcp)
@@ -139,6 +152,9 @@ PDatagramSocket SocketHeap::allocSocket(int family, SocketSink* sink, int port)
sockaddr_in6 addr6; sockaddr_in6 addr6;
int result = 0; int result = 0;
int testport; int testport;
// A fixed port cannot be retried (it would loop forever if the port is
// owned by another process); random ports get a bounded number of attempts.
int attemptsLeft = port ? 1 : 100;
do do
{ {
testport = port ? port : rand() % ((mFinish - mStart) / 2) * 2 + mStart; testport = port ? port : rand() % ((mFinish - mStart) / 2) * 2 + mStart;
@@ -164,7 +180,7 @@ PDatagramSocket SocketHeap::allocSocket(int family, SocketSink* sink, int port)
break; break;
} }
} while (result == WSAEADDRINUSE); } while (result == WSAEADDRINUSE && --attemptsLeft > 0);
if (result) if (result)
{ {
+29 -24
View File
@@ -48,9 +48,12 @@ std::string strx::appendPath(const std::string& s1, const std::string& s2)
std::string strx::makeUtf8(const std::tstring &arg) std::string strx::makeUtf8(const std::tstring &arg)
{ {
#if defined(TARGET_WIN) #if defined(TARGET_WIN)
size_t required = WideCharToMultiByte(CP_UTF8, 0, arg.c_str(), -1, NULL, 0, NULL, NULL); int required = WideCharToMultiByte(CP_UTF8, 0, arg.c_str(), -1, NULL, 0, NULL, NULL);
char *result = (char*)_alloca(required + 1); if (required <= 0)
WideCharToMultiByte(CP_UTF8, 0, arg.c_str(), -1, result, required+1, NULL, NULL); return std::string();
std::string result(static_cast<size_t>(required), '\0');
WideCharToMultiByte(CP_UTF8, 0, arg.c_str(), -1, &result[0], required, NULL, NULL);
result.resize(strlen(result.c_str())); // strip the trailing NUL written by the API
return result; return result;
#else #else
return arg; return arg;
@@ -65,9 +68,12 @@ std::string strx::toUtf8(const std::tstring &arg)
std::tstring strx::makeTstring(const std::string& arg) std::tstring strx::makeTstring(const std::string& arg)
{ {
#if defined(TARGET_WIN) #if defined(TARGET_WIN)
size_t count = MultiByteToWideChar(CP_UTF8, 0, arg.c_str(), -1, NULL, 0); int count = MultiByteToWideChar(CP_UTF8, 0, arg.c_str(), -1, NULL, 0);
wchar_t* result = (wchar_t*)_alloca(count * 2); if (count <= 0)
MultiByteToWideChar(CP_UTF8, 0, arg.c_str(), -1, result, count); return std::tstring();
std::wstring result(static_cast<size_t>(count), L'\0');
MultiByteToWideChar(CP_UTF8, 0, arg.c_str(), -1, &result[0], count);
result.resize(wcslen(result.c_str())); // strip the trailing NUL written by the API
return result; return result;
#else #else
return arg; return arg;
@@ -93,11 +99,7 @@ int strx::toInt(const char *s, int defaultValue, bool* isOk)
uint64_t strx::toUint64(const char* s, uint64_t def, bool *isOk) uint64_t strx::toUint64(const char* s, uint64_t def, bool *isOk)
{ {
uint64_t result = def; uint64_t result = def;
#if defined(TARGET_WIN) if (sscanf(s, "%" SCNu64, &result) != 1)
if (sscanf(s, "%I64d", &result) != 1)
#else
if (sscanf(s, "%llu", &result) != 1)
#endif
{ {
if (isOk) if (isOk)
*isOk = false; *isOk = false;
@@ -143,7 +145,6 @@ std::string strx::toHex(const uint8_t* input, size_t inputLength)
*r++ = hexmap[hi]; *r++ = hexmap[hi];
*r++ = hexmap[low]; *r++ = hexmap[low];
} }
*r = 0;
return result; return result;
} }
@@ -171,11 +172,9 @@ std::string strx::doubleToString(double value, int precision)
const char* strx::findSubstring(const char* buffer, const char* substring, size_t bufferLength) const char* strx::findSubstring(const char* buffer, const char* substring, size_t bufferLength)
{ {
#if defined(TARGET_WIN) // The buffer is not necessarily NUL-terminated, so a bounded search is
return (const char*)strstr(buffer, substring); // required on every platform (a memmem replacement for MSVC is provided below).
#else
return (const char*)memmem(buffer, bufferLength, substring, strlen(substring)); return (const char*)memmem(buffer, bufferLength, substring, strlen(substring));
#endif
} }
@@ -332,7 +331,7 @@ std::string strx::fromHex2String(const std::string& s)
std::string result; result.resize(s.size() / 2); std::string result; result.resize(s.size() / 2);
const char* t = s.c_str(); const char* t = s.c_str();
for (size_t i = 0; i < result.size(); i++) for (size_t i = 0; i < result.size(); i++)
result[i] = hex2code(t[i*2]); result[i] = static_cast<char>((hex2code(t[i*2]) << 4) | hex2code(t[i*2+1]));
return result; return result;
} }
@@ -367,12 +366,13 @@ std::string strx::decodeUri(const std::string& s)
char ch; char ch;
int i, ii; int i, ii = 0;
for (i=0; i<(int)s.length(); i++) for (i=0; i<(int)s.length(); i++)
{ {
if (s[i] == 37) if (s[i] == '%' && i + 2 < (int)s.length())
{
if (sscanf(s.substr(i+1,2).c_str(), "%x", &ii) == 1)
{ {
sscanf(s.substr(i+1,2).c_str(), "%x", &ii);
ch = static_cast<char>(ii); ch = static_cast<char>(ii);
ret += ch; ret += ch;
i += 2; i += 2;
@@ -380,19 +380,24 @@ std::string strx::decodeUri(const std::string& s)
else else
ret += s[i]; ret += s[i];
} }
else
ret += s[i];
}
return ret; return ret;
} }
bool strx::startsWith(const std::string& s, const std::string& prefix) bool strx::startsWith(const std::string& s, const std::string& prefix)
{ {
std::string::size_type p = s.find(prefix); if (prefix.size() > s.size())
return p == 0; return false;
return s.compare(0, prefix.size(), prefix) == 0;
} }
bool strx::endsWith(const std::string& s, const std::string& suffix) bool strx::endsWith(const std::string& s, const std::string& suffix)
{ {
std::string::size_type p = s.rfind(suffix); if (suffix.size() > s.size())
return (p == s.size() - suffix.size()); return false;
return s.compare(s.size() - suffix.size(), suffix.size(), suffix) == 0;
} }
int strx::stringToDuration(const std::string& s) int strx::stringToDuration(const std::string& s)
+9 -9
View File
@@ -20,7 +20,8 @@
void SyncHelper::delay(unsigned int microseconds) void SyncHelper::delay(unsigned int microseconds)
{ {
#ifdef TARGET_WIN #ifdef TARGET_WIN
::Sleep(microseconds/1000); // Round up so sub-millisecond delays do not become Sleep(0)
::Sleep((microseconds + 999) / 1000);
#endif #endif
#if defined(TARGET_OSX) || defined(TARGET_LINUX) #if defined(TARGET_OSX) || defined(TARGET_LINUX)
timespec requested, remaining; timespec requested, remaining;
@@ -93,8 +94,9 @@ uint32_t chronox::getDelta(uint32_t later, uint32_t earlier)
if (later > earlier) if (later > earlier)
return later - earlier; return later - earlier;
// Counter wrapped: unsigned subtraction yields the correct modulo-2^32 delta
if (later < earlier && later < 0x7FFFFFFF && earlier >= 0x7FFFFFFF) if (later < earlier && later < 0x7FFFFFFF && earlier >= 0x7FFFFFFF)
return 0xFFFFFFFF - earlier + later; return later - earlier;
return 0; return 0;
} }
@@ -115,8 +117,8 @@ uint64_t chronox::toTimestamp(const timeval& ts)
int64_t chronox::getDelta(const timespec& a, const timespec& b) int64_t chronox::getDelta(const timespec& a, const timespec& b)
{ {
uint64_t ms_a = (uint64_t)a.tv_sec * 1000 + a.tv_nsec / 10000000; int64_t ms_a = (int64_t)a.tv_sec * 1000 + a.tv_nsec / 1000000;
uint64_t ms_b = (uint64_t)b.tv_sec * 1000 + b.tv_nsec / 10000000; int64_t ms_b = (int64_t)b.tv_sec * 1000 + b.tv_nsec / 1000000;
return ms_a - ms_b; return ms_a - ms_b;
} }
@@ -162,13 +164,11 @@ void BufferQueue::push(const void* data, int bytes)
BufferQueue::PBlock BufferQueue::pull(int milliseconds) BufferQueue::PBlock BufferQueue::pull(int milliseconds)
{ {
std::unique_lock<std::mutex> l(mMutex); std::unique_lock<std::mutex> l(mMutex);
std::cv_status status = mBlockList.empty() ? std::cv_status::timeout : std::cv_status::no_timeout; mSignal.wait_for(l, std::chrono::milliseconds(milliseconds),
[this]() { return !mBlockList.empty(); });
if (mBlockList.empty())
status = mSignal.wait_for(l, std::chrono::milliseconds(milliseconds));
PBlock r; PBlock r;
if (status == std::cv_status::no_timeout && !mBlockList.empty()) if (!mBlockList.empty())
{ {
r = mBlockList.front(); r = mBlockList.front();
mBlockList.pop_front(); mBlockList.pop_front();
+27
View File
@@ -265,7 +265,19 @@ PCodec AmrNbCodec::CodecFactory::create()
AmrNbCodec::AmrNbCodec(const AmrCodecConfig& config) AmrNbCodec::AmrNbCodec(const AmrCodecConfig& config)
:mConfig(config) :mConfig(config)
{ {
// Contexts are created lazily (see ensureEncoder/ensureDecoder) - a codec
// resolved only for network-MOS metadata never allocates them.
}
void AmrNbCodec::ensureEncoder()
{
if (!mEncoderCtx)
mEncoderCtx = Encoder_Interface_init(1); mEncoderCtx = Encoder_Interface_init(1);
}
void AmrNbCodec::ensureDecoder()
{
if (!mDecoderCtx)
mDecoderCtx = Decoder_Interface_init(); mDecoderCtx = Decoder_Interface_init();
} }
@@ -298,6 +310,8 @@ Codec::Info AmrNbCodec::info()
Codec::EncodeResult AmrNbCodec::encode(std::span<const uint8_t> input, std::span<uint8_t> output) Codec::EncodeResult AmrNbCodec::encode(std::span<const uint8_t> input, std::span<uint8_t> output)
{ {
ensureEncoder();
if (input.size_bytes() % pcmLength()) if (input.size_bytes() % pcmLength())
return {.mEncoded = 0}; return {.mEncoded = 0};
@@ -324,6 +338,8 @@ Codec::EncodeResult AmrNbCodec::encode(std::span<const uint8_t> input, std::span
#define AMR_BITRATE_DTX 15 #define AMR_BITRATE_DTX 15
Codec::DecodeResult AmrNbCodec::decode(std::span<const uint8_t> input, std::span<uint8_t> output) Codec::DecodeResult AmrNbCodec::decode(std::span<const uint8_t> input, std::span<uint8_t> output)
{ {
ensureDecoder();
if (mConfig.mOctetAligned) if (mConfig.mOctetAligned)
return {.mDecoded = 0}; return {.mDecoded = 0};
@@ -427,6 +443,8 @@ Codec::DecodeResult AmrNbCodec::decode(std::span<const uint8_t> input, std::span
size_t AmrNbCodec::plc(int lostFrames, std::span<uint8_t> output) size_t AmrNbCodec::plc(int lostFrames, std::span<uint8_t> output)
{ {
ensureDecoder();
if (output.size_bytes() < lostFrames * pcmLength()) if (output.size_bytes() < lostFrames * pcmLength())
return 0; return 0;
@@ -496,6 +514,13 @@ AmrWbStatistics MT::GAmrWbStatistics;
AmrWbCodec::AmrWbCodec(const AmrCodecConfig& config) AmrWbCodec::AmrWbCodec(const AmrCodecConfig& config)
:mConfig(config) :mConfig(config)
{ {
// Decoder context is created lazily (see ensureDecoder) - a codec resolved
// only for network-MOS metadata never allocates the AMR-WB decoder state.
}
void AmrWbCodec::ensureDecoder()
{
if (!mDecoderCtx)
mDecoderCtx = D_IF_init(); mDecoderCtx = D_IF_init();
} }
@@ -630,6 +655,8 @@ Codec::DecodeResult AmrWbCodec::decodePlain(std::span<const uint8_t> input, std:
Codec::DecodeResult AmrWbCodec::decode(std::span<const uint8_t> input, std::span<uint8_t> output) Codec::DecodeResult AmrWbCodec::decode(std::span<const uint8_t> input, std::span<uint8_t> output)
{ {
ensureDecoder();
if (mConfig.mIuUP) if (mConfig.mIuUP)
return decodeIuup(input, output); return decodeIuup(input, output);
else else
+11
View File
@@ -33,6 +33,13 @@ protected:
int mPreviousPacketLength = 0; int mPreviousPacketLength = 0;
size_t mCngCounter = 0; size_t mCngCounter = 0;
size_t mSwitchCounter = 0; size_t mSwitchCounter = 0;
// opencore-amr encoder/decoder state is allocated lazily on first encode/decode.
// Network-MOS-only streams resolve codec metadata (name/samplerate/frame timing)
// but never decode, so they must not pay for a context they never use - at scale
// this is ~a decoder state (several KB) saved per network-only stream.
void ensureEncoder();
void ensureDecoder();
public: public:
class CodecFactory: public Factory class CodecFactory: public Factory
{ {
@@ -85,6 +92,10 @@ protected:
int mPreviousPacketLength; int mPreviousPacketLength;
// Decoder state is allocated lazily on first decode/plc (see AmrNbCodec) so
// network-MOS-only streams never instantiate the AMR-WB decoder.
void ensureDecoder();
DecodeResult decodeIuup(std::span<const uint8_t> input, std::span<uint8_t> output); DecodeResult decodeIuup(std::span<const uint8_t> input, std::span<uint8_t> output);
DecodeResult decodePlain(std::span<const uint8_t> input, std::span<uint8_t> output); DecodeResult decodePlain(std::span<const uint8_t> input, std::span<uint8_t> output);
+103 -63
View File
@@ -24,6 +24,7 @@
#include <memory.h> #include <memory.h>
#include <string.h> #include <string.h>
#include <algorithm> #include <algorithm>
#include <vector>
#define LOG_SUBSYSTEM "media" #define LOG_SUBSYSTEM "media"
@@ -434,9 +435,10 @@ Codec::Info OpusCodec::info() {
Codec::EncodeResult OpusCodec::encode(std::span<const uint8_t> input, std::span<uint8_t> output) Codec::EncodeResult OpusCodec::encode(std::span<const uint8_t> input, std::span<uint8_t> output)
{ {
// Send number of samples for input and number of bytes for output // opus_encode() takes the frame size in samples per channel and the output
// capacity in bytes.
int written = opus_encode(mEncoderCtx, (const opus_int16*)input.data(), input.size_bytes() / (sizeof(short) * channels()), int written = opus_encode(mEncoderCtx, (const opus_int16*)input.data(), input.size_bytes() / (sizeof(short) * channels()),
output.data(), output.size_bytes() / (sizeof(short) * channels())); output.data(), output.size_bytes());
if (written < 0) if (written < 0)
return {.mEncoded = 0}; return {.mEncoded = 0};
else else
@@ -445,10 +447,13 @@ Codec::EncodeResult OpusCodec::encode(std::span<const uint8_t> input, std::span<
Codec::DecodeResult OpusCodec::decode(std::span<const uint8_t> input, std::span<uint8_t> output) Codec::DecodeResult OpusCodec::decode(std::span<const uint8_t> input, std::span<uint8_t> output)
{ {
int result = 0; if (input.empty())
return {0};
// Examine the number of channels available in incoming packet // Examine the number of channels available in incoming packet
int nr_of_channels = opus_packet_get_nb_channels(input.data()); int nr_of_channels = opus_packet_get_nb_channels(input.data());
if (nr_of_channels != 1 && nr_of_channels != 2)
return {0};
// Recreate decoder if needed // Recreate decoder if needed
if (mDecoderChannels != nr_of_channels) if (mDecoderChannels != nr_of_channels)
@@ -473,80 +478,97 @@ Codec::DecodeResult OpusCodec::decode(std::span<const uint8_t> input, std::span<
if (nr_of_frames <= 0) if (nr_of_frames <= 0)
return {0}; return {0};
// We support stereo and mono here. // Output must match channels() - that is what info() promises downstream.
int buffer_capacity = nr_of_frames * sizeof(opus_int16) * nr_of_channels; size_t needed = (size_t)nr_of_frames * sizeof(opus_int16) * channels();
opus_int16 *buffer_decode = (opus_int16 *)alloca(buffer_capacity); if (needed > output.size_bytes())
return {0};
if (nr_of_channels == channels())
{
int decoded = opus_decode(mDecoderCtx, input.data(), input.size_bytes(), int decoded = opus_decode(mDecoderCtx, input.data(), input.size_bytes(),
buffer_decode, nr_of_frames, 0); (opus_int16*)output.data(), nr_of_frames, 0);
if (decoded < 0)
{
ICELogCritical(<< "opus_decode() returned " << decoded);
return {0};
}
return {.mDecoded = (size_t)decoded * sizeof(opus_int16) * nr_of_channels};
}
// Channel count differs from the negotiated one - decode to a temporary
// buffer and convert.
std::vector<opus_int16> temp((size_t)nr_of_frames * nr_of_channels);
int decoded = opus_decode(mDecoderCtx, input.data(), input.size_bytes(),
temp.data(), nr_of_frames, 0);
if (decoded < 0) if (decoded < 0)
{ {
ICELogCritical(<< "opus_decode() returned " << decoded); ICELogCritical(<< "opus_decode() returned " << decoded);
return {0}; return {0};
} }
opus_int16 *buffer_stereo = nullptr; opus_int16* out = (opus_int16*)output.data();
int buffer_stereo_capacity = buffer_capacity * 2; if (channels() == 2 && nr_of_channels == 1)
{
switch (nr_of_channels) { for (int i = 0; i < decoded; i++)
case 1: out[i * 2] = out[i * 2 + 1] = temp[i];
// Convert to stereo before return {.mDecoded = (size_t)decoded * sizeof(opus_int16) * 2};
buffer_stereo = (opus_int16 *) alloca(buffer_stereo_capacity);
for (int i = 0; i < nr_of_frames; i++) {
buffer_stereo[i * 2 + 1] = buffer_decode[i];
buffer_stereo[i * 2] = buffer_decode[i];
} }
assert(buffer_stereo_capacity <= output.size_bytes()); else // mono negotiated, stereo packet
memcpy(output.data(), buffer_stereo, buffer_stereo_capacity); {
result = buffer_stereo_capacity; for (int i = 0; i < decoded; i++)
break; out[i] = (opus_int16)((int(temp[i * 2]) + temp[i * 2 + 1]) / 2);
return {.mDecoded = (size_t)decoded * sizeof(opus_int16)};
case 2:
assert(buffer_capacity <= output.size_bytes());
memcpy(output.data(), buffer_decode, buffer_capacity);
result = buffer_capacity;
break;
default:
assert(0);
} }
return {.mDecoded = (size_t)result};
} }
size_t OpusCodec::plc(int lostPackets, std::span<uint8_t> output) size_t OpusCodec::plc(int lostPackets, std::span<uint8_t> output)
{ {
// Find how much frames do we need to produce and prefill it with silence if (lostPackets <= 0 || output.empty())
int frames_per_packet = (int)pcmLength() / (sizeof(opus_int16) * channels()); return 0;
memset(output.data(), 0, output.size_bytes());
// Use this pointer as output // Total bytes we are asked to conceal, clamped to the output capacity.
opus_int16* data_output = reinterpret_cast<opus_int16*>(output.data()); size_t packet_bytes = (size_t)pcmLength();
size_t total = std::min(output.size_bytes(), packet_bytes * (size_t)lostPackets);
memset(output.data(), 0, total);
int nr_of_decoded_frames = 0; // No decoder yet (PLC before the first decoded packet) - leave silence.
if (!mDecoderCtx || (mDecoderChannels != 1 && mDecoderChannels != 2))
return total;
// Buffer for single lost frame int samples_per_packet = (int)(packet_bytes / (sizeof(opus_int16) * channels()));
opus_int16* buffer_plc = (opus_int16*)alloca(frames_per_packet * mDecoderChannels * sizeof(opus_int16)); if (samples_per_packet <= 0)
for (int i=0; i<lostPackets; i++) return total;
opus_int16* out = reinterpret_cast<opus_int16*>(output.data());
std::vector<opus_int16> temp((size_t)samples_per_packet * mDecoderChannels);
for (int packet = 0; packet < lostPackets; packet++)
{ {
nr_of_decoded_frames = opus_decode(mDecoderCtx, nullptr, 0, buffer_plc, frames_per_packet, 0); size_t offset_bytes = (size_t)packet * packet_bytes;
assert(nr_of_decoded_frames == frames_per_packet); if (offset_bytes + packet_bytes > total)
switch (mDecoderChannels)
{
case 1:
// Convert mono to stereo
for (int i=0; i < nr_of_decoded_frames; i++)
data_output[i * 2] = data_output[i * 2 + 1] = buffer_plc[i];
data_output += frames_per_packet * mChannels;
break; break;
case 2: int decoded = opus_decode(mDecoderCtx, nullptr, 0, temp.data(), samples_per_packet, 0);
// Just copy data if (decoded <= 0)
memcpy(data_output, buffer_plc, frames_per_packet * sizeof(opus_int16) * mDecoderChannels); break; // keep the pre-filled silence
data_output += frames_per_packet * mChannels;
break; opus_int16* dst = out + offset_bytes / sizeof(opus_int16);
if (mDecoderChannels == channels())
{
memcpy(dst, temp.data(), (size_t)decoded * sizeof(opus_int16) * mDecoderChannels);
}
else if (channels() == 2 && mDecoderChannels == 1)
{
for (int i = 0; i < decoded; i++)
dst[i * 2] = dst[i * 2 + 1] = temp[i];
}
else // mono negotiated, stereo decoder
{
for (int i = 0; i < decoded; i++)
dst[i] = (opus_int16)((int(temp[i * 2]) + temp[i * 2 + 1]) / 2);
} }
} }
return ((uint8_t*)data_output - output.data()); return total;
} }
size_t OpusCodec::getNumberOfSamples(std::span<const uint8_t> payload) size_t OpusCodec::getNumberOfSamples(std::span<const uint8_t> payload)
@@ -1021,14 +1043,29 @@ Codec::EncodeResult GsmCodec::encode(std::span<const uint8_t> input, std::span<u
Codec::DecodeResult GsmCodec::decode(std::span<const uint8_t> input, std::span<uint8_t> output) Codec::DecodeResult GsmCodec::decode(std::span<const uint8_t> input, std::span<uint8_t> output)
{ {
if (input.size_bytes() % rtpLength() != 0) const size_t frameSize = (size_t)rtpLength();
if (!frameSize || input.size_bytes() % frameSize != 0)
return {.mDecoded = 0}; return {.mDecoded = 0};
int i=0; // Bytes_65 carries a WAV49 frame pair (33 + 32 bytes) and produces 320 samples
for (i = 0; i < input.size_bytes() / rtpLength(); i++) const size_t pcmPerFrame = (mCodecType == Type::Bytes_65) ? 640 : 320;
gsm_decode(mGSM, (gsm_byte *)input.data() + 33 * i, (gsm_signal *)output.data() + 160 * i); size_t frames = input.size_bytes() / frameSize;
return {.mDecoded = (size_t)i * 320}; size_t i;
for (i = 0; i < frames; i++)
{
if ((i + 1) * pcmPerFrame > output.size_bytes())
break;
const gsm_byte* in = (const gsm_byte*)input.data() + frameSize * i;
gsm_signal* out = (gsm_signal*)output.data() + (pcmPerFrame / 2) * i;
gsm_decode(mGSM, (gsm_byte*)in, out);
if (mCodecType == Type::Bytes_65)
gsm_decode(mGSM, (gsm_byte*)(in + 33), out + 160);
}
return {.mDecoded = i * pcmPerFrame};
} }
size_t GsmCodec::plc(int lostFrames, std::span<uint8_t> output) size_t GsmCodec::plc(int lostFrames, std::span<uint8_t> output)
@@ -1327,8 +1364,11 @@ hr_ref_from_canon(uint16_t *hr_ref, const uint8_t *canon)
*/ */
Codec::DecodeResult GsmHrCodec::decode(std::span<const uint8_t> input, std::span<uint8_t> output) Codec::DecodeResult GsmHrCodec::decode(std::span<const uint8_t> input, std::span<uint8_t> output)
{ {
ByteBuffer bb(input, ByteBuffer::CopyBehavior::UseExternal); // hr_ref_from_canon() reads 112 bits (14 bytes) starting at offset 1,
BitReader br(bb); // and the decoder produces 160 samples (320 bytes).
if (input.size_bytes() < 15 || output.size_bytes() < 320)
return {.mDecoded = 0};
uint16_t hr_ref[22]; uint16_t hr_ref[22];
hr_ref_from_canon(hr_ref, input.data() + 1); hr_ref_from_canon(hr_ref, input.data() + 1);
+212 -201
View File
@@ -28,8 +28,7 @@ using namespace MT;
// ----------------- RtpBuffer::Packet -------------- // ----------------- RtpBuffer::Packet --------------
RtpBuffer::Packet::Packet(const std::shared_ptr<RTPPacket>& packet, std::chrono::milliseconds timelength, int samplerate) RtpBuffer::Packet::Packet(const std::shared_ptr<RTPPacket>& packet, std::chrono::milliseconds timelength, int samplerate)
:mRtp(packet), mTimelength(timelength), mSamplerate(samplerate) :mRtp(packet), mTimelength(timelength), mSamplerate(samplerate)
{ {}
}
std::shared_ptr<RTPPacket> RtpBuffer::Packet::rtp() const std::shared_ptr<RTPPacket> RtpBuffer::Packet::rtp() const
{ {
@@ -60,12 +59,11 @@ std::vector<short>& RtpBuffer::Packet::pcm()
RtpBuffer::RtpBuffer(Statistics& stat) RtpBuffer::RtpBuffer(Statistics& stat)
:mStat(stat) :mStat(stat)
{ {
if (mStat.mPacketLoss)
std::cout << "Warning: packet loss is not zero" << std::endl;
} }
RtpBuffer::~RtpBuffer() RtpBuffer::~RtpBuffer()
{ {
if (mAddCounter)
ICELogDebug(<< "Number of add packets: " << mAddCounter << ", number of retrieved packets " << mReturnedCounter); ICELogDebug(<< "Number of add packets: " << mAddCounter << ", number of retrieved packets " << mReturnedCounter);
} }
@@ -126,19 +124,19 @@ std::shared_ptr<RtpBuffer::Packet> RtpBuffer::add(const std::shared_ptr<jrtplib:
mStat.mPacketInterval.process(t - mLastAddTime); mStat.mPacketInterval.process(t - mLastAddTime);
mLastAddTime = t; mLastAddTime = t;
} }
mStat.mSsrc = static_cast<uint16_t>(packet->GetSSRC()); mStat.mSsrc = packet->GetSSRC();
// Update jitter // Update jitter
ICELogMedia(<< "Adding new packet into jitter buffer"); ICELogMedia(<< "Adding new packet seqno " << packet->GetSequenceNumber() << " into jitter buffer");
mAddCounter++; mAddCounter++;
// Look for maximum&minimal sequence number; check for dublicates // Look for maximum&minimal sequence number; check for dublicates
unsigned maxno = 0xFFFFFFFF, minno = 0; unsigned maxno = 0, minno = 0xFFFFFFFF;
// New sequence number // New sequence number
unsigned newSeqno = packet->GetExtendedSequenceNumber(); unsigned newSeqno = packet->GetExtendedSequenceNumber();
for (std::shared_ptr<Packet>& p: mPacketList) for (auto& p: mPacketList)
{ {
unsigned seqno = p->rtp()->GetExtendedSequenceNumber(); unsigned seqno = p->rtp()->GetExtendedSequenceNumber();
@@ -171,7 +169,7 @@ std::shared_ptr<RtpBuffer::Packet> RtpBuffer::add(const std::shared_ptr<jrtplib:
available = findTimelength(); available = findTimelength();
if (available > mHigh) if (available > mHigh)
ICELogMedia(<< "Available " << available << "ms with limit " << mHigh << "ms"); ICELogMedia(<< "Available " << available << " with limit " << mHigh);
return p; return p;
} }
@@ -186,16 +184,18 @@ std::shared_ptr<RtpBuffer::Packet> RtpBuffer::add(const std::shared_ptr<jrtplib:
return std::shared_ptr<Packet>(); return std::shared_ptr<Packet>();
} }
RtpBuffer::FetchResult RtpBuffer::fetch() void RtpBuffer::trimToHighWater(size_t maxPackets)
{ {
Lock l(mGuard); Lock l(mGuard);
FetchResult result;
// See if there is enough information in buffer
auto total = findTimelength(); auto total = findTimelength();
while (total > mHigh && mPacketList.size() > 1 && 0ms != mHigh) // Drop the oldest packet while either bound is exceeded: the time-based
// high-water mark (mHigh, when set) or, if maxPackets != 0, the packet-count
// cap. Always keep at least one packet so loss/gap accounting has a reference.
while (mPacketList.size() > 1 &&
((0ms != mHigh && total > mHigh) ||
(maxPackets != 0 && mPacketList.size() > maxPackets)))
{ {
ICELogMedia( << "Dropping RTP packets from jitter buffer"); ICELogMedia( << "Dropping RTP packets from jitter buffer");
total -= mPacketList.front()->timelength(); total -= mPacketList.front()->timelength();
@@ -235,6 +235,19 @@ RtpBuffer::FetchResult RtpBuffer::fetch()
// Increase number in statistics // Increase number in statistics
mStat.mPacketDropped++; mStat.mPacketDropped++;
} }
}
RtpBuffer::FetchResult RtpBuffer::fetch()
{
Lock l(mGuard);
FetchResult result;
// Bound the buffer to the high-water mark before fetching.
trimToHighWater();
// See how much audio is buffered now.
auto total = findTimelength();
if (total < mLow || total == 0ms) if (total < mLow || total == 0ms)
{ {
@@ -346,33 +359,32 @@ int RtpBuffer::getNumberOfAddPackets() const
//-------------- Receiver --------------- //-------------- Receiver ---------------
Receiver::Receiver(Statistics& stat) Receiver::Receiver(Statistics& stat)
:mStat(stat) :mStat(stat)
{ {}
}
Receiver::~Receiver() Receiver::~Receiver()
{ {}
}
//-------------- AudioReceiver ---------------- //-------------- AudioReceiver ----------------
AudioReceiver::AudioReceiver(const CodecList::Settings& settings, MT::Statistics &stat) AudioReceiver::AudioReceiver(const CodecList::Settings& settings, MT::Statistics &stat)
:Receiver(stat), mBuffer(stat), mDtmfBuffer(stat), mCodecSettings(settings), mCodecList(settings), mDtmfReceiver(stat) :Receiver(stat), mRtpBuffer(stat), mDtmfBuffer(stat), mCodecSettings(settings), mCodecList(settings), mDtmfReceiver(stat)
{ {
// Init resamplers
mResampler8.start(AUDIO_CHANNELS, 8000, AUDIO_SAMPLERATE);
mResampler16.start(AUDIO_CHANNELS, 16000, AUDIO_SAMPLERATE);
mResampler32.start(AUDIO_CHANNELS, 32000, AUDIO_SAMPLERATE);
mResampler48.start(AUDIO_CHANNELS, 48000, AUDIO_SAMPLERATE);
// Init codecs // Init codecs
mCodecList.setSettings(settings); mCodecList.setSettings(settings);
mCodecList.fillCodecMap(mCodecMap); mCodecList.fillCodecMap(mCodecMap);
mAvailable.setCapacity(AUDIO_SAMPLERATE * sizeof(short));
mDtmfBuffer.setPrebuffer(0ms); mDtmfBuffer.setPrebuffer(0ms);
mDtmfBuffer.setLow(0ms); mDtmfBuffer.setLow(0ms);
mDtmfBuffer.setHigh(1ms); mDtmfBuffer.setHigh(1ms);
// Avoid collecting too much data
mRtpBuffer.setHigh(240ms);
// Resamplers are lazy inside; there is no actual memory allocation
mResampler8.start(AUDIO_CHANNELS, 8000, AUDIO_SAMPLERATE);
mResampler16.start(AUDIO_CHANNELS, 16000, AUDIO_SAMPLERATE);
mResampler32.start(AUDIO_CHANNELS, 32000, AUDIO_SAMPLERATE);
mResampler48.start(AUDIO_CHANNELS, 48000, AUDIO_SAMPLERATE);
#if defined(DUMP_DECODED) #if defined(DUMP_DECODED)
mDecodedDump = std::make_shared<Audio::WavFileWriter>(); mDecodedDump = std::make_shared<Audio::WavFileWriter>();
mDecodedDump->open("decoded.wav", 8000 /*G711*/, AUDIO_CHANNELS); mDecodedDump->open("decoded.wav", 8000 /*G711*/, AUDIO_CHANNELS);
@@ -386,6 +398,11 @@ AudioReceiver::~AudioReceiver()
mResampler32.stop(); mResampler32.stop();
mResampler48.stop(); mResampler48.stop();
mDecodedDump.reset(); mDecodedDump.reset();
if (mRequestedAudio != 0ms)
ICELogDebug(<< "Requested " << mRequestedAudio << ", produced " << mProducedAudio);
if (mDecodeCount)
ICELogDebug(<< "Average interval between packet decoding " << mIntervalBetweenDecode / mDecodeCount);
} }
// Update codec settings // Update codec settings
@@ -406,40 +423,6 @@ CodecList::Settings& AudioReceiver::getCodecSettings()
return mCodecSettings; return mCodecSettings;
} }
size_t decode_packet(Codec& codec, RTPPacket& p, void* output_buffer, size_t output_capacity)
{
// How much data was produced
size_t result = 0;
// Handle here regular RTP packets
// Check if payload length is ok
int tail = codec.rtpLength() ? p.GetPayloadLength() % codec.rtpLength() : 0;
if (!tail)
{
// Find number of frames
int frame_count = codec.rtpLength() ? p.GetPayloadLength() / codec.rtpLength() : 1;
int frame_length = codec.rtpLength() ? codec.rtpLength() : (int)p.GetPayloadLength();
// Save last packet time length
// mLastPacketTimeLength = mFrameCount * mCodec->frameTime();
// Decode
for (int i=0; i < frame_count; i++)
{
auto r = codec.decode({p.GetPayloadData() + i * codec.rtpLength(), (size_t)frame_length},
{(uint8_t*)output_buffer, output_capacity});
result += r.mDecoded;
}
}
else
ICELogMedia(<< "RTP packet with tail.");
return result;
}
Codec* AudioReceiver::add(const std::shared_ptr<jrtplib::RTPPacket>& p) Codec* AudioReceiver::add(const std::shared_ptr<jrtplib::RTPPacket>& p)
{ {
Codec* codec = nullptr; Codec* codec = nullptr;
@@ -450,7 +433,7 @@ Codec* AudioReceiver::add(const std::shared_ptr<jrtplib::RTPPacket>& p)
payloadLength = p->GetPayloadLength(), payloadLength = p->GetPayloadLength(),
ptype = p->GetPayloadType(); ptype = p->GetPayloadType();
ICELogMedia(<< "Adding packet No " << p->GetSequenceNumber()); // ICELogMedia(<< "Adding packet No " << p->GetSequenceNumber());
// Increase codec counter // Increase codec counter
mStat.mCodecCount[ptype]++; mStat.mCodecCount[ptype]++;
@@ -508,12 +491,12 @@ Codec* AudioReceiver::add(const std::shared_ptr<jrtplib::RTPPacket>& p)
{ {
// It will cause statistics to report about bad RTP packet // It will cause statistics to report about bad RTP packet
// I have to replay last packet payload here to avoid report about lost packet // I have to replay last packet payload here to avoid report about lost packet
mBuffer.add(p, std::chrono::milliseconds(time_length), samplerate); mRtpBuffer.add(p, std::chrono::milliseconds(time_length), samplerate);
return nullptr; return nullptr;
} }
// Queue packet to buffer // Queue packet to buffer
mBuffer.add(p, std::chrono::milliseconds(time_length), samplerate).get(); mRtpBuffer.add(p, std::chrono::milliseconds(time_length), samplerate).get();
} }
return codec; return codec;
} }
@@ -522,32 +505,36 @@ void AudioReceiver::processDecoded(Audio::DataWindow& output, DecodeOptions opti
{ {
// Write to audio dump if requested // Write to audio dump if requested
if (mDecodedDump && mDecodedLength) if (mDecodedDump && mDecodedLength)
mDecodedDump->write(mDecodedFrame, mDecodedLength); mDecodedDump->write(mDecodedFrame.data(), mDecodedLength);
// Resample to target rate // Resample to target rate
makeMonoAndResample(options.mResampleToMainRate ? mCodec->samplerate() : 0, mCodec->channels()); makeMonoAndResample(options.mResampleToMainRate ? mCodec->samplerate() : 0, mCodec->channels());
// Send to output // Send to output
output.add(mResampledFrame, mResampledLength); output.add(mResampledFrame.data(), mResampledLength);
} }
void AudioReceiver::produceSilence(std::chrono::milliseconds length, Audio::DataWindow& output, DecodeOptions options) void AudioReceiver::produceSilence(std::chrono::milliseconds length, Audio::DataWindow& output, DecodeOptions options)
{ {
if (!mCodec)
return;
// Fill mDecodeBuffer as much as needed and call processDecoded() // Fill mDecodeBuffer as much as needed and call processDecoded()
// Depending on used codec mono or stereo silence should be produced // Depending on used codec mono or stereo silence should be produced
size_t chunks = length.count() / 10; size_t chunks = length.count() / 10;
size_t tail = length.count() % 10; size_t tail = length.count() % 10;
size_t chunk_size = 10 * sizeof(int16_t) * mCodec->samplerate() / 1000 * mCodec->channels(); size_t chunk_size = 10 * sizeof(int16_t) * mCodec->samplerate() / 1000 * mCodec->channels();
size_t tail_size = tail * sizeof(int16_t) * mCodec->samplerate() / 1000 * mCodec->channels(); size_t tail_size = tail * sizeof(int16_t) * mCodec->samplerate() / 1000 * mCodec->channels();
for (size_t i = 0; i < chunks; i++) for (size_t i = 0; i < chunks; i++)
{ {
memset(mDecodedFrame, 0, chunk_size); memset(mDecodedFrame.data(), 0, chunk_size);
mDecodedLength = chunk_size; mDecodedLength = chunk_size;
processDecoded(output, options); processDecoded(output, options);
} }
if (tail) if (tail)
{ {
memset(mDecodedFrame, 0, tail_size); memset(mDecodedFrame.data(), 0, tail_size);
mDecodedLength = tail_size; mDecodedLength = tail_size;
processDecoded(output, options); processDecoded(output, options);
} }
@@ -561,7 +548,7 @@ void AudioReceiver::produceCNG(std::chrono::milliseconds length, Audio::DataWind
if (options.mSkipDecode) if (options.mSkipDecode)
mDecodedLength = 0; mDecodedLength = 0;
else else
mDecodedLength = mCngDecoder.produce(mCodec->samplerate(), 100, mDecodedFrame, false); mDecodedLength = mCngDecoder.produce(mCodec->samplerate(), 100, mDecodedFrame.data(), false);
if (mDecodedLength) if (mDecodedLength)
processDecoded(output, options); processDecoded(output, options);
@@ -574,7 +561,7 @@ void AudioReceiver::produceCNG(std::chrono::milliseconds length, Audio::DataWind
if (options.mSkipDecode) if (options.mSkipDecode)
mDecodedLength = 0; mDecodedLength = 0;
else else
mDecodedLength = mCngDecoder.produce(mCodec->samplerate(), tail, reinterpret_cast<short*>(mDecodedFrame), false); mDecodedLength = mCngDecoder.produce(mCodec->samplerate(), tail, reinterpret_cast<short*>(mDecodedFrame.data()), false);
if (mDecodedLength) if (mDecodedLength)
processDecoded(output, options); processDecoded(output, options);
@@ -592,7 +579,7 @@ AudioReceiver::DecodeResult AudioReceiver::decodeGapTo(Audio::DataWindow& output
{ {
// Synthesize comfort noise. It will be done on AUDIO_SAMPLERATE rate directly to mResampledFrame buffer. // Synthesize comfort noise. It will be done on AUDIO_SAMPLERATE rate directly to mResampledFrame buffer.
// Do not forget to send this noise to analysis // Do not forget to send this noise to analysis
mDecodedLength = mCngDecoder.produce(mCodec->samplerate(), mLastPacketTimeLength, reinterpret_cast<short*>(mDecodedFrame), false); mDecodedLength = mCngDecoder.produce(mCodec->samplerate(), mLastPacketTimeLength, reinterpret_cast<short*>(mDecodedFrame.data()), false);
} }
else else
decodePacketTo(output, options, mCngPacket); decodePacketTo(output, options, mCngPacket);
@@ -605,14 +592,14 @@ AudioReceiver::DecodeResult AudioReceiver::decodeGapTo(Audio::DataWindow& output
mDecodedLength = 0; mDecodedLength = 0;
else else
{ {
mDecodedLength = mCodec->plc(mFrameCount, {(uint8_t*)mDecodedFrame, sizeof mDecodedFrame}); mDecodedLength = mCodec->plc(mFrameCount, {(uint8_t*)mDecodedFrame.data(), mDecodedFrame.size() * sizeof(int16_t)});
if (!mDecodedLength) if (!mDecodedLength)
{ {
// PLC is not support or failed // PLC is not support or failed
// So substitute the silence // So substitute the silence
size_t nr_of_samples = mCodec->frameTime() * mCodec->samplerate() / 1000 * sizeof(short); size_t nr_of_samples = mCodec->frameTime() * mCodec->samplerate() / 1000 * sizeof(short);
mDecodedLength = nr_of_samples * sizeof(short); mDecodedLength = nr_of_samples * sizeof(short);
memset(mDecodedFrame, 0, mDecodedLength); memset(mDecodedFrame.data(), 0, mDecodedLength);
} }
} }
} }
@@ -635,7 +622,8 @@ AudioReceiver::DecodeResult AudioReceiver::decodePacketTo(Audio::DataWindow& out
auto& rtp = *packet->rtp(); // Syntax sugar auto& rtp = *packet->rtp(); // Syntax sugar
mFailedCount = 0; mFailedCount = 0;
// Check if we need to emit silence or CNG - previously CNG packet was detected. Emit CNG audio here if needed.
// Check if we need to emit silence - it may happen in the case if next packet has RTP timestamp much beyond the previous one; maybe DTX was active.
if (mLastPacketTimestamp && mLastPacketTimeLength && mCodec) if (mLastPacketTimestamp && mLastPacketTimeLength && mCodec)
{ {
int units = rtp.GetTimestamp() - *mLastPacketTimestamp; int units = rtp.GetTimestamp() - *mLastPacketTimestamp;
@@ -643,7 +631,8 @@ AudioReceiver::DecodeResult AudioReceiver::decodePacketTo(Audio::DataWindow& out
if (milliseconds > mLastPacketTimeLength) if (milliseconds > mLastPacketTimeLength)
{ {
auto silenceLength = std::chrono::milliseconds(milliseconds - mLastPacketTimeLength); auto silenceLength = std::chrono::milliseconds(milliseconds - mLastPacketTimeLength);
ICELogDebug(<< "Emit " << silenceLength << " silence while requested " << options.mElapsed);
silenceLength = std::min(silenceLength, options.mElapsed);
if (mCngPacket && options.mFillGapByCNG) if (mCngPacket && options.mFillGapByCNG)
produceCNG(silenceLength, output, options); produceCNG(silenceLength, output, options);
else else
@@ -677,11 +666,12 @@ AudioReceiver::DecodeResult AudioReceiver::decodePacketTo(Audio::DataWindow& out
mDecodedLength = 0; mDecodedLength = 0;
else else
{ {
ICELogDebug(<< "Decoding CNG");
mCngPacket = packet; mCngPacket = packet;
mCngDecoder.decode3389(rtp.GetPayloadData(), rtp.GetPayloadLength()); mCngDecoder.decode3389(rtp.GetPayloadData(), rtp.GetPayloadLength());
// Emit CNG mLastPacketLength milliseconds // Emit CNG mLastPacketLength milliseconds
mDecodedLength = mCngDecoder.produce(mCodec->samplerate(), mLastPacketTimeLength, (short*)mDecodedFrame, true); mDecodedLength = mCngDecoder.produce(mCodec->samplerate(), mLastPacketTimeLength, (short*)mDecodedFrame.data(), true);
if (mDecodedLength) if (mDecodedLength)
processDecoded(output, options); processDecoded(output, options);
} }
@@ -717,7 +707,7 @@ AudioReceiver::DecodeResult AudioReceiver::decodePacketTo(Audio::DataWindow& out
{ {
// Decode frame by frame // Decode frame by frame
auto codecInput = std::span{rtp.GetPayloadData() + i * mCodec->rtpLength(), (size_t)frameLength}; auto codecInput = std::span{rtp.GetPayloadData() + i * mCodec->rtpLength(), (size_t)frameLength};
auto codecOutput = std::span{(uint8_t*)mDecodedFrame, sizeof mDecodedFrame}; auto codecOutput = std::span{(uint8_t*)mDecodedFrame.data(), mDecodedFrame.size() * sizeof(int16_t)};
auto r = mCodec->decode(codecInput, codecOutput); auto r = mCodec->decode(codecInput, codecOutput);
mDecodedLength = r.mDecoded; mDecodedLength = r.mDecoded;
if (mDecodedLength > 0) if (mDecodedLength > 0)
@@ -760,8 +750,15 @@ AudioReceiver::DecodeResult AudioReceiver::decodeEmptyTo(Audio::DataWindow& outp
// Try to decode it - replay previous audio decoded or use CNG decoder (if payload type is 13) // Try to decode it - replay previous audio decoded or use CNG decoder (if payload type is 13)
if (mCngPacket->rtp()->GetPayloadType() == 13) if (mCngPacket->rtp()->GetPayloadType() == 13)
{ {
// Using latest CNG packet to produce comfort noise // Using latest CNG packet to produce comfort noise.
auto produced = mCngDecoder.produce(fmt.rate(), options.mElapsed.count(), (short*)(output.data() + output.filled()), false); // Clamp the produced amount to the remaining capacity of the output window -
// the CNG decoder writes straight into its buffer.
size_t bytesPerMs = (size_t)fmt.rate() / 1000 * sizeof(short) * fmt.channels();
size_t room = output.capacity() - output.filled();
int ms = bytesPerMs ? (int)std::min<int64_t>(options.mElapsed.count(), (int64_t)(room / bytesPerMs)) : 0;
if (ms <= 0)
return {.mStatus = DecodeResult::Status::Skip};
auto produced = mCngDecoder.produce(fmt.rate(), ms, (short*)(output.mutableData() + output.filled()), false);
output.setFilled(output.filled() + produced); output.setFilled(output.filled() + produced);
return {.mStatus = DecodeResult::Status::Ok, .mSamplerate = fmt.rate(), .mChannels = fmt.channels()}; return {.mStatus = DecodeResult::Status::Ok, .mSamplerate = fmt.rate(), .mChannels = fmt.channels()};
} }
@@ -775,7 +772,7 @@ AudioReceiver::DecodeResult AudioReceiver::decodeEmptyTo(Audio::DataWindow& outp
else else
{ {
// Emit silence if codec information is available - it is to properly handle the gaps // Emit silence if codec information is available - it is to properly handle the gaps
auto avail = output.getTimeLength(fmt.rate(), fmt.channels()); auto avail = output.getTimeLength(fmt);
if (options.mElapsed > avail) if (options.mElapsed > avail)
output.addZero(fmt.sizeFromTime(options.mElapsed - avail)); output.addZero(fmt.sizeFromTime(options.mElapsed - avail));
} }
@@ -785,86 +782,18 @@ AudioReceiver::DecodeResult AudioReceiver::decodeEmptyTo(Audio::DataWindow& outp
return {.mStatus = DecodeResult::Status::Skip}; return {.mStatus = DecodeResult::Status::Skip};
} }
AudioReceiver::DecodeResult AudioReceiver::getAudioTo(Audio::DataWindow& output, DecodeOptions options) void MT::AudioReceiver::processDtmf()
{
if (mDtmfBuffer.getCount())
{ {
DecodeResult result = {.mStatus = DecodeResult::Status::Skip};
// Process RFC2833 here; it doesn't result in any audio - only callbacks and statistics
auto fr = mDtmfBuffer.fetch(); auto fr = mDtmfBuffer.fetch();
if (fr.mPacket && fr.mStatus == RtpBuffer::FetchResult::Status::RegularPacket) if (fr.mPacket && fr.mStatus == RtpBuffer::FetchResult::Status::RegularPacket)
mDtmfReceiver.add(fr.mPacket->rtp()); mDtmfReceiver.add(fr.mPacket->rtp());
auto produced = 0ms;
if (mAvailable.filled() && mCodec && options.mElapsed != 0ms)
{
Audio::Format fmt = options.mResampleToMainRate ? Audio::Format(AUDIO_SAMPLERATE, 1) : mCodec->getAudioFormat();
auto initiallyAvailable = mCodec ? mAvailable.getTimeLength(fmt.rate(), fmt.channels()) : 0ms;
if (initiallyAvailable != 0ms)
{
std::chrono::milliseconds resultTime = std::min(initiallyAvailable, options.mElapsed);
auto resultLen = fmt.sizeFromTime(resultTime);
mAvailable.moveTo(output, resultLen);
produced += resultTime;
// Maybe request is satisfied ?
if (produced >= options.mElapsed)
return {.mStatus = DecodeResult::Status::Ok, .mSamplerate = fmt.rate(), .mChannels = fmt.channels()};
} }
} }
std::chrono::milliseconds decoded = 0ms; void MT::AudioReceiver::updateDecodingTimeStatistics()
do
{ {
// Get next packet from buffer
RtpBuffer::ResultList rl;
RtpBuffer::FetchResult fr = mBuffer.fetch();
// ICELogDebug(<< fr.toString() << " " << mBuffer.findTimelength());
switch (fr.mStatus)
{
case RtpBuffer::FetchResult::Status::Gap: result = decodeGapTo(mAvailable, options); break;
case RtpBuffer::FetchResult::Status::NoPacket: result = decodeEmptyTo(mAvailable, options); break;
case RtpBuffer::FetchResult::Status::RegularPacket: result = decodePacketTo(mAvailable, options, fr.mPacket); break;
default:
assert(0);
}
// Was there decoding at all ?
if (!mCodec)
break; // No sense to continue - we have no information at all
Audio::Format fmt = options.mResampleToMainRate ? Audio::Format(AUDIO_SAMPLERATE, 1) : mCodec->getAudioFormat();
result.mSamplerate = fmt.rate();
result.mChannels = fmt.channels();
// Have we anything interesting in the buffer ?
auto bufferAvailable = mAvailable.getTimeLength(fmt.rate(), fmt.channels());
if (bufferAvailable == 0ms)
break; // No sense to continue - decoding / CNG / PLC stopped totally
// How much data should be moved to result buffer ?
if (options.mElapsed != 0ms)
{
std::chrono::milliseconds resultTime = std::min(bufferAvailable, options.mElapsed - produced);
auto resultLen = fmt.sizeFromTime(resultTime);
mAvailable.moveTo(output, resultLen);
produced += resultTime;
}
else
mAvailable.moveTo(output, mAvailable.filled());
decoded += bufferAvailable;
}
while (produced < options.mElapsed);
if (produced != 0ms)
result.mStatus = DecodeResult::Status::Ok;
// Time statistics
if (result.mStatus == DecodeResult::Status::Ok)
{
// Decode statistics
if (!mDecodeTimestamp) if (!mDecodeTimestamp)
mDecodeTimestamp = std::chrono::steady_clock::now(); mDecodeTimestamp = std::chrono::steady_clock::now();
else else
@@ -874,9 +803,114 @@ AudioReceiver::DecodeResult AudioReceiver::getAudioTo(Audio::DataWindow& output,
mDecodeTimestamp = t; mDecodeTimestamp = t;
} }
} }
AudioReceiver::DecodeResult AudioReceiver::getAudioTo(Audio::DataWindow& output, DecodeOptions options)
{
// ICELogDebug(<< "getAudioTo() for " << options.mElapsed);
assert (options.mElapsed != 0ms);
// First decode on this receiver: allocate the scratch buffers. Network-MOS-only
// streams never reach this point, so they never pay for them.
ensureDecodeBuffers();
// Increase counter of requested audio
mRequestedAudio += options.mElapsed;
DecodeResult result = {.mStatus = DecodeResult::Status::Skip};
// Process RFC2833 here; it doesn't result in any audio - only callbacks and statistics
processDtmf();
// How much time length audio we produced here
auto produced = 0ms;
Audio::Format fmt;
// Have we anything from the previous decode attempts ?
if (mAvailable.filled())
{
// Find what audio format is used in mAvailable data
fmt = options.mResampleToMainRate ? Audio::Format(AUDIO_SAMPLERATE, 1) : mCodec->getAudioFormat();
// How much milliseconds are available ?
auto availTime = mAvailable.getTimeLength(fmt);
if (availTime != 0ms)
{
// How much we can consume from the mAvailable buffer ?
std::chrono::milliseconds resultTime = std::min(availTime, options.mElapsed);
// Number of bytes
mAvailable.moveTo(output, fmt.sizeFromTime(resultTime));
// Increase the counter of produced milliseconds
produced += resultTime;
}
}
while (produced < options.mElapsed)
{
// Get next packet from buffer
RtpBuffer::FetchResult fr = mRtpBuffer.fetch();
// Decode to mAvailable buffer
switch (fr.mStatus)
{
case RtpBuffer::FetchResult::Status::Gap: result = decodeGapTo(mAvailable, options.decreaseElapsedBy(produced)); break;
case RtpBuffer::FetchResult::Status::NoPacket: result = decodeEmptyTo(mAvailable, options.decreaseElapsedBy(produced)); break;
case RtpBuffer::FetchResult::Status::RegularPacket: result = decodePacketTo(mAvailable, options.decreaseElapsedBy(produced), fr.mPacket); updateDecodeIntervalStatistics(); break;
default:
assert(0);
}
// Was there decoding at all ?
if (!mCodec)
break; // No sense to continue - we have no information at all
fmt = options.mResampleToMainRate ? Audio::Format(AUDIO_SAMPLERATE, 1) : mCodec->getAudioFormat();
result.mSamplerate = fmt.rate();
result.mChannels = fmt.channels();
// How much milliseconds we have in audio buffer ?
auto bufferAvailable = mAvailable.getTimeLength(fmt);
if (bufferAvailable == 0ms)
break; // No sense to continue - decoding / CNG / PLC stopped totally
// How much data should be moved to result buffer ?
std::chrono::milliseconds resultTime = std::min(bufferAvailable, options.mElapsed - produced);
mAvailable.moveTo(output, fmt.sizeFromTime(resultTime));
produced += resultTime;
}
if (produced != 0ms)
{
result.mStatus = DecodeResult::Status::Ok;
updateDecodingTimeStatistics();
}
mProducedAudio += produced;
// ICELogDebug(<< "Requested " << options.mElapsed << ", produced " << produced << ", remains " << mAvailable.getTimeLength(fmt) << ", packets " << getRtpBuffer().getCount());
return result; return result;
} }
void AudioReceiver::ensureDecodeBuffers()
{
// Allocate the decode/convert/resample scratch buffers to full capacity on the
// first decode. mDecodedFrame being empty means none are allocated yet; they
// are always allocated together, so checking one is enough.
if (mDecodedFrame.empty())
{
mDecodedFrame.resize(MT_MAX_DECODEBUFFER);
mConvertedFrame.resize(MT_MAX_DECODEBUFFER * 2);
mResampledFrame.resize(MT_MAX_DECODEBUFFER);
}
if (!mAvailable.capacity())
{
// 10 seconds is the maximum length of decoded audio in single step
// It is important - DTX may produce silence up to few seconds easily
mAvailable.setCapacity(AUDIO_SAMPLERATE * 10 * sizeof(short));
}
}
void AudioReceiver::makeMonoAndResample(int rate, int channels) void AudioReceiver::makeMonoAndResample(int rate, int channels)
{ {
// Make mono from stereo - engine works with mono only for now // Make mono from stereo - engine works with mono only for now
@@ -884,12 +918,12 @@ void AudioReceiver::makeMonoAndResample(int rate, int channels)
if (channels != AUDIO_CHANNELS) if (channels != AUDIO_CHANNELS)
{ {
if (channels == 1) if (channels == 1)
mConvertedLength = Audio::ChannelConverter::monoToStereo(mDecodedFrame, mDecodedLength, mConvertedFrame, mDecodedLength * 2); mConvertedLength = Audio::ChannelConverter::monoToStereo(mDecodedFrame.data(), mDecodedLength, mConvertedFrame.data(), mDecodedLength * 2);
else else
mDecodedLength = Audio::ChannelConverter::stereoToMono(mDecodedFrame, mDecodedLength, mDecodedFrame, mDecodedLength / 2); mDecodedLength = Audio::ChannelConverter::stereoToMono(mDecodedFrame.data(), mDecodedLength, mDecodedFrame.data(), mDecodedLength / 2);
} }
void* frames = mConvertedLength ? mConvertedFrame : mDecodedFrame; void* frames = mConvertedLength ? (void*)mConvertedFrame.data() : (void*)mDecodedFrame.data();
unsigned length = mConvertedLength ? mConvertedLength : mDecodedLength; unsigned length = mConvertedLength ? mConvertedLength : mDecodedLength;
Audio::Resampler* r = nullptr; Audio::Resampler* r = nullptr;
@@ -900,13 +934,13 @@ void AudioReceiver::makeMonoAndResample(int rate, int channels)
case 32000: r = &mResampler32; break; case 32000: r = &mResampler32; break;
case 48000: r = &mResampler48; break; case 48000: r = &mResampler48; break;
default: default:
memcpy(mResampledFrame, frames, length); memcpy(mResampledFrame.data(), frames, length);
mResampledLength = length; mResampledLength = length;
return; return;
} }
size_t processedInput = 0; size_t processedInput = 0;
mResampledLength = r->processBuffer(frames, length, processedInput, mResampledFrame, r->getDestLength(length)); mResampledLength = r->processBuffer(frames, length, processedInput, mResampledFrame.data(), r->getDestLength(length));
// processedInput result value is ignored - it is always equal to length as internal sample rate is 8/16/32/48K // processedInput result value is ignored - it is always equal to length as internal sample rate is 8/16/32/48K
} }
@@ -987,43 +1021,16 @@ AudioReceiver::MediaInfo AudioReceiver::infoFor(jrtplib::RTPPacket& p)
return {packetTime, codec->samplerate()}; return {packetTime, codec->samplerate()};
} }
// int AudioReceiver::timelengthFor(jrtplib::RTPPacket& p) void AudioReceiver::updateDecodeIntervalStatistics()
// { {
// CodecMap::iterator codecIter = mCodecMap.find(p.GetPayloadType()); auto now = std::chrono::steady_clock::now();
// if (codecIter == mCodecMap.end()) if (mLastDecodeTimestamp)
// return 0; {
mIntervalBetweenDecode += std::chrono::duration_cast<std::chrono::microseconds>(now - *mLastDecodeTimestamp);
// PCodec codec = codecIter->second; mDecodeCount ++;
// if (codec) }
// { mLastDecodeTimestamp = now;
// int frame_count = 0; }
// if (codec->rtpLength() != 0)
// {
// frame_count = static_cast<int>(p.GetPayloadLength() / codec->rtpLength());
// if (p.GetPayloadType() == 9/*G729A silence*/ && p.GetPayloadLength() % codec->rtpLength())
// frame_count++;
// }
// else
// frame_count = 1;
// return frame_count * codec->frameTime();
// }
// else
// return 0;
// }
// int AudioReceiver::samplerateFor(jrtplib::RTPPacket& p)
// {
// CodecMap::iterator codecIter = mCodecMap.find(p.GetPayloadType());
// if (codecIter != mCodecMap.end())
// {
// PCodec codec = codecIter->second;
// if (codec)
// return codec->samplerate();
// }
// return 8000;
// }
// ----------------------- DtmfReceiver ------------------- // ----------------------- DtmfReceiver -------------------
DtmfReceiver::DtmfReceiver(Statistics& stat) DtmfReceiver::DtmfReceiver(Statistics& stat)
@@ -1036,21 +1043,25 @@ DtmfReceiver::~DtmfReceiver()
void DtmfReceiver::add(const std::shared_ptr<RTPPacket>& p) void DtmfReceiver::add(const std::shared_ptr<RTPPacket>& p)
{ {
auto ev = DtmfBuilder::parseRfc2833({p->GetPayloadData(), p->GetPayloadLength()}); auto ev = DtmfBuilder::parseRfc2833({p->GetPayloadData(), p->GetPayloadLength()});
if (ev.mTone != mEvent || ev.mEnd != mEventEnded) if (!ev.mTone)
return; // Malformed or unknown event payload
// A new digit begins when the tone changes, or when the same tone starts
// again after the previous occurrence ended. Retransmitted start/end
// packets keep both fields unchanged and are ignored. The end packet of
// the current tone only updates state - the digit was already reported.
bool newEvent = (ev.mTone != mEvent) || (mEventEnded && !ev.mEnd);
if (newEvent)
{ {
if (!(mEvent == ev.mTone && !mEventEnded && ev.mEnd))
{
// New tone is here
if (mCallback) if (mCallback)
mCallback(ev.mTone); mCallback(ev.mTone);
// Queue statistics item // Queue statistics item
mStat.mDtmf2833Timeline.emplace_back(Dtmf2833Event{.mTone = ev.mTone, mStat.mDtmf2833Timeline.emplace_back(Dtmf2833Event{.mTone = ev.mTone,
.mTimestamp = RtpHelper::toMicroseconds(p->GetReceiveTime())}); .mTimestamp = RtpHelper::toMicroseconds(p->GetReceiveTime())});
}
// Store to avoid triggering on the packet
mEvent = ev.mTone; mEvent = ev.mTone;
mEventEnded = ev.mEnd; mEventEnded = ev.mEnd;
} }
}
}
+56 -10
View File
@@ -20,6 +20,8 @@
#include <optional> #include <optional>
#include <chrono> #include <chrono>
#include <vector>
#include <cstdint>
using namespace std::chrono_literals; using namespace std::chrono_literals;
namespace MT namespace MT
@@ -104,6 +106,18 @@ public:
FetchResult fetch(); FetchResult fetch();
// Drop oldest packets so buffered audio stays within the high-water mark,
// recording packet-loss events for any sequence gaps crossed (the same
// accounting fetch() performs). Used to bound memory on streams that never
// call fetch() - i.e. network-MOS-only streams with audio decode disabled,
// which would otherwise retain every packet for the whole call.
//
// maxPackets, when non-zero, additionally caps the buffer to that many packets
// regardless of buffered time. The decode path (fetch()) leaves it 0 so jitter
// tolerance stays governed by the time-based high-water mark; the network-only
// path passes a small cap since those packets are never decoded.
void trimToHighWater(size_t maxPackets = 0);
protected: protected:
unsigned mSsrc = 0; unsigned mSsrc = 0;
std::chrono::milliseconds mHigh = std::chrono::milliseconds(RTP_BUFFER_HIGH), std::chrono::milliseconds mHigh = std::chrono::milliseconds(RTP_BUFFER_HIGH),
@@ -122,6 +136,7 @@ protected:
std::optional<uint32_t> mLastSeqno; std::optional<uint32_t> mLastSeqno;
std::optional<jrtplib::RTPTime> mLastReceiveTime; std::optional<jrtplib::RTPTime> mLastReceiveTime;
// To calculate average interval between packet add. It is close to jitter but more useful in debugging. // To calculate average interval between packet add. It is close to jitter but more useful in debugging.
float mLastAddTime = 0.0f; float mLastAddTime = 0.0f;
}; };
@@ -169,10 +184,22 @@ public:
struct DecodeOptions struct DecodeOptions
{ {
bool mRealtimeProcessing = false; // Target PCAP parsing by default
bool mResampleToMainRate = true; // Resample all decoded audio to AUDIO_SAMPLERATE bool mResampleToMainRate = true; // Resample all decoded audio to AUDIO_SAMPLERATE
bool mFillGapByCNG = false; // Use CNG information if available bool mFillGapByCNG = false; // Use CNG information if available
bool mSkipDecode = false; // Don't do decode, just dry run - fetch packets, remove them from the jitter buffer bool mSkipDecode = false; // Don't do decode, just dry run - fetch packets, remove them from the jitter buffer
std::chrono::milliseconds mElapsed = 0ms; // How much milliseconds should be decoded; zero value means "decode just next packet from the buffer" std::chrono::milliseconds mElapsed = 0ms; // How much milliseconds should be decoded; zero value means "decode just next packet from the buffer"
DecodeOptions decreaseElapsedBy(std::chrono::milliseconds delta)
{
return
{
.mRealtimeProcessing = mRealtimeProcessing,
.mResampleToMainRate = mResampleToMainRate,
.mFillGapByCNG = mFillGapByCNG,
.mSkipDecode = mSkipDecode,
.mElapsed = std::max(mElapsed - delta, 0ms)
};
}
}; };
struct DecodeResult struct DecodeResult
@@ -193,7 +220,7 @@ public:
// Looks for codec by payload type // Looks for codec by payload type
Codec* findCodec(int payloadType); Codec* findCodec(int payloadType);
RtpBuffer& getRtpBuffer() { return mBuffer; } RtpBuffer& getRtpBuffer() { return mRtpBuffer; }
// Returns size of AudioReceiver's instance in bytes (including size of all data + codecs + etc.) // Returns size of AudioReceiver's instance in bytes (including size of all data + codecs + etc.)
int getSize() const; int getSize() const;
@@ -205,14 +232,12 @@ public:
}; };
MediaInfo infoFor(jrtplib::RTPPacket& p); MediaInfo infoFor(jrtplib::RTPPacket& p);
// // Returns timelength for given packet void processDtmf();
// int timelengthFor(jrtplib::RTPPacket& p);
// // Return samplerate for given packet void updateDecodingTimeStatistics();
// int samplerateFor(jrtplib::RTPPacket& p);
protected: protected:
RtpBuffer mBuffer; // Jitter buffer itself RtpBuffer mRtpBuffer; // RTP jitter buffer itself; here are audio packets
RtpBuffer mDtmfBuffer; // These two (mDtmfBuffer / mDtmfReceiver) are for our analyzer stack only; in normal softphone logic DTMF packets goes via SingleAudioStream::mDtmfReceiver RtpBuffer mDtmfBuffer; // These two (mDtmfBuffer / mDtmfReceiver) are for our analyzer stack only; in normal softphone logic DTMF packets goes via SingleAudioStream::mDtmfReceiver
DtmfReceiver mDtmfReceiver; DtmfReceiver mDtmfReceiver;
@@ -229,16 +254,22 @@ protected:
// Already decoded data that can be retrieved without actual decoding - it may happen because of getAudioTo() may be limited by time interval // Already decoded data that can be retrieved without actual decoding - it may happen because of getAudioTo() may be limited by time interval
Audio::DataWindow mAvailable; Audio::DataWindow mAvailable;
// Temporary buffer to hold decoded data (it is better than allocate data on stack) // Decode/convert/resample scratch buffers. These were inline arrays
int16_t mDecodedFrame[MT_MAX_DECODEBUFFER]; // (MT_MAX_DECODEBUFFER * {1,2,1} * int16_t = 256 KB total) carried by every
// AudioReceiver, hence by every StreamDecoder - including network-MOS-only
// streams that never decode. They are now allocated lazily on the first
// getAudioTo() call via ensureDecodeBuffers(); non-decoding streams keep them
// empty. Once allocated they are sized to full capacity and reused, so decode
// behaviour is unchanged.
std::vector<int16_t> mDecodedFrame; // sized to MT_MAX_DECODEBUFFER
size_t mDecodedLength = 0; size_t mDecodedLength = 0;
// Buffer to hold data converted to stereo/mono; there is multiplier 2 as it can be stereo audio // Buffer to hold data converted to stereo/mono; there is multiplier 2 as it can be stereo audio
int16_t mConvertedFrame[MT_MAX_DECODEBUFFER * 2]; std::vector<int16_t> mConvertedFrame; // sized to MT_MAX_DECODEBUFFER * 2
size_t mConvertedLength = 0; size_t mConvertedLength = 0;
// Buffer to hold data resampled to AUDIO_SAMPLERATE // Buffer to hold data resampled to AUDIO_SAMPLERATE
int16_t mResampledFrame[MT_MAX_DECODEBUFFER]; std::vector<int16_t> mResampledFrame; // sized to MT_MAX_DECODEBUFFER
size_t mResampledLength = 0; size_t mResampledLength = 0;
// Last packet time length // Last packet time length
@@ -258,6 +289,15 @@ protected:
float mIntervalSum = 0.0f; float mIntervalSum = 0.0f;
int mIntervalCount = 0; int mIntervalCount = 0;
std::chrono::milliseconds mRequestedAudio = 0ms;
std::chrono::milliseconds mProducedAudio = 0ms;
// Lazily allocate the decode/convert/resample scratch buffers (mDecodedFrame,
// mConvertedFrame, mResampledFrame) to full capacity on the first decode. A
// no-op once allocated. Called at the top of getAudioTo(); network-MOS-only
// streams never reach it, so they never pay the 256 KB.
void ensureDecodeBuffers();
// Zero rate will make audio mono but resampling will be skipped // Zero rate will make audio mono but resampling will be skipped
void makeMonoAndResample(int rate, int channels); void makeMonoAndResample(int rate, int channels);
@@ -272,6 +312,12 @@ protected:
DecodeResult decodeGapTo(Audio::DataWindow& output, DecodeOptions options); DecodeResult decodeGapTo(Audio::DataWindow& output, DecodeOptions options);
DecodeResult decodePacketTo(Audio::DataWindow& output, DecodeOptions options, const std::shared_ptr<RtpBuffer::Packet>& p); DecodeResult decodePacketTo(Audio::DataWindow& output, DecodeOptions options, const std::shared_ptr<RtpBuffer::Packet>& p);
DecodeResult decodeEmptyTo(Audio::DataWindow& output, DecodeOptions options); DecodeResult decodeEmptyTo(Audio::DataWindow& output, DecodeOptions options);
std::optional<std::chrono::steady_clock::time_point> mLastDecodeTimestamp;
std::chrono::microseconds mIntervalBetweenDecode = 0us;
size_t mDecodeCount = 0;
void updateDecodeIntervalStatistics();
}; };
} }
+12 -4
View File
@@ -238,6 +238,9 @@ void AudioStream::addData(const void* buffer, int bytes)
void AudioStream::copyDataTo(Audio::Mixer& mixer, int needed) void AudioStream::copyDataTo(Audio::Mixer& mixer, int needed)
{ {
// mStreamMap is also mutated from the network thread (dataArrived)
Lock l(mMutex);
// Local audio mixer - used to send audio to media observer // Local audio mixer - used to send audio to media observer
Audio::Mixer localMixer; Audio::Mixer localMixer;
Audio::DataWindow forObserver; Audio::DataWindow forObserver;
@@ -282,22 +285,27 @@ void AudioStream::copyDataTo(Audio::Mixer& mixer, int needed)
if (mMediaObserver) if (mMediaObserver)
{ {
localMixer.mixAndGetPcm(forObserver); int mixedBytes = localMixer.mixAndGetPcm(forObserver);
mMediaObserver->onMedia(forObserver.data(), forObserver.capacity(), MT::Stream::MediaDirection::Incoming, this, mMediaObserverTag); if (mixedBytes > 0)
mMediaObserver->onMedia(forObserver.data(), mixedBytes, MT::Stream::MediaDirection::Incoming, this, mMediaObserverTag);
} }
} }
void AudioStream::dataArrived(PDatagramSocket s, const void* buffer, int length, InternetAddress& source) void AudioStream::dataArrived(PDatagramSocket s, const void* buffer, int length, InternetAddress& source)
{ {
// Protects mStreamMap (also iterated by copyDataTo on the audio thread)
// and the receive/decrypt buffers.
Lock l(mMutex);
jrtplib::RTPIPv6Address addr6; jrtplib::RTPIPv6Address addr6;
jrtplib::RTPIPv4Address addr4; jrtplib::RTPIPv4Address addr4;
jrtplib::RTPExternalTransmissionInfo* info = dynamic_cast<jrtplib::RTPExternalTransmissionInfo*>(mRtpSession.GetTransmissionInfo()); jrtplib::RTPExternalTransmissionInfo* info = dynamic_cast<jrtplib::RTPExternalTransmissionInfo*>(mRtpSession.GetTransmissionInfo());
assert(info); assert(info);
// Drop RTP packets if stream is not receiving now; let RTCP go // Drop RTP packets if stream is not receiving now; let RTCP go
if (!(state() & (int)StreamState::Receiving) && RtpHelper::isRtpOrRtcp(buffer, length)) if (!(state() & (int)StreamState::Receiving) && RtpHelper::isRtp(buffer, length))
{ {
ICELogMedia(<< "Stream is not allowed to receive RTP stream. Ignore the RT(C)P packet"); ICELogMedia(<< "Stream is not allowed to receive RTP stream. Ignore the RTP packet");
return; return;
} }
+1 -1
View File
@@ -135,7 +135,7 @@ namespace MT
// Get noise level // Get noise level
unsigned char noiseLevel = *dataIn; unsigned char noiseLevel = *dataIn;
float linear = float(1.0 / noiseLevel ? noiseLevel : 1); float linear = 1.0f / float(noiseLevel ? noiseLevel : 1);
// Generate white noise for 16KHz sample rate // Generate white noise for 16KHz sample rate
LPFilter lpf; HPFilter hpf; LPFilter lpf; HPFilter hpf;
+1 -2
View File
@@ -137,7 +137,6 @@ std::string CodecList::Settings::toString() const
oss << "OPUS ptype: " << spec.mPayloadType << ", rate: " << spec.mRate << ", channels: " << spec.mChannels << std::endl; oss << "OPUS ptype: " << spec.mPayloadType << ", rate: " << spec.mRate << ", channels: " << spec.mChannels << std::endl;
} }
return oss.str(); return oss.str();
} }
@@ -234,7 +233,7 @@ static int findOctetMode(const char* line)
p += strlen(param_name); p += strlen(param_name);
char int_buf[8] = {0}; char int_buf[8] = {0};
size_t int_buf_offset = 0; size_t int_buf_offset = 0;
while (*p && isdigit(*p) && int_buf_offset < sizeof(int_buf)) while (*p && isdigit(*p) && int_buf_offset < sizeof(int_buf) - 1)
int_buf[int_buf_offset++] = *p++; int_buf[int_buf_offset++] = *p++;
return atoi(int_buf); return atoi(int_buf);
} }
+15 -14
View File
@@ -37,12 +37,11 @@ void DtmfBuilder::buildRfc2833(const Rfc2833Event& ev, void* output)
char* packet = (char*)output; char* packet = (char*)output;
// RFC 4733: byte 1 is E(1) R(1) volume(6)
packet[0] = toneValue; packet[0] = toneValue;
packet[1] = 1 | (ev.mVolume << 2); packet[1] = ev.mVolume & 0x3F;
if (ev.mEnd) if (ev.mEnd)
packet[1] |= 128; packet[1] |= 0x80;
else
packet[1] &= 127;
unsigned short durationValue = htons(ev.mDuration); unsigned short durationValue = htons(ev.mDuration);
memcpy(packet + 2, &durationValue, 2); memcpy(packet + 2, &durationValue, 2);
@@ -58,11 +57,11 @@ DtmfBuilder::Rfc2833Event DtmfBuilder::parseRfc2833(std::span<uint8_t> payload)
uint8_t b0 = payload[0]; uint8_t b0 = payload[0];
uint8_t b1 = payload[1]; uint8_t b1 = payload[1];
if (b0 >=0 && b0 <= 9) if (b0 <= 9)
r.mTone = '0' + b0; r.mTone = '0' + b0;
else else
if (b0 >= 12 && b0 <= 17) if (b0 >= 12 && b0 <= 15)
r.mTone = 'A' + b0; r.mTone = 'A' + b0 - 12;
else else
if (b0 == 10) if (b0 == 10)
r.mTone = '*'; r.mTone = '*';
@@ -70,9 +69,10 @@ DtmfBuilder::Rfc2833Event DtmfBuilder::parseRfc2833(std::span<uint8_t> payload)
if (b0 == 11) if (b0 == 11)
r.mTone = '#'; r.mTone = '#';
r.mEnd = (b1 & 128); // RFC 4733: byte 1 is E(1) R(1) volume(6); duration is bytes 2-3, network order
r.mVolume = (b1 & 127) >> 2; r.mEnd = (b1 & 0x80) != 0;
r.mDuration = ntohs(*(uint16_t*)payload.data()+2); r.mVolume = b1 & 0x3F;
r.mDuration = (uint16_t(payload[2]) << 8) | payload[3];
return r; return r;
} }
@@ -202,7 +202,7 @@ void PDTMFEncoder_AddTone(double f1, double f2, unsigned ms1, unsigned ms2, unsi
int ival = ifix(val); int ival = ifix(val);
if (ival < -32768) if (ival < -32768)
ival = -32768; ival = -32768;
else if (val > 32767) else if (ival > 32767)
ival = 32767; ival = 32767;
result[dataPtr++] = ival / 2; result[dataPtr++] = ival / 2;
@@ -280,8 +280,9 @@ void DtmfContext::stopTone()
switch (mType) switch (mType)
{ {
case Dtmf_Rfc2833: case Dtmf_Rfc2833:
// Mark stopped but keep the entry: getRfc2833() emits the end
// packet(s) for a stopped tone and erases it afterwards.
mQueue.front().mStopped = true; mQueue.front().mStopped = true;
mQueue.erase(mQueue.begin());
break; break;
case Dtmf_Inband: case Dtmf_Inband:
@@ -769,7 +770,7 @@ int zap_dtmf_detect (dtmf_detect_state_t *s,
s->fax_tone.v2 = s->fax_tone.v3; s->fax_tone.v2 = s->fax_tone.v3;
s->fax_tone.v3 = s->fax_tone.fac*s->fax_tone.v2 - v1 + famp; s->fax_tone.v3 = s->fax_tone.fac*s->fax_tone.v2 - v1 + famp;
v1 = s->fax_tone.v2; v1 = s->fax_tone2nd.v2;
s->fax_tone2nd.v2 = s->fax_tone2nd.v3; s->fax_tone2nd.v2 = s->fax_tone2nd.v3;
s->fax_tone2nd.v3 = s->fax_tone2nd.fac*s->fax_tone2nd.v2 - v1 + famp; s->fax_tone2nd.v3 = s->fax_tone2nd.fac*s->fax_tone2nd.v2 - v1 + famp;
} }
@@ -865,7 +866,7 @@ printf("Fax energy/Second Harmonic: %f/%f\n", fax_energy, fax_energy_2nd);
s->detected_digits++; s->detected_digits++;
if (s->current_digits < MAX_DTMF_DIGITS) if (s->current_digits < MAX_DTMF_DIGITS)
{ {
s->digits[s->current_digits++] = hit; s->digits[s->current_digits++] = 'f';
s->digits[s->current_digits] = '\0'; s->digits[s->current_digits] = '\0';
} }
else else
+25 -2
View File
@@ -152,6 +152,27 @@ EVSCodec::EVSCodec(const StreamParameters &sp)
{ {
EVSCodec::sp = sp; EVSCodec::sp = sp;
// Metadata only - the heavy decoder state is created lazily (ensureDecoder()).
mOutputFs = outputFsFromBw(sp.bw);
}
int EVSCodec::outputFsFromBw(int bw)
{
switch (bw)
{
case NB: return 8000;
case WB: return 16000;
case SWB: return 32000;
case FB: return 48000;
}
return 0;
}
void EVSCodec::ensureDecoder()
{
if (st_dec)
return;
if ((st_dec = reinterpret_cast<evs::Decoder_State*>(malloc(sizeof(evs::Decoder_State)))) == nullptr) if ((st_dec = reinterpret_cast<evs::Decoder_State*>(malloc(sizeof(evs::Decoder_State)))) == nullptr)
throw std::bad_alloc(); throw std::bad_alloc();
@@ -170,9 +191,9 @@ EVSCodec::~EVSCodec()
Codec::Info EVSCodec::info() { Codec::Info EVSCodec::info() {
return { return {
.mName = MT_EVS_CODECNAME, .mName = MT_EVS_CODECNAME,
.mSamplerate = st_dec->output_Fs, .mSamplerate = mOutputFs,
.mChannels = 1, .mChannels = 1,
.mPcmLength = st_dec->output_Fs / 1000 * sp.ptime * 2, .mPcmLength = mOutputFs / 1000 * sp.ptime * 2,
.mFrameTime = sp.ptime, .mFrameTime = sp.ptime,
.mRtpLength = 0 .mRtpLength = 0
}; };
@@ -187,6 +208,8 @@ Codec::EncodeResult EVSCodec::encode(std::span<const uint8_t> input, std::span<u
Codec::DecodeResult EVSCodec::decode(std::span<const uint8_t> input, std::span<uint8_t> output) Codec::DecodeResult EVSCodec::decode(std::span<const uint8_t> input, std::span<uint8_t> output)
{ {
ensureDecoder();
if (output.size_bytes() < pcmLength()) if (output.size_bytes() < pcmLength())
return {.mDecoded = 0}; return {.mDecoded = 0};
+14
View File
@@ -57,7 +57,21 @@ public:
private: private:
evs::Decoder_State* st_dec = nullptr; evs::Decoder_State* st_dec = nullptr;
StreamParameters sp; StreamParameters sp;
// Output sample rate, derived from the negotiated bandwidth (sp.bw) at
// construction. Cached so info()/samplerate()/pcmLength() work for network-MOS
// metadata without allocating the (large) EVS decoder state - see ensureDecoder.
int mOutputFs = 0;
void initDecoder(const StreamParameters& sp); void initDecoder(const StreamParameters& sp);
// Allocate + initialize the EVS decoder state lazily on first decode().
// Network-MOS-only streams resolve metadata but never decode, so they never
// pay for the EVS decoder (Decoder_State + CLDFB/FD-CNG sub-allocations).
void ensureDecoder();
// Maps an EVS bandwidth (NB/WB/SWB/FB) to its output sample rate in Hz.
static int outputFsFromBw(int bw);
}; };
} // End of namespace } // End of namespace
+15 -1
View File
@@ -34,7 +34,21 @@ void SingleAudioStream::copyPcmTo(Audio::DataWindow& output, int needed)
// Packet by packet // Packet by packet
while (output.filled() < needed) while (output.filled() < needed)
{ {
if (mReceiver.getAudioTo(output, {}).mStatus != AudioReceiver::DecodeResult::Status::Ok) // Number of bytes to fill on this step
auto requested = needed - output.filled();
auto options = AudioReceiver::DecodeOptions{
.mRealtimeProcessing = true,
.mResampleToMainRate = true,
.mSkipDecode = false,
.mElapsed = std::chrono::milliseconds(requested / (AUDIO_SAMPLERATE / 1000))
};
// Try to get the data from receiver / decoder
if (options.mElapsed != 0ms) {
if (mReceiver.getAudioTo(output, options).mStatus != AudioReceiver::DecodeResult::Status::Ok)
break;
} else
break; break;
} }
+23 -2
View File
@@ -48,6 +48,24 @@ extern std::string_view toString(SrtpSuite suite)
return {}; return {};
} }
extern int srtpSuiteStrength(SrtpSuite suite)
{
switch (suite)
{
case SRTP_NONE: return 0;
case SRTP_AES_128_AUTH_NULL: return 1; // no authentication - weakest
case SRTP_AES_128_AUTH_32: return 2;
case SRTP_AES_192_AUTH_32: return 3;
case SRTP_AES_256_AUTH_32: return 4;
case SRTP_AES_128_AUTH_80: return 5;
case SRTP_AES_192_AUTH_80: return 6;
case SRTP_AES_256_AUTH_80: return 7;
case SRTP_AED_AES_128_GCM: return 8;
case SRTP_AED_AES_256_GCM: return 9;
}
return 0;
}
typedef void (*set_srtp_policy_function) (srtp_crypto_policy_t*); typedef void (*set_srtp_policy_function) (srtp_crypto_policy_t*);
set_srtp_policy_function findPolicyFunction(SrtpSuite suite) set_srtp_policy_function findPolicyFunction(SrtpSuite suite)
@@ -95,6 +113,7 @@ SrtpSession::SrtpSession()
// Generate outgoing keys for all ciphers // Generate outgoing keys for all ciphers
auto putKey = [this](SrtpSuite suite, size_t length){ auto putKey = [this](SrtpSuite suite, size_t length){
assert(suite > SRTP_NONE && suite <= SRTP_LAST);
auto key = std::make_shared<ByteBuffer>(); auto key = std::make_shared<ByteBuffer>();
key->resize(length); key->resize(length);
RAND_bytes(key->mutableData(), key->size()); RAND_bytes(key->mutableData(), key->size());
@@ -103,9 +122,9 @@ SrtpSession::SrtpSession()
putKey(SRTP_AES_128_AUTH_80, 30); putKey(SRTP_AES_128_AUTH_32, 30); putKey(SRTP_AES_128_AUTH_80, 30); putKey(SRTP_AES_128_AUTH_32, 30);
putKey(SRTP_AES_192_AUTH_80, 38); putKey(SRTP_AES_192_AUTH_32, 38); putKey(SRTP_AES_192_AUTH_80, 38); putKey(SRTP_AES_192_AUTH_32, 38);
putKey(SRTP_AES_256_AUTH_80, 46); putKey(SRTP_AES_256_AUTH_32, 46); putKey(SRTP_AES_256_AUTH_80, 46); putKey(SRTP_AES_256_AUTH_32, 46);
putKey(SRTP_AES_128_AUTH_NULL, 30); // NULL auth still encrypts - it needs a key+salt
putKey(SRTP_AED_AES_128_GCM, 28); putKey(SRTP_AED_AES_128_GCM, 28);
putKey(SRTP_AED_AES_256_GCM, 44); putKey(SRTP_AED_AES_256_GCM, 44);
} }
SrtpSession::~SrtpSession() SrtpSession::~SrtpSession()
@@ -214,7 +233,9 @@ SrtpKeySalt& SrtpSession::outgoingKey(SrtpSuite suite)
{ {
assert(suite > SRTP_NONE && suite <= SRTP_LAST); assert(suite > SRTP_NONE && suite <= SRTP_LAST);
Lock l(mGuard); Lock l(mGuard);
return mOutgoingKey[int(suite)-1]; // The automated review sometimes give the hints about the possible underflow array index access // Must use the same indexing as the constructor and open(): the SDP
// crypto attribute has to advertise the key the session encrypts with.
return mOutgoingKey[int(suite)];
} }
bool SrtpSession::protectRtp(void* buffer, int* length) bool SrtpSession::protectRtp(void* buffer, int* length)
+7 -1
View File
@@ -36,6 +36,10 @@ enum SrtpSuite
extern SrtpSuite toSrtpSuite(const std::string_view& s); extern SrtpSuite toSrtpSuite(const std::string_view& s);
extern std::string_view toString(SrtpSuite suite); extern std::string_view toString(SrtpSuite suite);
// Relative cryptographic strength used to pick a suite from an SDP offer.
// Bigger is stronger. The raw enum values do NOT follow strength order.
extern int srtpSuiteStrength(SrtpSuite suite);
typedef std::pair<PByteBuffer, PByteBuffer> SrtpKeySalt; typedef std::pair<PByteBuffer, PByteBuffer> SrtpKeySalt;
typedef std::pair<unsigned, srtp_policy_t> SrtpStream; typedef std::pair<unsigned, srtp_policy_t> SrtpStream;
@@ -68,8 +72,10 @@ protected:
srtp_t mInboundSession, srtp_t mInboundSession,
mOutboundSession; mOutboundSession;
// Outgoing keys are indexed by the SrtpSuite enum value directly;
// index 0 (SRTP_NONE) is unused.
SrtpKeySalt mIncomingKey, SrtpKeySalt mIncomingKey,
mOutgoingKey[SRTP_LAST]; mOutgoingKey[SRTP_LAST + 1];
srtp_policy_t mInboundPolicy; srtp_policy_t mInboundPolicy;
srtp_policy_t mOutboundPolicy; srtp_policy_t mOutboundPolicy;
SrtpSuite mSuite; SrtpSuite mSuite;
+1 -1
View File
@@ -112,7 +112,7 @@ public:
std::map<int,int> mLoss; // Every item is number of loss of corresping length std::map<int,int> mLoss; // Every item is number of loss of corresping length
std::chrono::milliseconds mAudioTime = 0ms; // Decoded/found time in milliseconds std::chrono::milliseconds mAudioTime = 0ms; // Decoded/found time in milliseconds
size_t mDecodedSize = 0; // Number of decoded bytes size_t mDecodedSize = 0; // Number of decoded bytes
uint16_t mSsrc = 0; // Last known SSRC ID in a RTP stream uint32_t mSsrc = 0; // Last known SSRC ID in a RTP stream
ice::NetworkAddress mRemotePeer; // Last known remote RTP address ice::NetworkAddress mRemotePeer; // Last known remote RTP address
// AMR codec bitrate switch counter // AMR codec bitrate switch counter
-2
View File
@@ -342,8 +342,6 @@ void Logger::beginLine(LogLevel level, const char* filename, int linenumber, con
mFilename = filenamestart; mFilename = filenamestart;
mLine = linenumber; mLine = linenumber;
mSubsystem = subsystem; mSubsystem = subsystem;
// mStream << std::setw(8) << ICETimeHelper::timestamp() << " | " << std::setw(8) << ThreadInfo::currentThread() << " | " << std::setw(30) << filenamestart << " | " << std::setw(4) << linenumber << " | " << std::setw(12) << subsystem << " | ";
} }
void void
+10 -1
View File
@@ -163,7 +163,7 @@ void NetworkHelper::reload(int networkType)
fillUwpInterfaceList(AF_INET6, networkType, mIPList); fillUwpInterfaceList(AF_INET6, networkType, mIPList);
#else #else
// https://github.com/golang/go/issues/40569 // https://github.com/golang/go/issues/40569
struct ifaddrs* il = NULL; struct ifaddrs* il = nullptr;
if (getifaddrs(&il)) if (getifaddrs(&il))
throw Exception(GETIFADDRS_FAILED, errno); throw Exception(GETIFADDRS_FAILED, errno);
if (il) if (il)
@@ -171,6 +171,15 @@ void NetworkHelper::reload(int networkType)
struct ifaddrs* current = il; struct ifaddrs* current = il;
while (current) while (current)
{ {
// getifaddrs() may return entries with a null ifa_addr (interfaces with
// no assigned address, point-to-point/tunnel interfaces, link-layer
// entries, ...). Dereferencing it would crash, so skip such entries.
if (current->ifa_addr == nullptr)
{
current = current->ifa_next;
continue;
}
//char ipbuffer[64]; //char ipbuffer[64];
NetworkAddress addr; NetworkAddress addr;
addr.setPort(1000); // Set fake address to keep NetworkAddress initialized addr.setPort(1000); // Set fake address to keep NetworkAddress initialized
+13 -1
View File
@@ -44,7 +44,19 @@
#include "rtptypes.h" #include "rtptypes.h"
#include "rtpmemoryobject.h" #include "rtpmemoryobject.h"
#define RTPSOURCES_HASHSIZE 8317 // Number of buckets in the per-RTPSession SSRC->source hash table. This is an
// inline array of pointers in every RTPSources instance (sizeof == hashsize *
// sizeof(void*)), so it is paid by every RTPSession object regardless of how many
// sources it actually tracks. The original jrtplib default (8317) targets RTP
// mixers/conferences that demultiplex thousands of distinct SSRCs on one session;
// it costs ~65 KB per session. Sevana's per-stream capture sessions carry ~1 SSRC,
// so a far smaller table is ample - collisions are resolved by linked lists, so a
// small size only affects lookup cost (negligible at our source counts), never
// correctness. Overridable at build time for products that genuinely need many
// sources per session.
#ifndef RTPSOURCES_HASHSIZE
#define RTPSOURCES_HASHSIZE 251
#endif
namespace jrtplib namespace jrtplib
{ {
+5 -1
View File
@@ -9,8 +9,12 @@
using namespace resip; using namespace resip;
std::atomic<long> Message::sInstanceCount{0};
Message::Message() : mTu(0) Message::Message() : mTu(0)
{} {
++sInstanceCount;
}
Message::Brief Message::Brief
Message::brief() const Message::brief() const
+6 -1
View File
@@ -7,6 +7,7 @@
#include "rutil/Data.hxx" #include "rutil/Data.hxx"
#include <iosfwd> #include <iosfwd>
#include <atomic>
#include "rutil/resipfaststreams.hxx" #include "rutil/resipfaststreams.hxx"
namespace resip namespace resip
@@ -21,7 +22,11 @@ class Message
{ {
public: public:
Message(); Message();
virtual ~Message() {} virtual ~Message() { --sInstanceCount; }
/// Live instance count of all Message-derived objects (leak indicator).
static std::atomic<long> sInstanceCount;
static long getInstanceCount() { return sInstanceCount.load(std::memory_order_relaxed); }
/// facet for brief output to streams /// facet for brief output to streams
class Brief class Brief
@@ -28,6 +28,8 @@ using namespace std;
bool SipMessage::checkContentLength=true; bool SipMessage::checkContentLength=true;
std::atomic<long> SipMessage::sInstanceCount{0};
SipMessage::SipMessage(const Tuple *receivedTransportTuple) SipMessage::SipMessage(const Tuple *receivedTransportTuple)
: mIsDecorated(false), : mIsDecorated(false),
mIsBadAck200(false), mIsBadAck200(false),
@@ -51,6 +53,7 @@ SipMessage::SipMessage(const Tuple *receivedTransportTuple)
// !bwc! TODO make this tunable // !bwc! TODO make this tunable
mHeaders.reserve(16); mHeaders.reserve(16);
clear(); clear();
++sInstanceCount;
} }
SipMessage::SipMessage(const SipMessage& from) SipMessage::SipMessage(const SipMessage& from)
@@ -63,6 +66,7 @@ SipMessage::SipMessage(const SipMessage& from)
mCreatedTime(Timer::getTimeMicroSec()) mCreatedTime(Timer::getTimeMicroSec())
{ {
init(from); init(from);
++sInstanceCount;
} }
Message* Message*
@@ -98,6 +102,7 @@ SipMessage::~SipMessage()
} }
#endif #endif
freeMem(); freeMem();
--sInstanceCount;
} }
void void
@@ -7,6 +7,7 @@
#include <vector> #include <vector>
#include <utility> #include <utility>
#include <memory> #include <memory>
#include <atomic>
#include "resip/stack/Contents.hxx" #include "resip/stack/Contents.hxx"
#include "resip/stack/Headers.hxx" #include "resip/stack/Headers.hxx"
@@ -154,6 +155,10 @@ class SipMessage : public TransactionMessage
{ {
public: public:
RESIP_HeapCount(SipMessage); RESIP_HeapCount(SipMessage);
/// Live instance count of SipMessage objects (leak indicator).
static std::atomic<long> sInstanceCount;
static long getInstanceCount() { return sInstanceCount.load(std::memory_order_relaxed); }
#ifndef __SUNPRO_CC #ifndef __SUNPRO_CC
typedef std::list< std::pair<Data, HeaderFieldValueList*>, StlPoolAllocator<std::pair<Data, HeaderFieldValueList*>, PoolBase > > UnknownHeaders; typedef std::list< std::pair<Data, HeaderFieldValueList*>, StlPoolAllocator<std::pair<Data, HeaderFieldValueList*>, PoolBase > > UnknownHeaders;
#else #else