- initial work to move decoding from audio callback

This commit is contained in:
Dmytro Bogovych 2021-09-06 14:21:44 +03:00
parent ead9979db7
commit 23b4283b89
8 changed files with 199 additions and 131 deletions

View File

@ -48,11 +48,19 @@ int RtpBuffer::Packet::rate() const
return mRate;
}
const std::vector<short>& RtpBuffer::Packet::pcm() const
{
return mPcm;
}
std::vector<short>& RtpBuffer::Packet::pcm()
{
return mPcm;
}
// ------------ RtpBuffer ----------------
RtpBuffer::RtpBuffer(Statistics& stat)
:mStat(stat), mSsrc(0), mHigh(RTP_BUFFER_HIGH), mLow(RTP_BUFFER_LOW), mPrebuffer(RTP_BUFFER_PREBUFFER),
mFirstPacketWillGo(true), mReturnedCounter(0), mAddCounter(0),
mFetchedPacket(std::shared_ptr<RTPPacket>(), 0, 0)
:mStat(stat)
{
}
@ -97,15 +105,15 @@ int RtpBuffer::getCount() const
return static_cast<int>(mPacketList.size());
}
bool SequenceSort(const RtpBuffer::Packet& p1, const RtpBuffer::Packet& p2)
bool SequenceSort(const std::shared_ptr<RtpBuffer::Packet>& p1, const std::shared_ptr<RtpBuffer::Packet>& p2)
{
return p1.rtp()->GetExtendedSequenceNumber() < p2.rtp()->GetExtendedSequenceNumber();
return p1->rtp()->GetExtendedSequenceNumber() < p2->rtp()->GetExtendedSequenceNumber();
}
bool RtpBuffer::add(std::shared_ptr<jrtplib::RTPPacket> packet, int timelength, int rate)
std::shared_ptr<RtpBuffer::Packet> RtpBuffer::add(std::shared_ptr<jrtplib::RTPPacket> packet, int timelength, int rate)
{
if (!packet)
return false;
return std::shared_ptr<Packet>();
Lock l(mGuard);
@ -131,15 +139,15 @@ bool RtpBuffer::add(std::shared_ptr<jrtplib::RTPPacket> packet, int timelength,
// New sequence number
unsigned newSeqno = packet->GetExtendedSequenceNumber();
for (Packet& p: mPacketList)
for (std::shared_ptr<Packet>& p: mPacketList)
{
unsigned seqno = p.rtp()->GetExtendedSequenceNumber();
unsigned seqno = p->rtp()->GetExtendedSequenceNumber();
if (seqno == newSeqno)
{
mStat.mDuplicatedRtp++;
ICELogMedia(<< "Discovered duplicated packet, skipping");
return false;
return std::shared_ptr<Packet>();
}
if (seqno > maxno)
@ -153,8 +161,11 @@ bool RtpBuffer::add(std::shared_ptr<jrtplib::RTPPacket> packet, int timelength,
if (newSeqno > minno || (available < mHigh))
{
Packet p(packet, timelength, rate);
// Insert into queue
auto p = std::make_shared<Packet>(packet, timelength, rate);
mPacketList.push_back(p);
// Sort again
std::sort(mPacketList.begin(), mPacketList.end(), SequenceSort);
// Limit by max timelength
@ -162,21 +173,18 @@ bool RtpBuffer::add(std::shared_ptr<jrtplib::RTPPacket> packet, int timelength,
if (available > mHigh)
ICELogMedia(<< "Available " << available << "ms with limit " << mHigh << "ms");
/*while (available > mHigh && mPacketList.size())
{
ICELogDebug( << "Dropping RTP packet from jitter buffer");
available -= mPacketList.front().timelength();
mPacketList.erase(mPacketList.begin());
}*/
return p;
}
else
{
ICELogMedia(<< "Too old packet, skipping");
mStat.mOldRtp++;
return false;
return std::shared_ptr<Packet>();
}
return true;
return std::shared_ptr<Packet>();
}
RtpBuffer::FetchResult RtpBuffer::fetch(ResultList& rl)
@ -192,7 +200,7 @@ RtpBuffer::FetchResult RtpBuffer::fetch(ResultList& rl)
while (total > mHigh && mPacketList.size())
{
ICELogMedia( << "Dropping RTP packets from jitter buffer");
total -= mPacketList.front().timelength();
total -= mPacketList.front()->timelength();
// Save it as last packet however - to not confuse loss packet counter
mFetchedPacket = mPacketList.front();
@ -208,7 +216,7 @@ RtpBuffer::FetchResult RtpBuffer::fetch(ResultList& rl)
result = FetchResult::NoPacket;
else
{
if (mFetchedPacket.rtp())
if (mFetchedPacket->rtp())
{
if (mPacketList.empty())
{
@ -218,10 +226,10 @@ RtpBuffer::FetchResult RtpBuffer::fetch(ResultList& rl)
else
{
// Current sequence number ?
unsigned seqno = mPacketList.front().rtp()->GetExtendedSequenceNumber();
unsigned seqno = mPacketList.front()->rtp()->GetExtendedSequenceNumber();
// Gap between new packet and previous on
int gap = seqno - mFetchedPacket.rtp()->GetSequenceNumber() - 1;
int gap = seqno - mFetchedPacket->rtp()->GetSequenceNumber() - 1;
gap = std::min(gap, 127);
if (gap > 0 && mPacketList.empty())
{
@ -238,29 +246,7 @@ RtpBuffer::FetchResult RtpBuffer::fetch(ResultList& rl)
}
result = FetchResult::RegularPacket;
Packet& p = mPacketList.front();
rl.push_back(p.rtp());
// Maybe it is time to replay packet right now ? For case of AMR SID packets
/*if (mFetchedPacket.rtp() && gap == 0 && mFetchedPacket.timelength() >= 10 && p.timelength() >= 10)
{
int timestampDelta;
// Timestamp difference
if (p.rtp()->GetTimestamp() > mFetchedPacket.rtp()->GetTimestamp())
timestampDelta = TimeHelper::getDelta(p.rtp()->GetTimestamp(), mFetchedPacket.rtp()->GetTimestamp());
else
timestampDelta = TimeHelper::getDelta(mFetchedPacket.rtp()->GetTimestamp(), p.rtp()->GetTimestamp());
// Timestamp units per packet
int nrOfPackets = timestampDelta / (p.timelength() * (p.rate() / 1000));
// Add more copies of SID (most probably) packets
for (int i = 0; i < nrOfPackets - 1; i++)
{
//assert(false);
rl.push_back(p.rtp());
}
}*/
rl.push_back(mPacketList.front());
// Save last returned normal packet
mFetchedPacket = mPacketList.front();
@ -279,7 +265,7 @@ RtpBuffer::FetchResult RtpBuffer::fetch(ResultList& rl)
result = FetchResult::RegularPacket;
// Put it to output list
rl.push_back(mPacketList.front().rtp());
rl.push_back(mPacketList.front());
// Remember returned packet
mFetchedPacket = mPacketList.front();
@ -305,7 +291,7 @@ int RtpBuffer::findTimelength()
{
int available = 0;
for (unsigned i = 0; i < mPacketList.size(); i++)
available += mPacketList[i].timelength();
available += mPacketList[i]->timelength();
return available;
}
@ -331,7 +317,7 @@ Receiver::~Receiver()
//-------------- AudioReceiver ----------------
AudioReceiver::AudioReceiver(const CodecList::Settings& settings, MT::Statistics &stat)
:Receiver(stat), mBuffer(stat), mFrameCount(0), mFailedCount(0), mCodecSettings(settings),
:Receiver(stat), mBuffer(stat), mCodecSettings(settings),
mCodecList(settings)
{
// Init resamplers
@ -364,6 +350,41 @@ AudioReceiver::~AudioReceiver()
mDecodedDump.reset();
}
size_t decode_packet(Codec& codec, RTPPacket& p, void* output_buffer, size_t output_capacity)
{
// How much data was produced
size_t result = 0;
// Handle here regular RTP packets
// Check if payload length is ok
int tail = codec.rtpLength() ? p.GetPayloadLength() % codec.rtpLength() : 0;
if (!tail)
{
// Find number of frames
int frame_count = codec.rtpLength() ? p.GetPayloadLength() / codec.rtpLength() : 1;
int frame_length = codec.rtpLength() ? codec.rtpLength() : (int)p.GetPayloadLength();
// Save last packet time length
// mLastPacketTimeLength = mFrameCount * mCodec->frameTime();
// Decode
for (int i=0; i < frame_count; i++)
{
auto decoded_length = codec.decode(p.GetPayloadData() + i * codec.rtpLength(),
frame_length,
output_buffer,
output_capacity);
result += decoded_length;
}
}
else
ICELogMedia(<< "RTP packet with tail.");
return result;
}
bool AudioReceiver::add(const std::shared_ptr<jrtplib::RTPPacket>& p, Codec** codec)
{
@ -421,7 +442,22 @@ bool AudioReceiver::add(const std::shared_ptr<jrtplib::RTPPacket>& p, Codec** co
}
// Queue packet to buffer
return mBuffer.add(p, time_length, codecIter->second->samplerate());
auto packet = mBuffer.add(p, time_length, codecIter->second->samplerate()).get();
if (packet)
{
// Check if early decoding configured
if (mEarlyDecode && *codec)
{
// Move data to packet buffer
size_t available = decode_packet(**codec, *p, mDecodedFrame, sizeof mDecodedFrame);
packet->pcm().resize(available / 2);
memcpy(packet->pcm().data(), mDecodedFrame, available / 2);
}
return true;
}
else
return false;
}
void AudioReceiver::processDecoded(Audio::DataWindow& output, int options)
@ -446,7 +482,7 @@ void AudioReceiver::processDecoded(Audio::DataWindow& output, int options)
bool AudioReceiver::getAudio(Audio::DataWindow& output, int options, int* rate)
{
bool result = false, had_cng = false, had_decode = false;
bool result = false, /*had_cng = false, */had_decode = false;
// Get next packet from buffer
RtpBuffer::ResultList rl;
@ -493,14 +529,14 @@ bool AudioReceiver::getAudio(Audio::DataWindow& output, int options, int* rate)
case RtpBuffer::FetchResult::RegularPacket:
mFailedCount = 0;
for (std::shared_ptr<RTPPacket>& p: rl)
for (std::shared_ptr<RtpBuffer::Packet>& p: rl)
{
assert(p);
// Check if previously CNG packet was detected. Emit CNG audio here if needed.
if (options & DecodeOptions_FillCngGap && mCngPacket && mCodec)
{
// Fill CNG audio is server mode is present
int units = p->GetTimestamp() - mCngPacket->GetTimestamp();
int units = p->rtp()->GetTimestamp() - mCngPacket->GetTimestamp();
int milliseconds = units / (mCodec->samplerate() / 1000);
if (milliseconds > mLastPacketTimeLength)
{
@ -533,73 +569,83 @@ bool AudioReceiver::getAudio(Audio::DataWindow& output, int options, int* rate)
}
}
// Find codec
mCodec = mCodecMap[p->GetPayloadType()];
if (mCodec)
if (mEarlyDecode)
{
if (rate)
*rate = mCodec->samplerate();
// ToDo - copy the decoded data to output buffer
// Check if it is CNG packet
if ((p->GetPayloadType() == 0 || p->GetPayloadType() == 8) && p->GetPayloadLength() >= 1 && p->GetPayloadLength() <= 6)
}
else
{
// Find codec by payload type
int ptype = p->rtp()->GetPayloadType();
mCodec = mCodecMap[ptype];
if (mCodec)
{
if (options & DecodeOptions_SkipDecode)
mDecodedLength = 0;
else
if (rate)
*rate = mCodec->samplerate();
// Check if it is CNG packet
if ((ptype == 0 || ptype == 8) && p->rtp()->GetPayloadLength() >= 1 && p->rtp()->GetPayloadLength() <= 6)
{
mCngPacket = p;
mCngDecoder.decode3389(p->GetPayloadData(), p->GetPayloadLength());
// Emit CNG mLastPacketLength milliseconds
mDecodedLength = mCngDecoder.produce(mCodec->samplerate(), mLastPacketTimeLength,
(short*)mDecodedFrame, true);
if (mDecodedLength)
processDecoded(output, options);
}
result = true;
}
else
{
// Reset CNG packet
mCngPacket.reset();
// Handle here regular RTP packets
// Check if payload length is ok
int tail = mCodec->rtpLength() ? p->GetPayloadLength() % mCodec->rtpLength() : 0;
if (!tail)
{
// Find number of frames
mFrameCount = mCodec->rtpLength() ? p->GetPayloadLength() / mCodec->rtpLength() : 1;
int frameLength = mCodec->rtpLength() ? mCodec->rtpLength() : (int)p->GetPayloadLength();
// Save last packet time length
mLastPacketTimeLength = mFrameCount * mCodec->frameTime();
// Decode
for (int i=0; i<mFrameCount && !mCodecSettings.mSkipDecode; i++)
if (options & DecodeOptions_SkipDecode)
mDecodedLength = 0;
else
{
if (options & DecodeOptions_SkipDecode)
mDecodedLength = 0;
else
{
// Trigger the statistics
had_decode = true;
// Decode frame by frame
mDecodedLength = mCodec->decode(p->GetPayloadData() + i * mCodec->rtpLength(),
frameLength, mDecodedFrame, sizeof mDecodedFrame);
// mDecodedLength = 3840; // Opus 20 ms stereo
if (mDecodedLength)
processDecoded(output, options);
}
mCngPacket = p->rtp();
mCngDecoder.decode3389(p->rtp()->GetPayloadData(), p->rtp()->GetPayloadLength());
// Emit CNG mLastPacketLength milliseconds
mDecodedLength = mCngDecoder.produce(mCodec->samplerate(), mLastPacketTimeLength,
(short*)mDecodedFrame, true);
if (mDecodedLength)
processDecoded(output, options);
}
result = mFrameCount > 0;
// Check for bitrate counter
processStatisticsWithAmrCodec(mCodec.get());
result = true;
}
else
ICELogMedia(<< "RTP packet with tail.");
{
// Reset CNG packet
mCngPacket.reset();
// Handle here regular RTP packets
// Check if payload length is ok
int tail = mCodec->rtpLength() ? p->rtp()->GetPayloadLength() % mCodec->rtpLength() : 0;
if (!tail)
{
// Find number of frames
mFrameCount = mCodec->rtpLength() ? p->rtp()->GetPayloadLength() / mCodec->rtpLength() : 1;
int frameLength = mCodec->rtpLength() ? mCodec->rtpLength() : (int)p->rtp()->GetPayloadLength();
// Save last packet time length
mLastPacketTimeLength = mFrameCount * mCodec->frameTime();
// Decode
for (int i=0; i<mFrameCount && !mCodecSettings.mSkipDecode; i++)
{
if (options & DecodeOptions_SkipDecode)
mDecodedLength = 0;
else
{
// Trigger the statistics
had_decode = true;
// Decode frame by frame
mDecodedLength = mCodec->decode(p->rtp()->GetPayloadData() + i * mCodec->rtpLength(),
frameLength, mDecodedFrame, sizeof mDecodedFrame);
// mDecodedLength = 3840; // Opus 20 ms stereo
if (mDecodedLength)
processDecoded(output, options);
}
}
result = mFrameCount > 0;
// Check for bitrate counter
processStatisticsWithAmrCodec(mCodec.get());
}
else
ICELogMedia(<< "RTP packet with tail.");
}
}
}
}

View File

@ -47,12 +47,17 @@ namespace MT
public:
Packet(const std::shared_ptr<RTPPacket>& packet, int timelen, int rate);
std::shared_ptr<RTPPacket> rtp() const;
int timelength() const;
int rate() const;
const std::vector<short>& pcm() const;
std::vector<short>& pcm();
protected:
std::shared_ptr<RTPPacket> mRtp;
int mTimelength = 0, mRate = 0;
std::vector<short> mPcm;
};
RtpBuffer(Statistics& stat);
@ -77,24 +82,28 @@ namespace MT
int getCount() const;
// Returns false if packet was not add - maybe too old or too new or duplicate
bool add(std::shared_ptr<RTPPacket> packet, int timelength, int rate);
std::shared_ptr<Packet> add(std::shared_ptr<RTPPacket> packet, int timelength, int rate);
typedef std::vector<std::shared_ptr<RTPPacket>> ResultList;
typedef std::vector<std::shared_ptr<Packet>> ResultList;
typedef std::shared_ptr<ResultList> PResultList;
FetchResult fetch(ResultList& rl);
protected:
unsigned mSsrc;
int mHigh, mLow, mPrebuffer;
int mReturnedCounter, mAddCounter;
unsigned mSsrc = 0;
int mHigh = RTP_BUFFER_HIGH,
mLow = RTP_BUFFER_LOW,
mPrebuffer = RTP_BUFFER_PREBUFFER;
int mReturnedCounter = 0,
mAddCounter = 0;
mutable Mutex mGuard;
typedef std::vector<Packet> PacketList;
typedef std::vector<std::shared_ptr<Packet>> PacketList;
PacketList mPacketList;
Statistics& mStat;
bool mFirstPacketWillGo;
bool mFirstPacketWillGo = true;
jrtplib::RTPSourceStats mRtpStats;
Packet mFetchedPacket;
std::shared_ptr<Packet> mFetchedPacket;
// To calculate average interval between packet add. It is close to jitter but more useful in debugging.
float mLastAddTime = 0.0;
@ -155,6 +164,9 @@ namespace MT
std::shared_ptr<jrtplib::RTPPacket> mCngPacket;
CngDecoder mCngDecoder;
// Decode RTP early, do not wait for speaker callback
bool mEarlyDecode = false;
// Buffer to hold decoded data
char mDecodedFrame[65536];
int mDecodedLength = 0;
@ -170,7 +182,7 @@ namespace MT
// Last packet time length
int mLastPacketTimeLength = 0;
int mFailedCount;
int mFailedCount = 0;
Audio::Resampler mResampler8, mResampler16,
mResampler32, mResampler48;

View File

@ -34,7 +34,7 @@ Terminal::~Terminal()
mAudioPair.reset();
}
PStream Terminal::createStream(int type, VariantMap& config)
PStream Terminal::createStream(int type, VariantMap& /*config*/)
{
PStream result;
switch (type)
@ -52,7 +52,7 @@ PStream Terminal::createStream(int type, VariantMap& config)
return result;
}
void Terminal::freeStream(PStream stream)
void Terminal::freeStream(const PStream& stream)
{
if (AudioStream* audio = dynamic_cast<AudioStream*>(stream.get()))
{

View File

@ -26,10 +26,11 @@ namespace MT
CodecList& codeclist();
PStream createStream(int type, VariantMap& config);
void freeStream(PStream s);
void freeStream(const PStream& s);
Audio::PDevicePair audio();
void setAudio(const Audio::PDevicePair& audio);
protected:
StreamList mAudioList;
std::mutex mAudioListMutex;

View File

@ -63,8 +63,13 @@ public:
virtual int channels() { return 1; }
// Returns size of encoded data (RTP) in bytes
virtual int encode(const void* input, int inputBytes, void* output, int outputCapacity) = 0;
// Returns size of decoded data (PCM signed short) in bytes
virtual int decode(const void* input, int inputBytes, void* output, int outputCapacity) = 0;
// Returns size of produced data (PCM signed short) in bytes
virtual int plc(int lostFrames, void* output, int outputCapacity) = 0;
// Returns size of codec in memory

View File

@ -77,13 +77,13 @@ StreamList::~StreamList()
clear();
}
void StreamList::add(PStream s)
void StreamList::add(const PStream& s)
{
Lock l(mMutex);
mStreamVector.push_back(s);
}
void StreamList::remove(PStream s)
void StreamList::remove(const PStream& s)
{
Lock l(mMutex);
@ -98,7 +98,7 @@ void StreamList::clear()
mStreamVector.clear();
}
bool StreamList::has(PStream s)
bool StreamList::has(const PStream& s)
{
Lock l(mMutex);
return std::find(mStreamVector.begin(), mStreamVector.end(), s) != mStreamVector.end();
@ -127,4 +127,4 @@ void StreamList::copyTo(StreamList* sl)
Mutex& StreamList::getMutex()
{
return mMutex;
}
}

View File

@ -88,10 +88,10 @@ namespace MT
StreamList();
~StreamList();
void add(PStream s);
void remove(PStream s);
void add(const PStream& s);
void remove(const PStream& s);
void clear();
bool has(PStream s);
bool has(const PStream& s);
int size();
PStream streamAt(int index);

View File

@ -1081,6 +1081,10 @@ inline bool operator!=(const char* lhs, const Data& rhs) { return !(rhs == lhs);
inline bool operator>(const char* lhs, const Data& rhs) { return rhs < lhs; }
inline bool operator<=(const char* lhs, const Data& rhs) { return !(rhs < lhs); }
inline bool operator>=(const char* lhs, const Data& rhs) { return !(lhs < rhs); }
extern bool operator==(const Data& lhs, const Data& rhs);
extern bool operator<(const Data& lhs, const Data& rhs);
#ifndef RESIP_USE_STL_STREAMS
EncodeStream& operator<<(EncodeStream& strm, const Data& d);
#endif