00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "PrioritizedRTPStreamSelector.hh"
00023 #include "GroupsockHelper.hh"
00024 #include <string.h>
00025 #include <stdlib.h>
00026
00028
00029
00030 class PrioritizedInputStreamDescriptor {
00031 public:
00032 PrioritizedInputStreamDescriptor(PrioritizedRTPStreamSelector*
00033 ourSelector,
00034 PrioritizedInputStreamDescriptor* next,
00035 unsigned priority,
00036 RTPSource* inputStream,
00037 RTCPInstance* inputStreamRTCP);
00038 virtual ~PrioritizedInputStreamDescriptor();
00039
00040 PrioritizedInputStreamDescriptor*& next() { return fNext; }
00041 unsigned priority() const { return fPriority; }
00042 RTPSource* rtpStream() const { return fRTPStream; }
00043 unsigned char*& buffer() { return fBuffer; }
00044 unsigned bufferSize() const { return fBufferSize; }
00045
00046 void afterGettingFrame1(unsigned frameSize);
00047 void onSourceClosure1();
00048
00049 private:
00050 PrioritizedRTPStreamSelector* fOurSelector;
00051 PrioritizedInputStreamDescriptor* fNext;
00052 unsigned fPriority;
00053 RTPSource* fRTPStream;
00054 RTCPInstance* fRTCPStream;
00055 unsigned char* fBuffer;
00056 static unsigned const fBufferSize;
00057 unsigned fBufferBytesUsed;
00058 };
00059
00060 static void afterGettingFrame(void* clientData, unsigned frameSize,
00061 unsigned numTruncatedBytes,
00062 struct timeval presentationTime,
00063 unsigned durationInMicroseconds);
00064 static void onSourceClosure(void* clientData);
00065
00066
00068
00069 class WarehousedPacketDescriptor;
00070
00071 class PacketWarehouse {
00072 public:
00073 PacketWarehouse(unsigned seqNumStagger);
00074 virtual ~PacketWarehouse();
00075
00076 Boolean isFull();
00077 void addNewFrame(unsigned priority, unsigned short rtpSeqNo,
00078 unsigned char* buffer, unsigned frameSize);
00079 unsigned char* dequeueFrame(unsigned& resultFrameSize,
00080 unsigned& uSecondsToDefer);
00081
00082 Boolean fLastActionWasIncoming;
00083 private:
00084 WarehousedPacketDescriptor* fPacketDescriptors;
00085 Boolean fHaveReceivedFrames;
00086 unsigned short fMinSeqNumStored, fMaxSeqNumStored;
00087 unsigned const fMinSpanForDelivery, fMaxSpanForDelivery, fNumDescriptors;
00088 struct timeval fLastArrivalTime; unsigned short fLastRTPSeqNo;
00089 unsigned fInterArrivalAveGap;
00090 };
00091
00093
00094 PrioritizedRTPStreamSelector
00095 ::PrioritizedRTPStreamSelector(UsageEnvironment& env,
00096 unsigned seqNumStagger)
00097 : FramedSource(env),
00098 fNextInputStreamPriority(0), fInputStreams(NULL),
00099 fAmCurrentlyReading(False), fNeedAFrame(False) {
00100 fWarehouse = new PacketWarehouse(seqNumStagger);
00101 }
00102
00103 PrioritizedRTPStreamSelector::~PrioritizedRTPStreamSelector() {
00104 delete fWarehouse;
00105
00106 while (fInputStreams != NULL) {
00107 PrioritizedInputStreamDescriptor* inputStream
00108 = fInputStreams;
00109 fInputStreams = inputStream->next();
00110 delete inputStream;
00111 }
00112 }
00113
00114 PrioritizedRTPStreamSelector* PrioritizedRTPStreamSelector
00115 ::createNew(UsageEnvironment& env, unsigned seqNumStagger) {
00116 return new PrioritizedRTPStreamSelector(env, seqNumStagger);
00117 }
00118
00119 unsigned PrioritizedRTPStreamSelector
00120 ::addInputRTPStream(RTPSource* inputStream,
00121 RTCPInstance* inputStreamRTCP) {
00122 fInputStreams
00123 = new PrioritizedInputStreamDescriptor(this, fInputStreams,
00124 fNextInputStreamPriority,
00125 inputStream, inputStreamRTCP);
00126 return fNextInputStreamPriority++;
00127 }
00128
00129 void PrioritizedRTPStreamSelector::removeInputRTPStream(unsigned priority) {
00130 for (PrioritizedInputStreamDescriptor*& inputStream
00131 = fInputStreams;
00132 inputStream != NULL; inputStream = inputStream->next()) {
00133 if (inputStream->priority() == priority) {
00134 PrioritizedInputStreamDescriptor* toDelete
00135 = inputStream;
00136 inputStream->next() = toDelete->next();
00137 delete toDelete;
00138 break;
00139 }
00140 }
00141 }
00142
00143 Boolean PrioritizedRTPStreamSelector
00144 ::lookupByName(UsageEnvironment& env,
00145 char const* sourceName,
00146 PrioritizedRTPStreamSelector*& resultSelector) {
00147 resultSelector = NULL;
00148
00149 FramedSource* source;
00150 if (!FramedSource::lookupByName(env, sourceName, source)) return False;
00151
00152 if (!source->isPrioritizedRTPStreamSelector()) {
00153 env.setResultMsg(sourceName, " is not a Prioritized RTP Stream Selector");
00154 return False;
00155 }
00156
00157 resultSelector = (PrioritizedRTPStreamSelector*)source;
00158 return True;
00159 }
00160
00161 Boolean PrioritizedRTPStreamSelector
00162 ::isPrioritizedRTPStreamSelector() const {
00163 return True;
00164 }
00165
00166 void PrioritizedRTPStreamSelector::doGetNextFrame() {
00167
00168
00169 startReadingProcess();
00170
00171
00172 unsigned uSecondsToDefer;
00173 if (deliverFrameToClient(uSecondsToDefer)) {
00174 fNeedAFrame = False;
00175
00176
00177
00178 if (uSecondsToDefer > 0) {
00179 nextTask()
00180 = envir().taskScheduler().scheduleDelayedTask((int)uSecondsToDefer,
00181 completeDelivery,
00182 this);
00183 } else {
00184 completeDelivery(this);
00185 }
00186 } else {
00187 fNeedAFrame = True;
00188 }
00189 }
00190
00191 void PrioritizedRTPStreamSelector::startReadingProcess() {
00192 if (fAmCurrentlyReading) return;
00193 if (fWarehouse->isFull()) return;
00194
00195
00196
00197
00198
00199 for (PrioritizedInputStreamDescriptor* inputStream
00200 = fInputStreams;
00201 inputStream != NULL; inputStream = inputStream->next()) {
00202 RTPSource* rtpStream = inputStream->rtpStream();
00203 if (!rtpStream->isCurrentlyAwaitingData()) {
00204
00205 fAmCurrentlyReading = True;
00206 rtpStream->getNextFrame(inputStream->buffer(),
00207 inputStream->bufferSize(),
00208 afterGettingFrame, inputStream,
00209 onSourceClosure, inputStream);
00210 }
00211 }
00212 }
00213
00214 void PrioritizedRTPStreamSelector
00215 ::handleNewIncomingFrame(unsigned priority, unsigned short rtpSeqNo,
00216 unsigned char* buffer, unsigned frameSize) {
00217
00218 fWarehouse->addNewFrame(priority, rtpSeqNo, buffer, frameSize);
00219 fWarehouse->fLastActionWasIncoming = True;
00220
00221
00222 if (fNeedAFrame) {
00223 doGetNextFrame();
00224 }
00225
00226
00227 fAmCurrentlyReading = False;
00228 startReadingProcess();
00229 }
00230
00231 Boolean PrioritizedRTPStreamSelector
00232 ::deliverFrameToClient(unsigned& uSecondsToDefer) {
00233 unsigned char* buffer
00234 = fWarehouse->dequeueFrame(fFrameSize, uSecondsToDefer);
00235
00236 if (buffer != NULL) {
00237
00238 if (fFrameSize > fMaxSize) {
00239 fNumTruncatedBytes = fFrameSize - fMaxSize;
00240 fFrameSize = fMaxSize;
00241 }
00242 memmove(fTo, buffer, fFrameSize);
00243
00244 delete[] buffer;
00245 fWarehouse->fLastActionWasIncoming = False;
00246 return True;
00247 }
00248
00249
00250 return False;
00251 }
00252
00253
00254 void PrioritizedRTPStreamSelector::completeDelivery(void* clientData) {
00255 PrioritizedRTPStreamSelector* selector
00256 = (PrioritizedRTPStreamSelector*)clientData;
00257
00258
00259
00260 FramedSource::afterGetting(selector);
00261 }
00262
00263
00265
00266 unsigned const PrioritizedInputStreamDescriptor::fBufferSize = 4000;
00267
00268 PrioritizedInputStreamDescriptor
00269 ::PrioritizedInputStreamDescriptor(PrioritizedRTPStreamSelector*
00270 ourSelector,
00271 PrioritizedInputStreamDescriptor* next,
00272 unsigned priority,
00273 RTPSource* inputStream,
00274 RTCPInstance* inputStreamRTCP)
00275 : fOurSelector(ourSelector), fNext(next), fPriority(priority),
00276 fRTPStream(inputStream), fRTCPStream(inputStreamRTCP) {
00277 fBuffer = new unsigned char[fBufferSize];
00278 fBufferBytesUsed = 0;
00279 }
00280
00281 PrioritizedInputStreamDescriptor::~PrioritizedInputStreamDescriptor() {
00282 delete[] fBuffer;
00283 }
00284
00285 static void afterGettingFrame(void* clientData, unsigned frameSize,
00286 unsigned ,
00287 struct timeval ,
00288 unsigned ) {
00289 PrioritizedInputStreamDescriptor* inputStream
00290 = (PrioritizedInputStreamDescriptor*)clientData;
00291 inputStream->afterGettingFrame1(frameSize);
00292 }
00293
00294 void PrioritizedInputStreamDescriptor
00295 ::afterGettingFrame1(unsigned frameSize) {
00296 unsigned short rtpSeqNo = rtpStream()->curPacketRTPSeqNum();
00297
00298 fOurSelector->handleNewIncomingFrame(fPriority, rtpSeqNo,
00299 fBuffer, frameSize);
00300 }
00301
00302 static void onSourceClosure(void* clientData) {
00303 PrioritizedInputStreamDescriptor* inputStream
00304 = (PrioritizedInputStreamDescriptor*)clientData;
00305 inputStream->onSourceClosure1();
00306 }
00307
00308 void PrioritizedInputStreamDescriptor::onSourceClosure1() {
00309 fOurSelector->removeInputRTPStream(fPriority);
00310 }
00311
00312
00314
00315 class WarehousedPacketDescriptor {
00316 public:
00317 WarehousedPacketDescriptor() : buffer(NULL) {}
00318
00319
00320 unsigned priority;
00321 unsigned frameSize;
00322 unsigned char* buffer;
00323 };
00324
00325
00327
00328 PacketWarehouse::PacketWarehouse(unsigned seqNumStagger)
00329 : fLastActionWasIncoming(False),
00330 fHaveReceivedFrames(False), fMinSeqNumStored(0), fMaxSeqNumStored(0),
00331 fMinSpanForDelivery((unsigned)(1.5*seqNumStagger)),
00332 fMaxSpanForDelivery(3*seqNumStagger),
00333 fNumDescriptors(4*seqNumStagger),
00334 fInterArrivalAveGap(0) {
00335 fPacketDescriptors = new WarehousedPacketDescriptor[fNumDescriptors];
00336 if (fPacketDescriptors == NULL) {
00337 #ifdef DEBUG
00338 fprintf(stderr, "PacketWarehouse failed to allocate %d descriptors; seqNumStagger too large!\n", fNumDescriptors);
00339 #endif
00340 exit(1);
00341 }
00342
00343
00344 gettimeofday(&fLastArrivalTime, NULL);
00345 }
00346
00347 PacketWarehouse::~PacketWarehouse() {
00348
00349 for (unsigned i = 0; i < fNumDescriptors; ++i) {
00350 delete[] fPacketDescriptors[i].buffer;
00351 }
00352 delete[] fPacketDescriptors;
00353 }
00354
00355 Boolean PacketWarehouse::isFull() {
00356 int currentSpan = fMaxSeqNumStored - fMinSeqNumStored;
00357 if (currentSpan < 0) currentSpan += 65536;
00358
00359 return (unsigned)currentSpan >= fNumDescriptors;
00360 }
00361
00362 void PacketWarehouse::addNewFrame(unsigned priority,
00363 unsigned short rtpSeqNo,
00364 unsigned char* buffer,
00365 unsigned frameSize) {
00366 if (!fHaveReceivedFrames) {
00367
00368
00369
00370 if (priority != 0) return;
00371
00372 fMinSeqNumStored = fMaxSeqNumStored = rtpSeqNo;
00373 fHaveReceivedFrames = True;
00374 } else {
00375
00376 if (seqNumLT(fMaxSeqNumStored, rtpSeqNo)) {
00377 fMaxSeqNumStored = rtpSeqNo;
00378 } else if (seqNumLT(rtpSeqNo, fMinSeqNumStored)) {
00379 return;
00380 }
00381 }
00382
00383 if (isFull()) {
00384
00385
00386
00387
00388 fMinSeqNumStored = fMaxSeqNumStored = rtpSeqNo;
00389 }
00390
00391
00392 WarehousedPacketDescriptor& desc
00393 = fPacketDescriptors[rtpSeqNo%fNumDescriptors];
00394 if (desc.buffer != NULL) {
00395
00396
00397 if (desc.priority < priority) return;
00398
00399
00400 delete[] desc.buffer;
00401 }
00402
00403
00404 desc.buffer = new unsigned char[frameSize];
00405 if (desc.buffer == NULL) {
00406 #ifdef DEBUG
00407 fprintf(stderr, "PacketWarehouse::addNewFrame failed to allocate %d-byte buffer!\n", frameSize);
00408 #endif
00409 exit(1);
00410 }
00411 memmove(desc.buffer, buffer, frameSize);
00412 desc.frameSize = frameSize;
00413 desc.priority = priority;
00414
00415 struct timeval timeNow;
00416 gettimeofday(&timeNow, NULL);
00417
00418 if (rtpSeqNo == (fLastRTPSeqNo+1)%65536) {
00419
00420
00421 unsigned lastGap
00422 = (timeNow.tv_sec - fLastArrivalTime.tv_sec)*1000000
00423 + (timeNow.tv_usec - fLastArrivalTime.tv_usec);
00424
00425 fInterArrivalAveGap = (9*fInterArrivalAveGap + lastGap)/10;
00426 }
00427 fLastArrivalTime = timeNow;
00428 fLastRTPSeqNo = rtpSeqNo;
00429 }
00430
00431 unsigned char* PacketWarehouse::dequeueFrame(unsigned& resultFrameSize,
00432 unsigned& uSecondsToDefer) {
00433 uSecondsToDefer = 0;
00434
00435
00436
00437 int currentSpan = fMaxSeqNumStored - fMinSeqNumStored;
00438 if (currentSpan < 0) currentSpan += 65536;
00439 if (currentSpan < (int)fMinSpanForDelivery) {
00440 return NULL;
00441 }
00442
00443
00444
00445
00446
00447
00448 if (currentSpan < (int)fMaxSpanForDelivery) {
00449 if (fLastActionWasIncoming) {
00450 uSecondsToDefer = (unsigned)(1.5*fInterArrivalAveGap);
00451 }
00452 }
00453
00454
00455 unsigned char* resultBuffer = NULL;
00456 do {
00457 if (currentSpan < (int)fMinSpanForDelivery) break;
00458
00459 WarehousedPacketDescriptor& desc
00460 = fPacketDescriptors[fMinSeqNumStored%fNumDescriptors];
00461 resultBuffer = desc.buffer;
00462 resultFrameSize = desc.frameSize;
00463
00464 desc.buffer = NULL;
00465 #ifdef DEBUG
00466 if (resultBuffer == NULL) fprintf(stderr, "No packet for seq num %d - skipping\n", fMinSeqNumStored);
00467 else if (desc.priority == 2) fprintf(stderr, "Using priority %d frame for seq num %d\n", desc.priority, fMinSeqNumStored);
00468 #endif
00469 fMinSeqNumStored = (fMinSeqNumStored+1)%65536;
00470 --currentSpan;
00471 } while (resultBuffer == NULL);
00472
00473 return resultBuffer;
00474 }