00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "RTPInterface.hh"
00024 #include <GroupsockHelper.hh>
00025 #include <stdio.h>
00026
00028
00029
00030
00031
// Forward declaration (the definition appears later in this file).
// Writes "packet" to the TCP socket "socketNum", preceded by a '$' marker,
// the channel id and a 2-byte length.
static void sendRTPOverTCP(unsigned char* packet,
                           unsigned packetSize,
                           int socketNum,
                           unsigned char streamChannelId);
00034
00035
00036
00037
00038
00039
00040 static HashTable* socketHashTable(UsageEnvironment& env) {
00041 _Tables* ourTables = _Tables::getOurTables(env);
00042 if (ourTables->socketTable == NULL) {
00043
00044 ourTables->socketTable = HashTable::create(ONE_WORD_HASH_KEYS);
00045 }
00046 return (HashTable*)(ourTables->socketTable);
00047 }
00048
00049 class SocketDescriptor {
00050 public:
00051 SocketDescriptor(UsageEnvironment& env, int socketNum);
00052 virtual ~SocketDescriptor();
00053
00054 void registerRTPInterface(unsigned char streamChannelId,
00055 RTPInterface* rtpInterface);
00056 RTPInterface* lookupRTPInterface(unsigned char streamChannelId);
00057 void deregisterRTPInterface(unsigned char streamChannelId);
00058
00059 private:
00060 static void tcpReadHandler(SocketDescriptor*, int mask);
00061
00062 private:
00063 UsageEnvironment& fEnv;
00064 int fOurSocketNum;
00065 HashTable* fSubChannelHashTable;
00066 };
00067
00068 static SocketDescriptor* lookupSocketDescriptor(UsageEnvironment& env,
00069 int sockNum) {
00070 char const* key = (char const*)(long)sockNum;
00071 return (SocketDescriptor*)(socketHashTable(env)->Lookup(key));
00072 }
00073
00074 static void removeSocketDescription(UsageEnvironment& env, int sockNum) {
00075 char const* key = (char const*)(long)sockNum;
00076 HashTable* table = socketHashTable(env);
00077 table->Remove(key);
00078
00079 if (table->IsEmpty()) {
00080
00081 _Tables* ourTables = _Tables::getOurTables(env);
00082 delete table;
00083 ourTables->socketTable = NULL;
00084 ourTables->reclaimIfPossible();
00085 }
00086 }
00087
00088
00090
00091 RTPInterface::RTPInterface(Medium* owner, Groupsock* gs)
00092 : fOwner(owner), fGS(gs),
00093 fTCPStreams(NULL),
00094 fNextTCPReadSize(0), fNextTCPReadStreamSocketNum(-1),
00095 fReadHandlerProc(NULL),
00096 fAuxReadHandlerFunc(NULL), fAuxReadHandlerClientData(NULL) {
00097 }
00098
00099 RTPInterface::~RTPInterface() {
00100 delete fTCPStreams;
00101 }
00102
00103 Boolean RTPOverTCP_OK = True;
00104
00105 void RTPInterface::setStreamSocket(int sockNum,
00106 unsigned char streamChannelId) {
00107 fGS->removeAllDestinations();
00108 addStreamSocket(sockNum, streamChannelId);
00109 }
00110
00111 void RTPInterface::addStreamSocket(int sockNum,
00112 unsigned char streamChannelId) {
00113 if (sockNum < 0) return;
00114 else RTPOverTCP_OK = True;
00115
00116 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00117 streams = streams->fNext) {
00118 if (streams->fStreamSocketNum == sockNum
00119 && streams->fStreamChannelId == streamChannelId) {
00120 return;
00121 }
00122 }
00123
00124 fTCPStreams = new tcpStreamRecord(sockNum, streamChannelId, fTCPStreams);
00125 }
00126
00127 void RTPInterface::removeStreamSocket(int sockNum,
00128 unsigned char streamChannelId) {
00129 for (tcpStreamRecord** streamsPtr = &fTCPStreams; *streamsPtr != NULL;
00130 streamsPtr = &((*streamsPtr)->fNext)) {
00131 if ((*streamsPtr)->fStreamSocketNum == sockNum
00132 && (*streamsPtr)->fStreamChannelId == streamChannelId) {
00133
00134 tcpStreamRecord* next = (*streamsPtr)->fNext;
00135 (*streamsPtr)->fNext = NULL;
00136 delete (*streamsPtr);
00137 *streamsPtr = next;
00138 return;
00139 }
00140 }
00141 }
00142
00143 void RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) {
00144
00145 fGS->output(envir(), fGS->ttl(), packet, packetSize);
00146
00147
00148 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00149 streams = streams->fNext) {
00150 sendRTPOverTCP(packet, packetSize,
00151 streams->fStreamSocketNum, streams->fStreamChannelId);
00152 }
00153 }
00154
00155 void RTPInterface
00156 ::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc) {
00157
00158 envir().taskScheduler().
00159 turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner);
00160
00161
00162 fReadHandlerProc = handlerProc;
00163 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00164 streams = streams->fNext) {
00165
00166 SocketDescriptor* socketDescriptor
00167 = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);
00168 if (socketDescriptor == NULL) {
00169 socketDescriptor
00170 = new SocketDescriptor(envir(), streams->fStreamSocketNum);
00171 socketHashTable(envir())->Add((char const*)(long)(streams->fStreamSocketNum),
00172 socketDescriptor);
00173 }
00174
00175
00176 socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this);
00177 }
00178 }
00179
00180 Boolean RTPInterface::handleRead(unsigned char* buffer,
00181 unsigned bufferMaxSize,
00182 unsigned& bytesRead,
00183 struct sockaddr_in& fromAddress) {
00184 Boolean readSuccess;
00185 if (fNextTCPReadStreamSocketNum < 0) {
00186
00187 readSuccess = fGS->handleRead(buffer, bufferMaxSize, bytesRead, fromAddress);
00188 } else {
00189
00190 bytesRead = 0;
00191 unsigned totBytesToRead = fNextTCPReadSize;
00192 if (totBytesToRead > bufferMaxSize) totBytesToRead = bufferMaxSize;
00193 unsigned curBytesToRead = totBytesToRead;
00194 unsigned curBytesRead;
00195 while ((curBytesRead = readSocket(envir(), fNextTCPReadStreamSocketNum,
00196 &buffer[bytesRead], curBytesToRead,
00197 fromAddress)) > 0) {
00198 bytesRead += curBytesRead;
00199 if (bytesRead >= totBytesToRead) break;
00200 curBytesToRead -= curBytesRead;
00201 }
00202 if (curBytesRead <= 0) {
00203 bytesRead = 0;
00204 readSuccess = False;
00205 RTPOverTCP_OK = False;
00206 } else {
00207 readSuccess = True;
00208 }
00209 fNextTCPReadStreamSocketNum = -1;
00210 }
00211
00212 if (readSuccess && fAuxReadHandlerFunc != NULL) {
00213
00214 (*fAuxReadHandlerFunc)(fAuxReadHandlerClientData, buffer, bytesRead);
00215 }
00216 return readSuccess;
00217 }
00218
00219 void RTPInterface::stopNetworkReading() {
00220
00221 envir().taskScheduler().turnOffBackgroundReadHandling(fGS->socketNum());
00222
00223
00224 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00225 streams = streams->fNext) {
00226 SocketDescriptor* socketDescriptor
00227 = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);
00228 if (socketDescriptor != NULL) {
00229 socketDescriptor->deregisterRTPInterface(streams->fStreamChannelId);
00230
00231
00232 }
00233 }
00234 }
00235
00236
00238
00239 void sendRTPOverTCP(unsigned char* packet, unsigned packetSize,
00240 int socketNum, unsigned char streamChannelId) {
00241 #ifdef DEBUG
00242 fprintf(stderr, "sendRTPOverTCP: %d bytes over channel %d (socket %d)\n",
00243 packetSize, streamChannelId, socketNum); fflush(stderr);
00244 #endif
00245
00246
00247 do {
00248 char const dollar = '$';
00249 if (send(socketNum, &dollar, 1, 0) != 1) break;
00250 if (send(socketNum, (char*)&streamChannelId, 1, 0) != 1) break;
00251
00252 char netPacketSize[2];
00253 netPacketSize[0] = (char) ((packetSize&0xFF00)>>8);
00254 netPacketSize[1] = (char) (packetSize&0xFF);
00255 if (send(socketNum, netPacketSize, 2, 0) != 2) break;
00256
00257 if (send(socketNum, (char*)packet, packetSize, 0) != (int)packetSize) break;
00258
00259 #ifdef DEBUG
00260 fprintf(stderr, "sendRTPOverTCP: completed\n"); fflush(stderr);
00261 #endif
00262
00263 return;
00264 } while (0);
00265
00266 RTPOverTCP_OK = False;
00267 #ifdef DEBUG
00268 fprintf(stderr, "sendRTPOverTCP: failed!\n"); fflush(stderr);
00269 #endif
00270 }
00271
00272 SocketDescriptor::SocketDescriptor(UsageEnvironment& env, int socketNum)
00273 : fEnv(env), fOurSocketNum(socketNum),
00274 fSubChannelHashTable(HashTable::create(ONE_WORD_HASH_KEYS)) {
00275 }
00276
00277 SocketDescriptor::~SocketDescriptor() {
00278 delete fSubChannelHashTable;
00279 }
00280
00281 void SocketDescriptor::registerRTPInterface(unsigned char streamChannelId,
00282 RTPInterface* rtpInterface) {
00283 Boolean isFirstRegistration = fSubChannelHashTable->IsEmpty();
00284 fSubChannelHashTable->Add((char const*)(long)streamChannelId,
00285 rtpInterface);
00286
00287 if (isFirstRegistration) {
00288
00289 TaskScheduler::BackgroundHandlerProc* handler
00290 = (TaskScheduler::BackgroundHandlerProc*)&tcpReadHandler;
00291 fEnv.taskScheduler().
00292 turnOnBackgroundReadHandling(fOurSocketNum, handler, this);
00293 }
00294 }
00295
00296 RTPInterface* SocketDescriptor
00297 ::lookupRTPInterface(unsigned char streamChannelId) {
00298 char const* lookupArg = (char const*)(long)streamChannelId;
00299 return (RTPInterface*)(fSubChannelHashTable->Lookup(lookupArg));
00300 }
00301
00302 void SocketDescriptor
00303 ::deregisterRTPInterface(unsigned char streamChannelId) {
00304 fSubChannelHashTable->Remove((char const*)(long)streamChannelId);
00305
00306 if (fSubChannelHashTable->IsEmpty()) {
00307
00308 fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum);
00309 removeSocketDescription(fEnv, fOurSocketNum);
00310 delete this;
00311 }
00312 }
00313
00314 void SocketDescriptor::tcpReadHandler(SocketDescriptor* socketDescriptor,
00315 int mask) {
00316 do {
00317 UsageEnvironment& env = socketDescriptor->fEnv;
00318 int socketNum = socketDescriptor->fOurSocketNum;
00319
00320
00321
00322
00323
00324
00325 unsigned char c;
00326 struct sockaddr_in fromAddress;
00327 do {
00328 if (readSocket(env, socketNum, &c, 1, fromAddress) != 1) {
00329 env.taskScheduler().turnOffBackgroundReadHandling(socketNum);
00330 return;
00331 }
00332 } while (c != '$');
00333
00334
00335 unsigned char streamChannelId;
00336 if (readSocket(env, socketNum, &streamChannelId, 1, fromAddress)
00337 != 1) break;
00338 RTPInterface* rtpInterface
00339 = socketDescriptor->lookupRTPInterface(streamChannelId);
00340 if (rtpInterface == NULL) break;
00341
00342
00343 unsigned short size;
00344 if (readSocketExact(env, socketNum, (unsigned char*)&size, 2,
00345 fromAddress) != 2) break;
00346 rtpInterface->fNextTCPReadSize = ntohs(size);
00347 rtpInterface->fNextTCPReadStreamSocketNum = socketNum;
00348 #ifdef DEBUG
00349 fprintf(stderr, "SocketDescriptor::tcpReadHandler() reading %d bytes on channel %d\n", rtpInterface->fNextTCPReadSize, streamChannelId);
00350 #endif
00351
00352
00353
00354 if (rtpInterface->fReadHandlerProc != NULL) {
00355 rtpInterface->fReadHandlerProc(rtpInterface->fOwner, mask);
00356 }
00357
00358 } while (0);
00359 }
00360
00361
00363
00364 tcpStreamRecord
00365 ::tcpStreamRecord(int streamSocketNum, unsigned char streamChannelId,
00366 tcpStreamRecord* next)
00367 : fNext(next),
00368 fStreamSocketNum(streamSocketNum), fStreamChannelId(streamChannelId) {
00369 }
00370
00371 tcpStreamRecord::~tcpStreamRecord() {
00372 delete fNext;
00373 }
00374