pull/1624/head
Isaac Connor 2016-06-21 12:26:08 -04:00
parent 6a8db582ff
commit ce22e6534e
1 changed files with 296 additions and 296 deletions

View File

@ -27,374 +27,374 @@
#if HAVE_LIBAVCODEC #if HAVE_LIBAVCODEC
RtpSource::RtpSource( int id, const std::string &localHost, int localPortBase, const std::string &remoteHost, int remotePortBase, uint32_t ssrc, uint16_t seq, uint32_t rtpClock, uint32_t rtpTime, _AVCODECID codecId ) : RtpSource::RtpSource( int id, const std::string &localHost, int localPortBase, const std::string &remoteHost, int remotePortBase, uint32_t ssrc, uint16_t seq, uint32_t rtpClock, uint32_t rtpTime, _AVCODECID codecId ) :
mId( id ), mId( id ),
mSsrc( ssrc ), mSsrc( ssrc ),
mLocalHost( localHost ), mLocalHost( localHost ),
mRemoteHost( remoteHost ), mRemoteHost( remoteHost ),
mRtpClock( rtpClock ), mRtpClock( rtpClock ),
mCodecId( codecId ), mCodecId( codecId ),
mFrame( 65536 ), mFrame( 65536 ),
mFrameCount( 0 ), mFrameCount( 0 ),
mFrameGood( true ), mFrameGood( true ),
mFrameReady( false ), mFrameReady( false ),
mFrameProcessed( false ) mFrameProcessed( false )
{ {
char hostname[256] = ""; char hostname[256] = "";
gethostname( hostname, sizeof(hostname) ); gethostname( hostname, sizeof(hostname) );
mCname = stringtf( "zm-%d@%s", mId, hostname ); mCname = stringtf( "zm-%d@%s", mId, hostname );
Debug( 3, "RTP CName = %s", mCname.c_str() ); Debug( 3, "RTP CName = %s", mCname.c_str() );
init( seq ); init( seq );
mMaxSeq = seq - 1; mMaxSeq = seq - 1;
mProbation = MIN_SEQUENTIAL; mProbation = MIN_SEQUENTIAL;
mLocalPortChans[0] = localPortBase; mLocalPortChans[0] = localPortBase;
mLocalPortChans[1] = localPortBase+1; mLocalPortChans[1] = localPortBase+1;
mRemotePortChans[0] = remotePortBase; mRemotePortChans[0] = remotePortBase;
mRemotePortChans[1] = remotePortBase+1; mRemotePortChans[1] = remotePortBase+1;
mRtpFactor = mRtpClock; mRtpFactor = mRtpClock;
mBaseTimeReal = tvNow(); mBaseTimeReal = tvNow();
mBaseTimeNtp = tvZero(); mBaseTimeNtp = tvZero();
mBaseTimeRtp = rtpTime; mBaseTimeRtp = rtpTime;
mLastSrTimeReal = tvZero(); mLastSrTimeReal = tvZero();
mLastSrTimeNtp = tvZero(); mLastSrTimeNtp = tvZero();
mLastSrTimeRtp = 0; mLastSrTimeRtp = 0;
if(mCodecId != AV_CODEC_ID_H264 && mCodecId != AV_CODEC_ID_MPEG4) if(mCodecId != AV_CODEC_ID_H264 && mCodecId != AV_CODEC_ID_MPEG4)
Warning( "The device is using a codec that may not be supported. Do not be surprised if things don't work." ); Warning( "The device is using a codec that may not be supported. Do not be surprised if things don't work." );
} }
void RtpSource::init( uint16_t seq ) void RtpSource::init( uint16_t seq )
{ {
Debug( 3, "Initialising sequence" ); Debug( 3, "Initialising sequence" );
mBaseSeq = seq; mBaseSeq = seq;
mMaxSeq = seq; mMaxSeq = seq;
mBadSeq = RTP_SEQ_MOD + 1; // so seq == mBadSeq is false mBadSeq = RTP_SEQ_MOD + 1; // so seq == mBadSeq is false
mCycles = 0; mCycles = 0;
mReceivedPackets = 0; mReceivedPackets = 0;
mReceivedPrior = 0; mReceivedPrior = 0;
mExpectedPrior = 0; mExpectedPrior = 0;
// other initialization // other initialization
mJitter = 0; mJitter = 0;
mTransit = 0; mTransit = 0;
} }
bool RtpSource::updateSeq( uint16_t seq ) bool RtpSource::updateSeq( uint16_t seq )
{ {
uint16_t uDelta = seq - mMaxSeq; uint16_t uDelta = seq - mMaxSeq;
// Source is not valid until MIN_SEQUENTIAL packets with // Source is not valid until MIN_SEQUENTIAL packets with
// sequential sequence numbers have been received. // sequential sequence numbers have been received.
Debug( 5, "Seq: %d", seq ); Debug( 5, "Seq: %d", seq );
if ( mProbation) if ( mProbation)
{
// packet is in sequence
if ( seq == mMaxSeq + 1)
{ {
// packet is in sequence Debug( 3, "Sequence in probation %d, in sequence", mProbation );
if ( seq == mMaxSeq + 1) mProbation--;
{ mMaxSeq = seq;
Debug( 3, "Sequence in probation %d, in sequence", mProbation ); if ( mProbation == 0 )
mProbation--; {
mMaxSeq = seq; init( seq );
if ( mProbation == 0 ) mReceivedPackets++;
{
init( seq );
mReceivedPackets++;
return( true );
}
}
else
{
Warning( "Sequence in probation %d, out of sequence", mProbation );
mProbation = MIN_SEQUENTIAL - 1;
mMaxSeq = seq;
return( false );
}
return( true ); return( true );
} }
else if ( uDelta < MAX_DROPOUT )
{
if ( uDelta == 1 )
{
Debug( 4, "Packet in sequence, gap %d", uDelta );
}
else
{
Warning( "Packet in sequence, gap %d", uDelta );
}
// in order, with permissible gap
if ( seq < mMaxSeq )
{
// Sequence number wrapped - count another 64K cycle.
mCycles += RTP_SEQ_MOD;
}
mMaxSeq = seq;
}
else if ( uDelta <= RTP_SEQ_MOD - MAX_MISORDER )
{
Warning( "Packet out of sequence, gap %d", uDelta );
// the sequence number made a very large jump
if ( seq == mBadSeq )
{
Debug( 3, "Restarting sequence" );
// Two sequential packets -- assume that the other side
// restarted without telling us so just re-sync
// (i.e., pretend this was the first packet).
init( seq );
}
else
{
mBadSeq = (seq + 1) & (RTP_SEQ_MOD-1);
return( false );
}
} }
else else
{ {
Warning( "Packet duplicate or reordered, gap %d", uDelta ); Warning( "Sequence in probation %d, out of sequence", mProbation );
// duplicate or reordered packet mProbation = MIN_SEQUENTIAL - 1;
return( false ); mMaxSeq = seq;
return( false );
} }
mReceivedPackets++; return( true );
return( uDelta==1?true:false ); }
else if ( uDelta < MAX_DROPOUT )
{
if ( uDelta == 1 )
{
Debug( 4, "Packet in sequence, gap %d", uDelta );
}
else
{
Warning( "Packet in sequence, gap %d", uDelta );
}
// in order, with permissible gap
if ( seq < mMaxSeq )
{
// Sequence number wrapped - count another 64K cycle.
mCycles += RTP_SEQ_MOD;
}
mMaxSeq = seq;
}
else if ( uDelta <= RTP_SEQ_MOD - MAX_MISORDER )
{
Warning( "Packet out of sequence, gap %d", uDelta );
// the sequence number made a very large jump
if ( seq == mBadSeq )
{
Debug( 3, "Restarting sequence" );
// Two sequential packets -- assume that the other side
// restarted without telling us so just re-sync
// (i.e., pretend this was the first packet).
init( seq );
}
else
{
mBadSeq = (seq + 1) & (RTP_SEQ_MOD-1);
return( false );
}
}
else
{
Warning( "Packet duplicate or reordered, gap %d", uDelta );
// duplicate or reordered packet
return( false );
}
mReceivedPackets++;
return( uDelta==1?true:false );
} }
void RtpSource::updateJitter( const RtpDataHeader *header ) void RtpSource::updateJitter( const RtpDataHeader *header )
{ {
if ( mRtpFactor > 0 ) if ( mRtpFactor > 0 )
{ {
Debug( 5, "Delta rtp = %.6f", tvDiffSec( mBaseTimeReal ) ); Debug( 5, "Delta rtp = %.6f", tvDiffSec( mBaseTimeReal ) );
uint32_t localTimeRtp = mBaseTimeRtp + uint32_t( tvDiffSec( mBaseTimeReal ) * mRtpFactor ); uint32_t localTimeRtp = mBaseTimeRtp + uint32_t( tvDiffSec( mBaseTimeReal ) * mRtpFactor );
Debug( 5, "Local RTP time = %x", localTimeRtp ); Debug( 5, "Local RTP time = %x", localTimeRtp );
Debug( 5, "Packet RTP time = %x", ntohl(header->timestampN) ); Debug( 5, "Packet RTP time = %x", ntohl(header->timestampN) );
uint32_t packetTransit = localTimeRtp - ntohl(header->timestampN); uint32_t packetTransit = localTimeRtp - ntohl(header->timestampN);
Debug( 5, "Packet transit RTP time = %x", packetTransit ); Debug( 5, "Packet transit RTP time = %x", packetTransit );
if ( mTransit > 0 ) if ( mTransit > 0 )
{
// Jitter
int d = packetTransit - mTransit;
Debug( 5, "Jitter D = %d", d );
if ( d < 0 )
d = -d;
//mJitter += (1./16.) * ((double)d - mJitter);
mJitter += d - ((mJitter + 8) >> 4);
}
mTransit = packetTransit;
}
else
{ {
mJitter = 0; // Jitter
int d = packetTransit - mTransit;
Debug( 5, "Jitter D = %d", d );
if ( d < 0 )
d = -d;
//mJitter += (1./16.) * ((double)d - mJitter);
mJitter += d - ((mJitter + 8) >> 4);
} }
Debug( 5, "RTP Jitter: %d", mJitter ); mTransit = packetTransit;
}
else
{
mJitter = 0;
}
Debug( 5, "RTP Jitter: %d", mJitter );
} }
void RtpSource::updateRtcpData( uint32_t ntpTimeSecs, uint32_t ntpTimeFrac, uint32_t rtpTime ) void RtpSource::updateRtcpData( uint32_t ntpTimeSecs, uint32_t ntpTimeFrac, uint32_t rtpTime )
{ {
struct timeval ntpTime = tvMake( ntpTimeSecs, suseconds_t((USEC_PER_SEC*(ntpTimeFrac>>16))/(1<<16)) ); struct timeval ntpTime = tvMake( ntpTimeSecs, suseconds_t((USEC_PER_SEC*(ntpTimeFrac>>16))/(1<<16)) );
Debug( 5, "ntpTime: %ld.%06ld, rtpTime: %x", ntpTime.tv_sec, ntpTime.tv_usec, rtpTime );
if ( mBaseTimeNtp.tv_sec == 0 )
{
mBaseTimeReal = tvNow();
mBaseTimeNtp = ntpTime;
mBaseTimeRtp = rtpTime;
}
else if ( !mRtpClock )
{
Debug( 5, "lastSrNtpTime: %ld.%06ld, rtpTime: %x", mLastSrTimeNtp.tv_sec, mLastSrTimeNtp.tv_usec, rtpTime );
Debug( 5, "ntpTime: %ld.%06ld, rtpTime: %x", ntpTime.tv_sec, ntpTime.tv_usec, rtpTime ); Debug( 5, "ntpTime: %ld.%06ld, rtpTime: %x", ntpTime.tv_sec, ntpTime.tv_usec, rtpTime );
if ( mBaseTimeNtp.tv_sec == 0 )
{
mBaseTimeReal = tvNow();
mBaseTimeNtp = ntpTime;
mBaseTimeRtp = rtpTime;
}
else if ( !mRtpClock )
{
Debug( 5, "lastSrNtpTime: %ld.%06ld, rtpTime: %x", mLastSrTimeNtp.tv_sec, mLastSrTimeNtp.tv_usec, rtpTime );
Debug( 5, "ntpTime: %ld.%06ld, rtpTime: %x", ntpTime.tv_sec, ntpTime.tv_usec, rtpTime );
double diffNtpTime = tvDiffSec( mBaseTimeNtp, ntpTime ); double diffNtpTime = tvDiffSec( mBaseTimeNtp, ntpTime );
uint32_t diffRtpTime = rtpTime - mBaseTimeRtp; uint32_t diffRtpTime = rtpTime - mBaseTimeRtp;
//Debug( 5, "Real-diff: %.6f", diffRealTime ); //Debug( 5, "Real-diff: %.6f", diffRealTime );
Debug( 5, "NTP-diff: %.6f", diffNtpTime ); Debug( 5, "NTP-diff: %.6f", diffNtpTime );
Debug( 5, "RTP-diff: %d", diffRtpTime ); Debug( 5, "RTP-diff: %d", diffRtpTime );
mRtpFactor = (uint32_t)(diffRtpTime / diffNtpTime); mRtpFactor = (uint32_t)(diffRtpTime / diffNtpTime);
Debug( 5, "RTPfactor: %d", mRtpFactor ); Debug( 5, "RTPfactor: %d", mRtpFactor );
} }
mLastSrTimeNtpSecs = ntpTimeSecs; mLastSrTimeNtpSecs = ntpTimeSecs;
mLastSrTimeNtpFrac = ntpTimeFrac; mLastSrTimeNtpFrac = ntpTimeFrac;
mLastSrTimeNtp = ntpTime; mLastSrTimeNtp = ntpTime;
mLastSrTimeRtp = rtpTime; mLastSrTimeRtp = rtpTime;
} }
void RtpSource::updateRtcpStats() void RtpSource::updateRtcpStats()
{ {
uint32_t extendedMax = mCycles + mMaxSeq; uint32_t extendedMax = mCycles + mMaxSeq;
mExpectedPackets = extendedMax - mBaseSeq + 1; mExpectedPackets = extendedMax - mBaseSeq + 1;
Debug( 5, "Expected packets = %d", mExpectedPackets ); Debug( 5, "Expected packets = %d", mExpectedPackets );
// The number of packets lost is defined to be the number of packets // The number of packets lost is defined to be the number of packets
// expected less the number of packets actually received: // expected less the number of packets actually received:
mLostPackets = mExpectedPackets - mReceivedPackets; mLostPackets = mExpectedPackets - mReceivedPackets;
Debug( 5, "Lost packets = %d", mLostPackets ); Debug( 5, "Lost packets = %d", mLostPackets );
uint32_t expectedInterval = mExpectedPackets - mExpectedPrior; uint32_t expectedInterval = mExpectedPackets - mExpectedPrior;
Debug( 5, "Expected interval = %d", expectedInterval ); Debug( 5, "Expected interval = %d", expectedInterval );
mExpectedPrior = mExpectedPackets; mExpectedPrior = mExpectedPackets;
uint32_t receivedInterval = mReceivedPackets - mReceivedPrior; uint32_t receivedInterval = mReceivedPackets - mReceivedPrior;
Debug( 5, "Received interval = %d", receivedInterval ); Debug( 5, "Received interval = %d", receivedInterval );
mReceivedPrior = mReceivedPackets; mReceivedPrior = mReceivedPackets;
uint32_t lostInterval = expectedInterval - receivedInterval; uint32_t lostInterval = expectedInterval - receivedInterval;
Debug( 5, "Lost interval = %d", lostInterval ); Debug( 5, "Lost interval = %d", lostInterval );
if ( expectedInterval == 0 || lostInterval <= 0 ) if ( expectedInterval == 0 || lostInterval <= 0 )
mLostFraction = 0; mLostFraction = 0;
else else
mLostFraction = (lostInterval << 8) / expectedInterval; mLostFraction = (lostInterval << 8) / expectedInterval;
Debug( 5, "Lost fraction = %d", mLostFraction ); Debug( 5, "Lost fraction = %d", mLostFraction );
} }
bool RtpSource::handlePacket( const unsigned char *packet, size_t packetLen ) bool RtpSource::handlePacket( const unsigned char *packet, size_t packetLen )
{ {
const RtpDataHeader *rtpHeader; const RtpDataHeader *rtpHeader;
rtpHeader = (RtpDataHeader *)packet; rtpHeader = (RtpDataHeader *)packet;
int rtpHeaderSize = 12 + rtpHeader->cc * 4; int rtpHeaderSize = 12 + rtpHeader->cc * 4;
// No need to check for nal type as non fragmented packets already have 001 start sequence appended // No need to check for nal type as non fragmented packets already have 001 start sequence appended
bool h264FragmentEnd = (mCodecId == AV_CODEC_ID_H264) && (packet[rtpHeaderSize+1] & 0x40); bool h264FragmentEnd = (mCodecId == AV_CODEC_ID_H264) && (packet[rtpHeaderSize+1] & 0x40);
// M stands for Marker, it is the 8th bit // M stands for Marker, it is the 8th bit
// The interpretation of the marker is defined by a profile. It is intended // The interpretation of the marker is defined by a profile. It is intended
// to allow significant events such as frame boundaries to be marked in the // to allow significant events such as frame boundaries to be marked in the
// packet stream. A profile may define additional marker bits or specify // packet stream. A profile may define additional marker bits or specify
// that there is no marker bit by changing the number of bits in the payload type field. // that there is no marker bit by changing the number of bits in the payload type field.
bool thisM = rtpHeader->m || h264FragmentEnd; bool thisM = rtpHeader->m || h264FragmentEnd;
if ( updateSeq( ntohs(rtpHeader->seqN) ) ) if ( updateSeq( ntohs(rtpHeader->seqN) ) )
{
Hexdump( 4, packet+rtpHeaderSize, 16 );
if ( mFrameGood )
{ {
Hexdump( 4, packet+rtpHeaderSize, 16 ); int extraHeader = 0;
if ( mFrameGood ) if( mCodecId == AV_CODEC_ID_H264 )
{
int nalType = (packet[rtpHeaderSize] & 0x1f);
Debug( 3, "Have H264 frame: nal type is %d", nalType );
switch (nalType)
{ {
int extraHeader = 0; case 24: // STAP-A
if( mCodecId == AV_CODEC_ID_H264 )
{ {
int nalType = (packet[rtpHeaderSize] & 0x1f); extraHeader = 2;
Debug( 3, "Have H264 frame: nal type is %d", nalType ); break;
switch (nalType)
{
case 24: // STAP-A
{
extraHeader = 2;
break;
}
case 25: // STAP-B
case 26: // MTAP-16
case 27: // MTAP-24
{
extraHeader = 3;
break;
}
// FU-A and FU-B
case 28: case 29:
{
// Is this NAL the first NAL in fragmentation sequence
if ( packet[rtpHeaderSize+1] & 0x80 )
{
// Now we will form new header of frame
mFrame.append( "\x0\x0\x1\x0", 4 );
// Reconstruct NAL header from FU headers
*(mFrame+3) = (packet[rtpHeaderSize+1] & 0x1f) |
(packet[rtpHeaderSize] & 0xe0);
}
extraHeader = 2;
break;
}
default: {
Debug(3, "Unhandled nalType %d", nalType );
}
}
// Append NAL frame start code
if ( !mFrame.size() )
mFrame.append( "\x0\x0\x1", 3 );
} }
mFrame.append( packet+rtpHeaderSize+extraHeader, packetLen-rtpHeaderSize-extraHeader ); case 25: // STAP-B
} else { case 26: // MTAP-16
Debug( 3, "NOT H264 frame: type is %d", mCodecId ); case 27: // MTAP-24
{
extraHeader = 3;
break;
}
// FU-A and FU-B
case 28: case 29:
{
// Is this NAL the first NAL in fragmentation sequence
if ( packet[rtpHeaderSize+1] & 0x80 )
{
// Now we will form new header of frame
mFrame.append( "\x0\x0\x1\x0", 4 );
// Reconstruct NAL header from FU headers
*(mFrame+3) = (packet[rtpHeaderSize+1] & 0x1f) |
(packet[rtpHeaderSize] & 0xe0);
}
extraHeader = 2;
break;
}
default: {
Debug(3, "Unhandled nalType %d", nalType );
}
} }
Hexdump( 4, mFrame.head(), 16 ); // Append NAL frame start code
if ( !mFrame.size() )
if ( thisM ) mFrame.append( "\x0\x0\x1", 3 );
{ }
if ( mFrameGood ) mFrame.append( packet+rtpHeaderSize+extraHeader, packetLen-rtpHeaderSize-extraHeader );
{ } else {
Debug( 2, "Got new frame %d, %d bytes", mFrameCount, mFrame.size() ); Debug( 3, "NOT H264 frame: type is %d", mCodecId );
mFrameProcessed.setValueImmediate( false );
mFrameReady.updateValueSignal( true );
if ( !mFrameProcessed.getValueImmediate() )
{
// What is the point of this for loop? Is it just me, or will it call getUpdatedValue once or twice? Could it not be better written as
// if ( ! mFrameProcessed.getUpdatedValue( 1 ) && mFrameProcessed.getUpdatedValue( 1 ) ) return false;
for ( int count = 0; !mFrameProcessed.getUpdatedValue( 1 ); count++ )
if( count > 1 )
return( false );
}
mFrameCount++;
}
else
{
Warning( "Discarding incomplete frame %d, %d bytes", mFrameCount, mFrame.size() );
}
mFrame.clear();
}
}
else
{
if ( mFrame.size() )
{
Warning( "Discarding partial frame %d, %d bytes", mFrameCount, mFrame.size() );
}
else
{
Warning( "Discarding frame %d", mFrameCount );
}
mFrameGood = false;
mFrame.clear();
} }
Hexdump( 4, mFrame.head(), 16 );
if ( thisM ) if ( thisM )
{ {
mFrameGood = true; if ( mFrameGood )
prevM = true; {
Debug( 2, "Got new frame %d, %d bytes", mFrameCount, mFrame.size() );
mFrameProcessed.setValueImmediate( false );
mFrameReady.updateValueSignal( true );
if ( !mFrameProcessed.getValueImmediate() )
{
// What is the point of this for loop? Is it just me, or will it call getUpdatedValue once or twice? Could it not be better written as
// if ( ! mFrameProcessed.getUpdatedValue( 1 ) && mFrameProcessed.getUpdatedValue( 1 ) ) return false;
for ( int count = 0; !mFrameProcessed.getUpdatedValue( 1 ); count++ )
if( count > 1 )
return( false );
}
mFrameCount++;
}
else
{
Warning( "Discarding incomplete frame %d, %d bytes", mFrameCount, mFrame.size() );
}
mFrame.clear();
}
}
else
{
if ( mFrame.size() )
{
Warning( "Discarding partial frame %d, %d bytes", mFrameCount, mFrame.size() );
} }
else else
prevM = false; {
Warning( "Discarding frame %d", mFrameCount );
}
mFrameGood = false;
mFrame.clear();
}
if ( thisM )
{
mFrameGood = true;
prevM = true;
}
else
prevM = false;
updateJitter( rtpHeader ); updateJitter( rtpHeader );
return( true ); return( true );
} }
bool RtpSource::getFrame( Buffer &buffer ) bool RtpSource::getFrame( Buffer &buffer )
{ {
Debug( 3, "Getting frame" ); Debug( 3, "Getting frame" );
if ( !mFrameReady.getValueImmediate() ) if ( !mFrameReady.getValueImmediate() )
{ {
// Allow for a couple of spurious returns // Allow for a couple of spurious returns
for ( int count = 0; !mFrameReady.getUpdatedValue( 1 ); count++ ) for ( int count = 0; !mFrameReady.getUpdatedValue( 1 ); count++ )
if ( count > 1 ) if ( count > 1 )
return( false ); return( false );
} }
buffer = mFrame; buffer = mFrame;
mFrameReady.setValueImmediate( false ); mFrameReady.setValueImmediate( false );
mFrameProcessed.updateValueSignal( true ); mFrameProcessed.updateValueSignal( true );
Debug( 4, "Copied %d bytes", buffer.size() ); Debug( 4, "Copied %d bytes", buffer.size() );
return( true ); return( true );
} }
#endif // HAVE_LIBAVCODEC #endif // HAVE_LIBAVCODEC