00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "MultiFramedRTPSource.hh"
00023 #include "GroupsockHelper.hh"
00024 #include <string.h>
00025
00027
00028 class ReorderingPacketBuffer {
00029 public:
00030 ReorderingPacketBuffer(BufferedPacketFactory* packetFactory);
00031 virtual ~ReorderingPacketBuffer();
00032 void reset();
00033
00034 BufferedPacket* getFreePacket(MultiFramedRTPSource* ourSource);
00035 void storePacket(BufferedPacket* bPacket);
00036 BufferedPacket* getNextCompletedPacket(Boolean& packetLossPreceded);
00037 void releaseUsedPacket(BufferedPacket* packet);
00038 void freePacket(BufferedPacket* packet) {
00039 if (packet != fSavedPacket) delete packet;
00040 }
00041
00042 void setThresholdTime(unsigned uSeconds) { fThresholdTime = uSeconds; }
00043
00044 private:
00045 BufferedPacketFactory* fPacketFactory;
00046 unsigned fThresholdTime;
00047 Boolean fHaveSeenFirstPacket;
00048 unsigned short fNextExpectedSeqNo;
00049 BufferedPacket* fHeadPacket;
00050 BufferedPacket* fSavedPacket;
00051
00052 };
00053
00054
00056
00057 MultiFramedRTPSource
00058 ::MultiFramedRTPSource(UsageEnvironment& env, Groupsock* RTPgs,
00059 unsigned char rtpPayloadFormat,
00060 unsigned rtpTimestampFrequency,
00061 BufferedPacketFactory* packetFactory)
00062 : RTPSource(env, RTPgs, rtpPayloadFormat, rtpTimestampFrequency) {
00063 reset();
00064 fReorderingBuffer = new ReorderingPacketBuffer(packetFactory);
00065
00066
00067 increaseReceiveBufferTo(env, RTPgs->socketNum(), 50*1024);
00068 }
00069
00070 void MultiFramedRTPSource::reset() {
00071 fCurrentPacketBeginsFrame = True;
00072 fCurrentPacketCompletesFrame = True;
00073 fAreDoingNetworkReads = False;
00074 fNeedDelivery = False;
00075 fPacketLossInFragmentedFrame = False;
00076 }
00077
00078 MultiFramedRTPSource::~MultiFramedRTPSource() {
00079 fRTPInterface.stopNetworkReading();
00080 delete fReorderingBuffer;
00081 }
00082
00083 Boolean MultiFramedRTPSource
00084 ::processSpecialHeader(BufferedPacket* ,
00085 unsigned& resultSpecialHeaderSize) {
00086
00087 resultSpecialHeaderSize = 0;
00088 return True;
00089 }
00090
00091 Boolean MultiFramedRTPSource
00092 ::packetIsUsableInJitterCalculation(unsigned char* ,
00093 unsigned ) {
00094
00095 return True;
00096 }
00097
00098 void MultiFramedRTPSource::doStopGettingFrames() {
00099 fRTPInterface.stopNetworkReading();
00100 fReorderingBuffer->reset();
00101 reset();
00102 }
00103
00104 void MultiFramedRTPSource::doGetNextFrame() {
00105 if (!fAreDoingNetworkReads) {
00106
00107 fAreDoingNetworkReads = True;
00108 TaskScheduler::BackgroundHandlerProc* handler
00109 = (TaskScheduler::BackgroundHandlerProc*)&networkReadHandler;
00110 fRTPInterface.startNetworkReading(handler);
00111 }
00112
00113 fSavedTo = fTo;
00114 fSavedMaxSize = fMaxSize;
00115 fFrameSize = 0;
00116 fNeedDelivery = True;
00117 doGetNextFrame1();
00118 }
00119
00120 void MultiFramedRTPSource::doGetNextFrame1() {
00121 while (fNeedDelivery) {
00122
00123 Boolean packetLossPrecededThis;
00124 BufferedPacket* nextPacket
00125 = fReorderingBuffer->getNextCompletedPacket(packetLossPrecededThis);
00126 if (nextPacket == NULL) break;
00127
00128 fNeedDelivery = False;
00129
00130 if (nextPacket->useCount() == 0) {
00131
00132
00133 unsigned specialHeaderSize;
00134 if (!processSpecialHeader(nextPacket, specialHeaderSize)) {
00135
00136 fReorderingBuffer->releaseUsedPacket(nextPacket);
00137 fNeedDelivery = True;
00138 break;
00139 }
00140 nextPacket->skip(specialHeaderSize);
00141 }
00142
00143
00144
00145 if (fCurrentPacketBeginsFrame) {
00146 if (packetLossPrecededThis || fPacketLossInFragmentedFrame) {
00147
00148
00149 fTo = fSavedTo; fMaxSize = fSavedMaxSize;
00150 fFrameSize = 0;
00151 }
00152 fPacketLossInFragmentedFrame = False;
00153 } else if (packetLossPrecededThis) {
00154
00155 fPacketLossInFragmentedFrame = True;
00156 }
00157 if (fPacketLossInFragmentedFrame) {
00158
00159 fReorderingBuffer->releaseUsedPacket(nextPacket);
00160 fNeedDelivery = True;
00161 break;
00162 }
00163
00164
00165 unsigned frameSize;
00166 nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,
00167 fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,
00168 fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,
00169 fCurPacketMarkerBit);
00170 fFrameSize += frameSize;
00171
00172 if (!nextPacket->hasUsableData()) {
00173
00174 fReorderingBuffer->releaseUsedPacket(nextPacket);
00175 }
00176
00177 if (fCurrentPacketCompletesFrame || fNumTruncatedBytes > 0) {
00178
00179 if (fNumTruncatedBytes > 0) {
00180 envir() << "MultiFramedRTPSource::doGetNextFrame1(): The total received frame size exceeds the client's buffer size ("
00181 << fSavedMaxSize << "). "
00182 << fNumTruncatedBytes << " bytes of trailing data will be dropped!\n";
00183 }
00184
00185
00186
00187 afterGetting(this);
00188 } else {
00189
00190
00191 fTo += frameSize; fMaxSize -= frameSize;
00192 fNeedDelivery = True;
00193 }
00194 }
00195 }
00196
00197 void MultiFramedRTPSource
00198 ::setPacketReorderingThresholdTime(unsigned uSeconds) {
00199 fReorderingBuffer->setThresholdTime(uSeconds);
00200 }
00201
00202 #define ADVANCE(n) do { bPacket->skip(n); } while (0)
00203
00204 void MultiFramedRTPSource::networkReadHandler(MultiFramedRTPSource* source,
00205 int ) {
00206
00207 BufferedPacket* bPacket
00208 = source->fReorderingBuffer->getFreePacket(source);
00209
00210
00211 Boolean readSuccess = False;
00212 do {
00213 if (!bPacket->fillInData(source->fRTPInterface)) break;
00214 #ifdef TEST_LOSS
00215 source->setPacketReorderingThresholdTime(0);
00216
00217 if ((our_random()%10) == 0) break;
00218 #endif
00219
00220
00221 if (bPacket->dataSize() < 12) break;
00222 unsigned rtpHdr = ntohl(*(unsigned*)(bPacket->data())); ADVANCE(4);
00223 Boolean rtpMarkerBit = (rtpHdr&0x00800000) >> 23;
00224 unsigned rtpTimestamp = ntohl(*(unsigned*)(bPacket->data()));ADVANCE(4);
00225 unsigned rtpSSRC = ntohl(*(unsigned*)(bPacket->data())); ADVANCE(4);
00226
00227
00228 if ((rtpHdr&0xC0000000) != 0x80000000) break;
00229
00230
00231 unsigned cc = (rtpHdr>>24)&0xF;
00232 if (bPacket->dataSize() < cc) break;
00233 ADVANCE(cc*4);
00234
00235
00236 if (rtpHdr&0x10000000) {
00237 if (bPacket->dataSize() < 4) break;
00238 unsigned extHdr = ntohl(*(unsigned*)(bPacket->data())); ADVANCE(4);
00239 unsigned remExtSize = 4*(extHdr&0xFFFF);
00240 if (bPacket->dataSize() < remExtSize) break;
00241 ADVANCE(remExtSize);
00242 }
00243
00244
00245 if (rtpHdr&0x20000000) {
00246 if (bPacket->dataSize() == 0) break;
00247 unsigned numPaddingBytes
00248 = (unsigned)(bPacket->data())[bPacket->dataSize()-1];
00249 if (bPacket->dataSize() < numPaddingBytes) break;
00250 bPacket->removePadding(numPaddingBytes);
00251 }
00252
00253 if ((unsigned char)((rtpHdr&0x007F0000)>>16)
00254 != source->rtpPayloadFormat()) {
00255 break;
00256 }
00257
00258
00259 source->fLastReceivedSSRC = rtpSSRC;
00260 unsigned short rtpSeqNo = (unsigned short)(rtpHdr&0xFFFF);
00261 Boolean usableInJitterCalculation
00262 = source->packetIsUsableInJitterCalculation((bPacket->data()),
00263 bPacket->dataSize());
00264 struct timeval presentationTime;
00265 Boolean hasBeenSyncedUsingRTCP;
00266 source->receptionStatsDB()
00267 .noteIncomingPacket(rtpSSRC, rtpSeqNo, rtpTimestamp,
00268 source->timestampFrequency(),
00269 usableInJitterCalculation, presentationTime,
00270 hasBeenSyncedUsingRTCP, bPacket->dataSize());
00271
00272
00273 struct timeval timeNow;
00274 gettimeofday(&timeNow, NULL);
00275 bPacket->assignMiscParams(rtpSeqNo, rtpTimestamp, presentationTime,
00276 hasBeenSyncedUsingRTCP, rtpMarkerBit,
00277 timeNow);
00278 source->fReorderingBuffer->storePacket(bPacket);
00279
00280 readSuccess = True;
00281 } while (0);
00282 if (!readSuccess) source->fReorderingBuffer->freePacket(bPacket);
00283
00284 source->doGetNextFrame1();
00285
00286 }
00287
00288
00290
00291 #define MAX_PACKET_SIZE 10000
00292
00293 BufferedPacket::BufferedPacket()
00294 : fPacketSize(MAX_PACKET_SIZE),
00295 fBuf(new unsigned char[MAX_PACKET_SIZE]),
00296 fNextPacket(NULL) {
00297 }
00298
00299 BufferedPacket::~BufferedPacket() {
00300 delete fNextPacket;
00301 delete[] fBuf;
00302 }
00303
00304 void BufferedPacket::reset() {
00305 fHead = fTail = 0;
00306 fUseCount = 0;
00307 }
00308
00309
00310 unsigned BufferedPacket
00311 ::nextEnclosedFrameSize(unsigned char*& , unsigned dataSize) {
00312
00313
00314
00315 return dataSize;
00316 }
00317
00318 void BufferedPacket
00319 ::getNextEnclosedFrameParameters(unsigned char*& framePtr, unsigned dataSize,
00320 unsigned& frameSize,
00321 unsigned& frameDurationInMicroseconds) {
00322
00323
00324
00325
00326
00327
00328 frameSize = nextEnclosedFrameSize(framePtr, dataSize);
00329
00330 frameDurationInMicroseconds = 0;
00331 }
00332
00333 Boolean BufferedPacket::fillInData(RTPInterface& rtpInterface) {
00334 reset();
00335
00336 unsigned numBytesRead;
00337 struct sockaddr_in fromAddress;
00338 if (!rtpInterface.handleRead(&fBuf[fTail], fPacketSize-fTail, numBytesRead,
00339 fromAddress)) {
00340 return False;
00341 }
00342 fTail += numBytesRead;
00343 return True;
00344 }
00345
00346 void BufferedPacket
00347 ::assignMiscParams(unsigned short rtpSeqNo, unsigned rtpTimestamp,
00348 struct timeval presentationTime,
00349 Boolean hasBeenSyncedUsingRTCP, Boolean rtpMarkerBit,
00350 struct timeval timeReceived) {
00351 fRTPSeqNo = rtpSeqNo;
00352 fRTPTimestamp = rtpTimestamp;
00353 fPresentationTime = presentationTime;
00354 fHasBeenSyncedUsingRTCP = hasBeenSyncedUsingRTCP;
00355 fRTPMarkerBit = rtpMarkerBit;
00356 fTimeReceived = timeReceived;
00357 }
00358
00359 void BufferedPacket::skip(unsigned numBytes) {
00360 fHead += numBytes;
00361 if (fHead > fTail) fHead = fTail;
00362 }
00363
00364 void BufferedPacket::removePadding(unsigned numBytes) {
00365 if (numBytes > fTail-fHead) numBytes = fTail-fHead;
00366 fTail -= numBytes;
00367 }
00368
00369 void BufferedPacket::appendData(unsigned char* newData, unsigned numBytes) {
00370 if (numBytes > fPacketSize-fTail) numBytes = fPacketSize - fTail;
00371 memmove(&fBuf[fTail], newData, numBytes);
00372 fTail += numBytes;
00373 }
00374
00375 void BufferedPacket::use(unsigned char* to, unsigned toSize,
00376 unsigned& bytesUsed, unsigned& bytesTruncated,
00377 unsigned short& rtpSeqNo, unsigned& rtpTimestamp,
00378 struct timeval& presentationTime,
00379 Boolean& hasBeenSyncedUsingRTCP,
00380 Boolean& rtpMarkerBit) {
00381 unsigned char* origFramePtr = &fBuf[fHead];
00382 unsigned char* newFramePtr = origFramePtr;
00383 unsigned frameSize, frameDurationInMicroseconds;
00384 getNextEnclosedFrameParameters(newFramePtr, fTail - fHead,
00385 frameSize, frameDurationInMicroseconds);
00386 if (frameSize > toSize) {
00387 bytesTruncated = frameSize - toSize;
00388 bytesUsed = toSize;
00389 } else {
00390 bytesTruncated = 0;
00391 bytesUsed = frameSize;
00392 }
00393
00394 memmove(to, newFramePtr, bytesUsed);
00395 fHead += (newFramePtr - origFramePtr) + frameSize;
00396 ++fUseCount;
00397
00398 rtpSeqNo = fRTPSeqNo;
00399 rtpTimestamp = fRTPTimestamp;
00400 presentationTime = fPresentationTime;
00401 hasBeenSyncedUsingRTCP = fHasBeenSyncedUsingRTCP;
00402 rtpMarkerBit = fRTPMarkerBit;
00403
00404
00405 fPresentationTime.tv_usec += frameDurationInMicroseconds;
00406 if (fPresentationTime.tv_usec >= 1000000) {
00407 fPresentationTime.tv_sec += fPresentationTime.tv_usec/1000000;
00408 fPresentationTime.tv_usec = fPresentationTime.tv_usec%1000000;
00409 }
00410 }
00411
00412 BufferedPacketFactory::BufferedPacketFactory() {
00413 }
00414
00415 BufferedPacketFactory::~BufferedPacketFactory() {
00416 }
00417
00418 BufferedPacket* BufferedPacketFactory
00419 ::createNewPacket(MultiFramedRTPSource* ) {
00420 return new BufferedPacket;
00421 }
00422
00423
00425
00426 ReorderingPacketBuffer
00427 ::ReorderingPacketBuffer(BufferedPacketFactory* packetFactory)
00428 : fThresholdTime(100000) ,
00429 fHaveSeenFirstPacket(False), fHeadPacket(NULL), fSavedPacket(NULL) {
00430 fPacketFactory = (packetFactory == NULL)
00431 ? (new BufferedPacketFactory)
00432 : packetFactory;
00433 }
00434
00435 ReorderingPacketBuffer::~ReorderingPacketBuffer() {
00436 reset();
00437 delete fPacketFactory;
00438 }
00439
00440 void ReorderingPacketBuffer::reset() {
00441 if (fHeadPacket == NULL) {
00442 delete fSavedPacket;
00443 } else {
00444 delete fHeadPacket;
00445 }
00446 fHaveSeenFirstPacket = False;
00447 fHeadPacket = NULL;
00448 fSavedPacket = NULL;
00449 }
00450
00451 BufferedPacket* ReorderingPacketBuffer
00452 ::getFreePacket(MultiFramedRTPSource* ourSource) {
00453 if (fSavedPacket == NULL) {
00454 fSavedPacket = fPacketFactory->createNewPacket(ourSource);
00455 }
00456
00457 return fHeadPacket == NULL
00458 ? fSavedPacket
00459 : fPacketFactory->createNewPacket(ourSource);
00460 }
00461
00462 void ReorderingPacketBuffer::storePacket(BufferedPacket* bPacket) {
00463 unsigned short rtpSeqNo = bPacket->rtpSeqNo();
00464
00465 if (!fHaveSeenFirstPacket) {
00466 fNextExpectedSeqNo = rtpSeqNo;
00467 fHaveSeenFirstPacket = True;
00468 }
00469
00470
00471
00472
00473
00474 unsigned short const seqNoThreshold = 100;
00475 if (seqNumLT(rtpSeqNo, fNextExpectedSeqNo)
00476 && seqNumLT(fNextExpectedSeqNo, rtpSeqNo+seqNoThreshold)) {
00477 return;
00478 }
00479
00480
00481 BufferedPacket* beforePtr = NULL;
00482 BufferedPacket* afterPtr = fHeadPacket;
00483 while (afterPtr != NULL) {
00484 if (seqNumLT(rtpSeqNo, afterPtr->rtpSeqNo())) break;
00485 if (rtpSeqNo == afterPtr->rtpSeqNo()) {
00486
00487 return;
00488 }
00489
00490 beforePtr = afterPtr;
00491 afterPtr = afterPtr->nextPacket();
00492 }
00493
00494
00495 bPacket->nextPacket() = afterPtr;
00496 if (beforePtr == NULL) {
00497 fHeadPacket = bPacket;
00498 } else {
00499 beforePtr->nextPacket() = bPacket;
00500 }
00501 }
00502
00503 void ReorderingPacketBuffer::releaseUsedPacket(BufferedPacket* packet) {
00504
00505
00506 ++fNextExpectedSeqNo;
00507
00508 fHeadPacket = fHeadPacket->nextPacket();
00509 packet->nextPacket() = NULL;
00510
00511 freePacket(packet);
00512 }
00513
00514 BufferedPacket* ReorderingPacketBuffer
00515 ::getNextCompletedPacket(Boolean& packetLossPreceded) {
00516 if (fHeadPacket == NULL) return NULL;
00517
00518
00519
00520
00521 if (fHeadPacket->rtpSeqNo() == fNextExpectedSeqNo) {
00522 packetLossPreceded = False;
00523 return fHeadPacket;
00524 }
00525
00526
00527
00528
00529 struct timeval timeNow;
00530 gettimeofday(&timeNow, NULL);
00531 unsigned uSecondsSinceReceived
00532 = (timeNow.tv_sec - fHeadPacket->timeReceived().tv_sec)*1000000
00533 + (timeNow.tv_usec - fHeadPacket->timeReceived().tv_usec);
00534 if (uSecondsSinceReceived > fThresholdTime) {
00535 fNextExpectedSeqNo = fHeadPacket->rtpSeqNo();
00536
00537 packetLossPreceded = True;
00538 return fHeadPacket;
00539 }
00540
00541
00542 return NULL;
00543 }