From 23b4283b89355f63efe412daa74383d3c1bef8ca Mon Sep 17 00:00:00 2001 From: Dmytro Bogovych Date: Mon, 6 Sep 2021 14:21:44 +0300 Subject: [PATCH] - initial work to move decoding from audio callback --- src/engine/media/MT_AudioReceiver.cpp | 270 +++++++++++++++----------- src/engine/media/MT_AudioReceiver.h | 30 ++- src/engine/media/MT_Box.cpp | 4 +- src/engine/media/MT_Box.h | 3 +- src/engine/media/MT_Codec.h | 5 + src/engine/media/MT_Stream.cpp | 8 +- src/engine/media/MT_Stream.h | 6 +- src/libs/resiprocate/rutil/Data.hxx | 4 + 8 files changed, 199 insertions(+), 131 deletions(-) diff --git a/src/engine/media/MT_AudioReceiver.cpp b/src/engine/media/MT_AudioReceiver.cpp index 7331a1c7..a334f838 100644 --- a/src/engine/media/MT_AudioReceiver.cpp +++ b/src/engine/media/MT_AudioReceiver.cpp @@ -48,11 +48,19 @@ int RtpBuffer::Packet::rate() const return mRate; } +const std::vector& RtpBuffer::Packet::pcm() const +{ + return mPcm; +} + +std::vector& RtpBuffer::Packet::pcm() +{ + return mPcm; +} + // ------------ RtpBuffer ---------------- RtpBuffer::RtpBuffer(Statistics& stat) - :mStat(stat), mSsrc(0), mHigh(RTP_BUFFER_HIGH), mLow(RTP_BUFFER_LOW), mPrebuffer(RTP_BUFFER_PREBUFFER), - mFirstPacketWillGo(true), mReturnedCounter(0), mAddCounter(0), - mFetchedPacket(std::shared_ptr(), 0, 0) + :mStat(stat) { } @@ -97,15 +105,15 @@ int RtpBuffer::getCount() const return static_cast(mPacketList.size()); } -bool SequenceSort(const RtpBuffer::Packet& p1, const RtpBuffer::Packet& p2) +bool SequenceSort(const std::shared_ptr& p1, const std::shared_ptr& p2) { - return p1.rtp()->GetExtendedSequenceNumber() < p2.rtp()->GetExtendedSequenceNumber(); + return p1->rtp()->GetExtendedSequenceNumber() < p2->rtp()->GetExtendedSequenceNumber(); } -bool RtpBuffer::add(std::shared_ptr packet, int timelength, int rate) +std::shared_ptr RtpBuffer::add(std::shared_ptr packet, int timelength, int rate) { if (!packet) - return false; + return std::shared_ptr(); Lock l(mGuard); @@ -131,15 +139,15 @@ bool RtpBuffer::add(std::shared_ptr packet, int timelength, // New sequence number unsigned newSeqno = packet->GetExtendedSequenceNumber(); - for (Packet& p: mPacketList) + for (std::shared_ptr& p: mPacketList) { - unsigned seqno = p.rtp()->GetExtendedSequenceNumber(); + unsigned seqno = p->rtp()->GetExtendedSequenceNumber(); if (seqno == newSeqno) { mStat.mDuplicatedRtp++; ICELogMedia(<< "Discovered duplicated packet, skipping"); - return false; + return std::shared_ptr(); } if (seqno > maxno) @@ -153,8 +161,11 @@ bool RtpBuffer::add(std::shared_ptr packet, int timelength, if (newSeqno > minno || (available < mHigh)) { - Packet p(packet, timelength, rate); + // Insert into queue + auto p = std::make_shared(packet, timelength, rate); mPacketList.push_back(p); + + // Sort again std::sort(mPacketList.begin(), mPacketList.end(), SequenceSort); // Limit by max timelength @@ -162,21 +173,18 @@ bool RtpBuffer::add(std::shared_ptr packet, int timelength, if (available > mHigh) ICELogMedia(<< "Available " << available << "ms with limit " << mHigh << "ms"); - /*while (available > mHigh && mPacketList.size()) - { - ICELogDebug( << "Dropping RTP packet from jitter buffer"); - available -= mPacketList.front().timelength(); - mPacketList.erase(mPacketList.begin()); - }*/ + + return p; } else { ICELogMedia(<< "Too old packet, skipping"); mStat.mOldRtp++; - return false; + + return std::shared_ptr(); } - return true; + return std::shared_ptr(); } RtpBuffer::FetchResult RtpBuffer::fetch(ResultList& rl) @@ -192,7 +200,7 @@ RtpBuffer::FetchResult RtpBuffer::fetch(ResultList& rl) while (total > mHigh && mPacketList.size()) { ICELogMedia( << "Dropping RTP packets from jitter buffer"); - total -= mPacketList.front().timelength(); + total -= mPacketList.front()->timelength(); // Save it as last packet however - to not confuse loss packet counter mFetchedPacket = mPacketList.front(); @@ -208,7 +216,7 @@ RtpBuffer::FetchResult RtpBuffer::fetch(ResultList& rl) result = FetchResult::NoPacket; else { - if (mFetchedPacket.rtp()) + if (mFetchedPacket->rtp()) { if (mPacketList.empty()) { @@ -218,10 +226,10 @@ RtpBuffer::FetchResult RtpBuffer::fetch(ResultList& rl) else { // Current sequence number ? - unsigned seqno = mPacketList.front().rtp()->GetExtendedSequenceNumber(); + unsigned seqno = mPacketList.front()->rtp()->GetExtendedSequenceNumber(); // Gap between new packet and previous on - int gap = seqno - mFetchedPacket.rtp()->GetSequenceNumber() - 1; + int gap = seqno - mFetchedPacket->rtp()->GetSequenceNumber() - 1; gap = std::min(gap, 127); if (gap > 0 && mPacketList.empty()) { @@ -238,29 +246,7 @@ RtpBuffer::FetchResult RtpBuffer::fetch(ResultList& rl) } result = FetchResult::RegularPacket; - Packet& p = mPacketList.front(); - rl.push_back(p.rtp()); - - // Maybe it is time to replay packet right now ? For case of AMR SID packets - /*if (mFetchedPacket.rtp() && gap == 0 && mFetchedPacket.timelength() >= 10 && p.timelength() >= 10) - { - int timestampDelta; - // Timestamp difference - if (p.rtp()->GetTimestamp() > mFetchedPacket.rtp()->GetTimestamp()) - timestampDelta = TimeHelper::getDelta(p.rtp()->GetTimestamp(), mFetchedPacket.rtp()->GetTimestamp()); - else - timestampDelta = TimeHelper::getDelta(mFetchedPacket.rtp()->GetTimestamp(), p.rtp()->GetTimestamp()); - - // Timestamp units per packet - int nrOfPackets = timestampDelta / (p.timelength() * (p.rate() / 1000)); - - // Add more copies of SID (most probably) packets - for (int i = 0; i < nrOfPackets - 1; i++) - { - //assert(false); - rl.push_back(p.rtp()); - } - }*/ + rl.push_back(mPacketList.front()); // Save last returned normal packet mFetchedPacket = mPacketList.front(); @@ -279,7 +265,7 @@ RtpBuffer::FetchResult RtpBuffer::fetch(ResultList& rl) result = FetchResult::RegularPacket; // Put it to output list - rl.push_back(mPacketList.front().rtp()); + rl.push_back(mPacketList.front()); // Remember returned packet mFetchedPacket = mPacketList.front(); @@ -305,7 +291,7 @@ int RtpBuffer::findTimelength() { int available = 0; for (unsigned i = 0; i < mPacketList.size(); i++) - available += mPacketList[i].timelength(); + available += mPacketList[i]->timelength(); return available; } @@ -331,7 +317,7 @@ Receiver::~Receiver() //-------------- AudioReceiver ---------------- AudioReceiver::AudioReceiver(const CodecList::Settings& settings, MT::Statistics &stat) - :Receiver(stat), mBuffer(stat), mFrameCount(0), mFailedCount(0), mCodecSettings(settings), + :Receiver(stat), mBuffer(stat), mCodecSettings(settings), mCodecList(settings) { // Init resamplers @@ -364,6 +350,41 @@ AudioReceiver::~AudioReceiver() mDecodedDump.reset(); } +size_t decode_packet(Codec& codec, RTPPacket& p, void* output_buffer, size_t output_capacity) +{ + // How much data was produced + size_t result = 0; + + // Handle here regular RTP packets + // Check if payload length is ok + int tail = codec.rtpLength() ? p.GetPayloadLength() % codec.rtpLength() : 0; + + if (!tail) + { + // Find number of frames + int frame_count = codec.rtpLength() ? p.GetPayloadLength() / codec.rtpLength() : 1; + int frame_length = codec.rtpLength() ? codec.rtpLength() : (int)p.GetPayloadLength(); + + // Save last packet time length + // mLastPacketTimeLength = mFrameCount * mCodec->frameTime(); + + // Decode + + for (int i=0; i < frame_count; i++) + { + auto decoded_length = codec.decode(p.GetPayloadData() + i * codec.rtpLength(), + frame_length, + output_buffer, + output_capacity); + + result += decoded_length; + } + } + else + ICELogMedia(<< "RTP packet with tail."); + + return result; +} bool AudioReceiver::add(const std::shared_ptr& p, Codec** codec) { @@ -421,7 +442,22 @@ bool AudioReceiver::add(const std::shared_ptr& p, Codec** co } // Queue packet to buffer - return mBuffer.add(p, time_length, codecIter->second->samplerate()); + auto packet = mBuffer.add(p, time_length, codecIter->second->samplerate()).get(); + + if (packet) + { + // Check if early decoding configured + if (mEarlyDecode && *codec) + { + // Move data to packet buffer + size_t available = decode_packet(**codec, *p, mDecodedFrame, sizeof mDecodedFrame); + packet->pcm().resize(available / 2); + memcpy(packet->pcm().data(), mDecodedFrame, available / 2); + } + return true; + } + else + return false; } void AudioReceiver::processDecoded(Audio::DataWindow& output, int options) @@ -446,7 +482,7 @@ void AudioReceiver::processDecoded(Audio::DataWindow& output, int options) bool AudioReceiver::getAudio(Audio::DataWindow& output, int options, int* rate) { - bool result = false, had_cng = false, had_decode = false; + bool result = false, /*had_cng = false, */had_decode = false; // Get next packet from buffer RtpBuffer::ResultList rl; @@ -493,14 +529,14 @@ bool AudioReceiver::getAudio(Audio::DataWindow& output, int options, int* rate) case RtpBuffer::FetchResult::RegularPacket: mFailedCount = 0; - for (std::shared_ptr& p: rl) + for (std::shared_ptr& p: rl) { assert(p); // Check if previously CNG packet was detected. Emit CNG audio here if needed. if (options & DecodeOptions_FillCngGap && mCngPacket && mCodec) { // Fill CNG audio is server mode is present - int units = p->GetTimestamp() - mCngPacket->GetTimestamp(); + int units = p->rtp()->GetTimestamp() - mCngPacket->GetTimestamp(); int milliseconds = units / (mCodec->samplerate() / 1000); if (milliseconds > mLastPacketTimeLength) { @@ -533,73 +569,83 @@ bool AudioReceiver::getAudio(Audio::DataWindow& output, int options, int* rate) } } - // Find codec - mCodec = mCodecMap[p->GetPayloadType()]; - if (mCodec) + if (mEarlyDecode) { - if (rate) - *rate = mCodec->samplerate(); + // ToDo - copy the decoded data to output buffer - // Check if it is CNG packet - if ((p->GetPayloadType() == 0 || p->GetPayloadType() == 8) && p->GetPayloadLength() >= 1 && p->GetPayloadLength() <= 6) + } + else + { + // Find codec by payload type + int ptype = p->rtp()->GetPayloadType(); + mCodec = mCodecMap[ptype]; + if (mCodec) { - if (options & DecodeOptions_SkipDecode) - mDecodedLength = 0; - else + if (rate) + *rate = mCodec->samplerate(); + + // Check if it is CNG packet + if ((ptype == 0 || ptype == 8) && p->rtp()->GetPayloadLength() >= 1 && p->rtp()->GetPayloadLength() <= 6) { - mCngPacket = p; - mCngDecoder.decode3389(p->GetPayloadData(), p->GetPayloadLength()); - // Emit CNG mLastPacketLength milliseconds - mDecodedLength = mCngDecoder.produce(mCodec->samplerate(), mLastPacketTimeLength, - (short*)mDecodedFrame, true); - if (mDecodedLength) - processDecoded(output, options); - } - result = true; - } - else - { - // Reset CNG packet - mCngPacket.reset(); - - // Handle here regular RTP packets - // Check if payload length is ok - int tail = mCodec->rtpLength() ? p->GetPayloadLength() % mCodec->rtpLength() : 0; - - if (!tail) - { - // Find number of frames - mFrameCount = mCodec->rtpLength() ? p->GetPayloadLength() / mCodec->rtpLength() : 1; - int frameLength = mCodec->rtpLength() ? mCodec->rtpLength() : (int)p->GetPayloadLength(); - - // Save last packet time length - mLastPacketTimeLength = mFrameCount * mCodec->frameTime(); - - // Decode - for (int i=0; idecode(p->GetPayloadData() + i * mCodec->rtpLength(), - frameLength, mDecodedFrame, sizeof mDecodedFrame); - // mDecodedLength = 3840; // Opus 20 ms stereo - if (mDecodedLength) - processDecoded(output, options); - } + mCngPacket = p->rtp(); + mCngDecoder.decode3389(p->rtp()->GetPayloadData(), p->rtp()->GetPayloadLength()); + // Emit CNG mLastPacketLength milliseconds + mDecodedLength = mCngDecoder.produce(mCodec->samplerate(), mLastPacketTimeLength, + (short*)mDecodedFrame, true); + if (mDecodedLength) + processDecoded(output, options); } - result = mFrameCount > 0; - - // Check for bitrate counter - processStatisticsWithAmrCodec(mCodec.get()); + result = true; } else - ICELogMedia(<< "RTP packet with tail."); + { + // Reset CNG packet + mCngPacket.reset(); + + // Handle here regular RTP packets + // Check if payload length is ok + int tail = mCodec->rtpLength() ? p->rtp()->GetPayloadLength() % mCodec->rtpLength() : 0; + + if (!tail) + { + // Find number of frames + mFrameCount = mCodec->rtpLength() ? p->rtp()->GetPayloadLength() / mCodec->rtpLength() : 1; + int frameLength = mCodec->rtpLength() ? mCodec->rtpLength() : (int)p->rtp()->GetPayloadLength(); + + // Save last packet time length + mLastPacketTimeLength = mFrameCount * mCodec->frameTime(); + + // Decode + for (int i=0; idecode(p->rtp()->GetPayloadData() + i * mCodec->rtpLength(), + frameLength, mDecodedFrame, sizeof mDecodedFrame); + // mDecodedLength = 3840; // Opus 20 ms stereo + if (mDecodedLength) + processDecoded(output, options); + } + } + result = mFrameCount > 0; + + // Check for bitrate counter + processStatisticsWithAmrCodec(mCodec.get()); + } + else + ICELogMedia(<< "RTP packet with tail."); + + } } } } diff --git a/src/engine/media/MT_AudioReceiver.h b/src/engine/media/MT_AudioReceiver.h index f55e027c..b1f9f7e3 100644 --- a/src/engine/media/MT_AudioReceiver.h +++ b/src/engine/media/MT_AudioReceiver.h @@ -47,12 +47,17 @@ namespace MT public: Packet(const std::shared_ptr& packet, int timelen, int rate); std::shared_ptr rtp() const; + int timelength() const; int rate() const; + const std::vector& pcm() const; + std::vector& pcm(); + protected: std::shared_ptr mRtp; int mTimelength = 0, mRate = 0; + std::vector mPcm; }; RtpBuffer(Statistics& stat); @@ -77,24 +82,28 @@ namespace MT int getCount() const; // Returns false if packet was not add - maybe too old or too new or duplicate - bool add(std::shared_ptr packet, int timelength, int rate); + std::shared_ptr add(std::shared_ptr packet, int timelength, int rate); - typedef std::vector> ResultList; + typedef std::vector> ResultList; typedef std::shared_ptr PResultList; FetchResult fetch(ResultList& rl); protected: - unsigned mSsrc; - int mHigh, mLow, mPrebuffer; - int mReturnedCounter, mAddCounter; + unsigned mSsrc = 0; + int mHigh = RTP_BUFFER_HIGH, + mLow = RTP_BUFFER_LOW, + mPrebuffer = RTP_BUFFER_PREBUFFER; + int mReturnedCounter = 0, + mAddCounter = 0; + mutable Mutex mGuard; - typedef std::vector PacketList; + typedef std::vector> PacketList; PacketList mPacketList; Statistics& mStat; - bool mFirstPacketWillGo; + bool mFirstPacketWillGo = true; jrtplib::RTPSourceStats mRtpStats; - Packet mFetchedPacket; + std::shared_ptr mFetchedPacket; // To calculate average interval between packet add. It is close to jitter but more useful in debugging. float mLastAddTime = 0.0; @@ -155,6 +164,9 @@ namespace MT std::shared_ptr mCngPacket; CngDecoder mCngDecoder; + // Decode RTP early, do not wait for speaker callback + bool mEarlyDecode = false; + // Buffer to hold decoded data char mDecodedFrame[65536]; int mDecodedLength = 0; @@ -170,7 +182,7 @@ namespace MT // Last packet time length int mLastPacketTimeLength = 0; - int mFailedCount; + int mFailedCount = 0; Audio::Resampler mResampler8, mResampler16, mResampler32, mResampler48; diff --git a/src/engine/media/MT_Box.cpp b/src/engine/media/MT_Box.cpp index 6c0a2df5..39609402 100644 --- a/src/engine/media/MT_Box.cpp +++ b/src/engine/media/MT_Box.cpp @@ -34,7 +34,7 @@ Terminal::~Terminal() mAudioPair.reset(); } -PStream Terminal::createStream(int type, VariantMap& config) +PStream Terminal::createStream(int type, VariantMap& /*config*/) { PStream result; switch (type) @@ -52,7 +52,7 @@ PStream Terminal::createStream(int type, VariantMap& config) return result; } -void Terminal::freeStream(PStream stream) +void Terminal::freeStream(const PStream& stream) { if (AudioStream* audio = dynamic_cast(stream.get())) { diff --git a/src/engine/media/MT_Box.h b/src/engine/media/MT_Box.h index 72f5afa1..b4c33e54 100644 --- a/src/engine/media/MT_Box.h +++ b/src/engine/media/MT_Box.h @@ -26,10 +26,11 @@ namespace MT CodecList& codeclist(); PStream createStream(int type, VariantMap& config); - void freeStream(PStream s); + void freeStream(const PStream& s); Audio::PDevicePair audio(); void setAudio(const Audio::PDevicePair& audio); + protected: StreamList mAudioList; std::mutex mAudioListMutex; diff --git a/src/engine/media/MT_Codec.h b/src/engine/media/MT_Codec.h index 97d2d41f..8ddcf576 100644 --- a/src/engine/media/MT_Codec.h +++ b/src/engine/media/MT_Codec.h @@ -63,8 +63,13 @@ public: virtual int channels() { return 1; } + // Returns size of encoded data (RTP) in bytes virtual int encode(const void* input, int inputBytes, void* output, int outputCapacity) = 0; + + // Returns size of decoded data (PCM signed short) in bytes virtual int decode(const void* input, int inputBytes, void* output, int outputCapacity) = 0; + + // Returns size of produced data (PCM signed short) in bytes virtual int plc(int lostFrames, void* output, int outputCapacity) = 0; // Returns size of codec in memory diff --git a/src/engine/media/MT_Stream.cpp b/src/engine/media/MT_Stream.cpp index 05a5350c..c09fbcb3 100644 --- a/src/engine/media/MT_Stream.cpp +++ b/src/engine/media/MT_Stream.cpp @@ -77,13 +77,13 @@ StreamList::~StreamList() clear(); } -void StreamList::add(PStream s) +void StreamList::add(const PStream& s) { Lock l(mMutex); mStreamVector.push_back(s); } -void StreamList::remove(PStream s) +void StreamList::remove(const PStream& s) { Lock l(mMutex); @@ -98,7 +98,7 @@ void StreamList::clear() mStreamVector.clear(); } -bool StreamList::has(PStream s) +bool StreamList::has(const PStream& s) { Lock l(mMutex); return std::find(mStreamVector.begin(), mStreamVector.end(), s) != mStreamVector.end(); @@ -127,4 +127,4 @@ void StreamList::copyTo(StreamList* sl) Mutex& StreamList::getMutex() { return mMutex; -} \ No newline at end of file +} diff --git a/src/engine/media/MT_Stream.h b/src/engine/media/MT_Stream.h index e517de9f..9be3f9ea 100644 --- a/src/engine/media/MT_Stream.h +++ b/src/engine/media/MT_Stream.h @@ -88,10 +88,10 @@ namespace MT StreamList(); ~StreamList(); - void add(PStream s); - void remove(PStream s); + void add(const PStream& s); + void remove(const PStream& s); void clear(); - bool has(PStream s); + bool has(const PStream& s); int size(); PStream streamAt(int index); diff --git a/src/libs/resiprocate/rutil/Data.hxx b/src/libs/resiprocate/rutil/Data.hxx index c43b2445..0297a397 100644 --- a/src/libs/resiprocate/rutil/Data.hxx +++ b/src/libs/resiprocate/rutil/Data.hxx @@ -1081,6 +1081,10 @@ inline bool operator!=(const char* lhs, const Data& rhs) { return !(rhs == lhs); inline bool operator>(const char* lhs, const Data& rhs) { return rhs < lhs; } inline bool operator<=(const char* lhs, const Data& rhs) { return !(rhs < lhs); } inline bool operator>=(const char* lhs, const Data& rhs) { return !(lhs < rhs); } + +extern bool operator==(const Data& lhs, const Data& rhs); +extern bool operator<(const Data& lhs, const Data& rhs); + #ifndef RESIP_USE_STL_STREAMS EncodeStream& operator<<(EncodeStream& strm, const Data& d); #endif