00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include "Groupsock.hh"
00021 #include "GroupsockHelper.hh"
00022
00023 #include "TunnelEncaps.hh"
00024
00025 #ifndef NO_STRSTREAM
00026 #if (defined(__WIN32__) || defined(_WIN32)) && !defined(__MINGW32__)
00027 #include <strstrea.h>
00028 #else
00029 #if defined(__GNUC__) && (__GNUC__ > 3 || __GNUC__ == 3 && __GNUC_MINOR__ > 0)
00030 #include <strstream>
00031 #else
00032 #include <strstream.h>
00033 #endif
00034 #endif
00035 #endif
00036 #include <stdio.h>
00037
00039
00040 OutputSocket::OutputSocket(UsageEnvironment& env)
00041 : Socket(env, 0 ),
00042 fSourcePort(0), fLastSentTTL(0) {
00043 }
00044
00045 OutputSocket::OutputSocket(UsageEnvironment& env, Port port)
00046 : Socket(env, port),
00047 fSourcePort(0), fLastSentTTL(0) {
00048 }
00049
00050 OutputSocket::~OutputSocket() {
00051 }
00052
00053 Boolean OutputSocket::write(netAddressBits address, Port port, u_int8_t ttl,
00054 unsigned char* buffer, unsigned bufferSize) {
00055 if (ttl == fLastSentTTL) {
00056
00057 ttl = 0;
00058 } else {
00059 fLastSentTTL = ttl;
00060 }
00061 struct in_addr destAddr; destAddr.s_addr = address;
00062 if (!writeSocket(env(), socketNum(), destAddr, port, ttl,
00063 buffer, bufferSize))
00064 return False;
00065
00066 if (sourcePortNum() == 0) {
00067
00068
00069 if (!getSourcePort(env(), socketNum(), fSourcePort)) {
00070 if (DebugLevel >= 1)
00071 env() << *this
00072 << ": failed to get source port: "
00073 << env().getResultMsg() << "\n";
00074 return False;
00075 }
00076 }
00077
00078 return True;
00079 }
00080
00081
00082 Boolean OutputSocket
00083 ::handleRead(unsigned char* , unsigned ,
00084 unsigned& , struct sockaddr_in& ) {
00085 return True;
00086 }
00087
00088
00090
00091 destRecord
00092 ::destRecord(struct in_addr const& addr, Port const& port, u_int8_t ttl,
00093 destRecord* next)
00094 : fNext(next), fGroupEId(addr, port.num(), ttl), fPort(port) {
00095 }
00096
00097 destRecord::~destRecord() {
00098 delete fNext;
00099 }
00100
00101
00103
00104 NetInterfaceTrafficStats Groupsock::statsIncoming;
00105 NetInterfaceTrafficStats Groupsock::statsOutgoing;
00106 NetInterfaceTrafficStats Groupsock::statsRelayedIncoming;
00107 NetInterfaceTrafficStats Groupsock::statsRelayedOutgoing;
00108
00109
00110 Groupsock::Groupsock(UsageEnvironment& env, struct in_addr const& groupAddr,
00111 Port port, u_int8_t ttl)
00112 : OutputSocket(env, port),
00113 deleteIfNoMembers(False), isSlave(False),
00114 fIncomingGroupEId(groupAddr, port.num(), ttl), fDests(NULL), fTTL(ttl) {
00115 addDestination(groupAddr, port);
00116
00117 if (!socketJoinGroup(env, socketNum(), groupAddr.s_addr)) {
00118 if (DebugLevel >= 1) {
00119 env << *this << ": failed to join group: "
00120 << env.getResultMsg() << "\n";
00121 }
00122 }
00123
00124
00125 if (ourSourceAddressForMulticast(env) == 0) {
00126 if (DebugLevel >= 0) {
00127 env << "Unable to determine our source address: "
00128 << env.getResultMsg() << "\n";
00129 }
00130 }
00131
00132 if (DebugLevel >= 2) env << *this << ": created\n";
00133 }
00134
00135
00136 Groupsock::Groupsock(UsageEnvironment& env, struct in_addr const& groupAddr,
00137 struct in_addr const& sourceFilterAddr,
00138 Port port)
00139 : OutputSocket(env, port),
00140 deleteIfNoMembers(False), isSlave(False),
00141 fIncomingGroupEId(groupAddr, sourceFilterAddr, port.num()),
00142 fDests(NULL), fTTL(255) {
00143 addDestination(groupAddr, port);
00144
00145
00146 if (!socketJoinGroupSSM(env, socketNum(), groupAddr.s_addr,
00147 sourceFilterAddr.s_addr)) {
00148 if (DebugLevel >= 3) {
00149 env << *this << ": SSM join failed: "
00150 << env.getResultMsg();
00151 env << " - trying regular join instead\n";
00152 }
00153 if (!socketJoinGroup(env, socketNum(), groupAddr.s_addr)) {
00154 if (DebugLevel >= 1) {
00155 env << *this << ": failed to join group: "
00156 << env.getResultMsg() << "\n";
00157 }
00158 }
00159 }
00160
00161 if (DebugLevel >= 2) env << *this << ": created\n";
00162 }
00163
00164 Groupsock::~Groupsock() {
00165 if (isSSM()) {
00166 if (!socketLeaveGroupSSM(env(), socketNum(), groupAddress().s_addr,
00167 sourceFilterAddress().s_addr)) {
00168 socketLeaveGroup(env(), socketNum(), groupAddress().s_addr);
00169 }
00170 } else {
00171 socketLeaveGroup(env(), socketNum(), groupAddress().s_addr);
00172 }
00173
00174 delete fDests;
00175
00176 if (DebugLevel >= 2) env() << *this << ": deleting\n";
00177 }
00178
00179 void
00180 Groupsock::changeDestinationParameters(struct in_addr const& newDestAddr,
00181 Port newDestPort, int newDestTTL) {
00182 if (fDests == NULL) return;
00183
00184 struct in_addr destAddr = fDests->fGroupEId.groupAddress();
00185 if (newDestAddr.s_addr != 0) {
00186 if (newDestAddr.s_addr != destAddr.s_addr
00187 && IsMulticastAddress(newDestAddr.s_addr)) {
00188
00189
00190
00191 socketLeaveGroup(env(), socketNum(), destAddr.s_addr);
00192 socketJoinGroup(env(), socketNum(), newDestAddr.s_addr);
00193 }
00194 destAddr.s_addr = newDestAddr.s_addr;
00195 }
00196
00197 portNumBits destPortNum = fDests->fGroupEId.portNum();
00198 if (newDestPort.num() != 0) {
00199 if (newDestPort.num() != destPortNum
00200 && IsMulticastAddress(destAddr.s_addr)) {
00201
00202 changePort(newDestPort);
00203
00204 socketJoinGroup(env(), socketNum(), destAddr.s_addr);
00205 }
00206 destPortNum = newDestPort.num();
00207 fDests->fPort = newDestPort;
00208 }
00209
00210 u_int8_t destTTL = ttl();
00211 if (newDestTTL != ~0) destTTL = (u_int8_t)newDestTTL;
00212
00213 fDests->fGroupEId = GroupEId(destAddr, destPortNum, destTTL);
00214 }
00215
00216 void Groupsock::addDestination(struct in_addr const& addr, Port const& port) {
00217
00218 for (destRecord* dests = fDests; dests != NULL; dests = dests->fNext) {
00219 if (addr.s_addr == dests->fGroupEId.groupAddress().s_addr
00220 && port.num() == dests->fPort.num()) {
00221 return;
00222 }
00223 }
00224
00225 fDests = new destRecord(addr, port, ttl(), fDests);
00226 }
00227
00228 void Groupsock::removeDestination(struct in_addr const& addr, Port const& port) {
00229 for (destRecord** destsPtr = &fDests; *destsPtr != NULL;
00230 destsPtr = &((*destsPtr)->fNext)) {
00231 if (addr.s_addr == (*destsPtr)->fGroupEId.groupAddress().s_addr
00232 && port.num() == (*destsPtr)->fPort.num()) {
00233
00234 destRecord* next = (*destsPtr)->fNext;
00235 (*destsPtr)->fNext = NULL;
00236 delete (*destsPtr);
00237 *destsPtr = next;
00238 return;
00239 }
00240 }
00241 }
00242
00243 void Groupsock::removeAllDestinations() {
00244 delete fDests; fDests = NULL;
00245 }
00246
00247 void Groupsock::multicastSendOnly() {
00248 socketLeaveGroup(env(), socketNum(), fIncomingGroupEId.groupAddress().s_addr);
00249 for (destRecord* dests = fDests; dests != NULL; dests = dests->fNext) {
00250 socketLeaveGroup(env(), socketNum(), dests->fGroupEId.groupAddress().s_addr);
00251 }
00252 }
00253
00254 Boolean Groupsock::output(UsageEnvironment& env, u_int8_t ttlToSend,
00255 unsigned char* buffer, unsigned bufferSize,
00256 DirectedNetInterface* interfaceNotToFwdBackTo) {
00257 do {
00258
00259 Boolean writeSuccess = True;
00260 for (destRecord* dests = fDests; dests != NULL; dests = dests->fNext) {
00261 if (!write(dests->fGroupEId.groupAddress().s_addr, dests->fPort, ttlToSend,
00262 buffer, bufferSize)) {
00263 writeSuccess = False;
00264 break;
00265 }
00266 }
00267 if (!writeSuccess) break;
00268 statsOutgoing.countPacket(bufferSize);
00269 statsGroupOutgoing.countPacket(bufferSize);
00270
00271
00272 int numMembers =
00273 outputToAllMembersExcept(interfaceNotToFwdBackTo,
00274 ttlToSend, buffer, bufferSize,
00275 ourSourceAddressForMulticast(env));
00276 if (numMembers < 0) break;
00277
00278 if (DebugLevel >= 3) {
00279 env << *this << ": wrote " << bufferSize << " bytes, ttl "
00280 << (unsigned)ttlToSend;
00281 if (numMembers > 0) {
00282 env << "; relayed to " << numMembers << " members";
00283 }
00284 env << "\n";
00285 }
00286 return True;
00287 } while (0);
00288
00289 if (DebugLevel >= 0) {
00290 env.setResultMsg("Groupsock write failed: ", env.getResultMsg());
00291 }
00292 return False;
00293 }
00294
00295 Boolean Groupsock::handleRead(unsigned char* buffer, unsigned bufferMaxSize,
00296 unsigned& bytesRead,
00297 struct sockaddr_in& fromAddress) {
00298
00299
00300
00301 bytesRead = 0;
00302
00303 int maxBytesToRead = bufferMaxSize - TunnelEncapsulationTrailerMaxSize;
00304 int numBytes = readSocket(env(), socketNum(),
00305 buffer, maxBytesToRead, fromAddress);
00306 if (numBytes < 0) {
00307 if (DebugLevel >= 0) {
00308 env().setResultMsg("Groupsock read failed: ",
00309 env().getResultMsg());
00310 }
00311 return False;
00312 }
00313
00314
00315 if (isSSM()
00316 && fromAddress.sin_addr.s_addr != sourceFilterAddress().s_addr) {
00317 return True;
00318 }
00319
00320
00321
00322
00323 bytesRead = numBytes;
00324
00325 int numMembers = 0;
00326 if (!wasLoopedBackFromUs(env(), fromAddress)) {
00327 statsIncoming.countPacket(numBytes);
00328 statsGroupIncoming.countPacket(numBytes);
00329 numMembers =
00330 outputToAllMembersExcept(NULL, ttl(),
00331 buffer, bytesRead,
00332 fromAddress.sin_addr.s_addr);
00333 if (numMembers > 0) {
00334 statsRelayedIncoming.countPacket(numBytes);
00335 statsGroupRelayedIncoming.countPacket(numBytes);
00336 }
00337 }
00338 if (DebugLevel >= 3) {
00339 env() << *this << ": read " << bytesRead << " bytes from ";
00340 env() << our_inet_ntoa(fromAddress.sin_addr);
00341 if (numMembers > 0) {
00342 env() << "; relayed to " << numMembers << " members";
00343 }
00344 env() << "\n";
00345 }
00346
00347 return True;
00348 }
00349
00350 Boolean Groupsock::wasLoopedBackFromUs(UsageEnvironment& env,
00351 struct sockaddr_in& fromAddress) {
00352 if (fromAddress.sin_addr.s_addr
00353 == ourSourceAddressForMulticast(env)) {
00354 if (fromAddress.sin_port == sourcePortNum()) {
00355 #ifdef DEBUG_LOOPBACK_CHECKING
00356 if (DebugLevel >= 3) {
00357 env() << *this << ": got looped-back packet\n";
00358 }
00359 #endif
00360 return True;
00361 }
00362 }
00363
00364 return False;
00365 }
00366
00367 int Groupsock::outputToAllMembersExcept(DirectedNetInterface* exceptInterface,
00368 u_int8_t ttlToFwd,
00369 unsigned char* data, unsigned size,
00370 netAddressBits sourceAddr) {
00371
00372 if (ttlToFwd == 0) return 0;
00373
00374 DirectedNetInterfaceSet::Iterator iter(members());
00375 unsigned numMembers = 0;
00376 DirectedNetInterface* interf;
00377 while ((interf = iter.next()) != NULL) {
00378
00379 if (interf == exceptInterface)
00380 continue;
00381
00382
00383
00384 UsageEnvironment& saveEnv = env();
00385
00386 if (!interf->SourceAddrOKForRelaying(saveEnv, sourceAddr)) {
00387 if (strcmp(saveEnv.getResultMsg(), "") != 0) {
00388
00389 return -1;
00390 } else {
00391 continue;
00392 }
00393 }
00394
00395 if (numMembers == 0) {
00396
00397
00398
00399 TunnelEncapsulationTrailer* trailerInPacket
00400 = (TunnelEncapsulationTrailer*)&data[size];
00401 TunnelEncapsulationTrailer* trailer;
00402
00403 Boolean misaligned = ((unsigned long)trailerInPacket & 3) != 0;
00404 unsigned trailerOffset;
00405 u_int8_t tunnelCmd;
00406 if (isSSM()) {
00407
00408 trailerOffset = TunnelEncapsulationTrailerAuxSize;
00409 tunnelCmd = TunnelDataAuxCmd;
00410 } else {
00411 trailerOffset = 0;
00412 tunnelCmd = TunnelDataCmd;
00413 }
00414 unsigned trailerSize = TunnelEncapsulationTrailerSize + trailerOffset;
00415 unsigned tmpTr[TunnelEncapsulationTrailerMaxSize];
00416 if (misaligned) {
00417 trailer = (TunnelEncapsulationTrailer*)&tmpTr;
00418 } else {
00419 trailer = trailerInPacket;
00420 }
00421 trailer += trailerOffset;
00422
00423 if (fDests != NULL) {
00424 trailer->address() = fDests->fGroupEId.groupAddress().s_addr;
00425 trailer->port() = fDests->fPort;
00426 }
00427 trailer->ttl() = ttlToFwd;
00428 trailer->command() = tunnelCmd;
00429
00430 if (isSSM()) {
00431 trailer->auxAddress() = sourceFilterAddress().s_addr;
00432 }
00433
00434 if (misaligned) {
00435 memmove(trailerInPacket, trailer-trailerOffset, trailerSize);
00436 }
00437
00438 size += trailerSize;
00439 }
00440
00441 interf->write(data, size);
00442 ++numMembers;
00443 }
00444
00445 return numMembers;
00446 }
00447
00448 UsageEnvironment& operator<<(UsageEnvironment& s, const Groupsock& g) {
00449 UsageEnvironment& s1 = s << timestampString() << " Groupsock("
00450 << g.socketNum() << ": "
00451 << our_inet_ntoa(g.groupAddress())
00452 << ", " << g.port() << ", ";
00453 if (g.isSSM()) {
00454 return s1 << "SSM source: "
00455 << our_inet_ntoa(g.sourceFilterAddress()) << ")";
00456 } else {
00457 return s1 << (unsigned)(g.ttl()) << ")";
00458 }
00459 }
00460
00461
00463
00464
00465
00466
00467 static HashTable* getSocketTable(UsageEnvironment& env) {
00468 if (env.groupsockPriv == NULL) {
00469 env.groupsockPriv = HashTable::create(ONE_WORD_HASH_KEYS);
00470 }
00471 return (HashTable*)(env.groupsockPriv);
00472 }
00473
00474 static Boolean unsetGroupsockBySocket(Groupsock const* groupsock) {
00475 do {
00476 if (groupsock == NULL) break;
00477
00478 int sock = groupsock->socketNum();
00479
00480 if (sock < 0) break;
00481
00482 HashTable* sockets = getSocketTable(groupsock->env());
00483 if (sockets == NULL) break;
00484
00485 Groupsock* gs = (Groupsock*)sockets->Lookup((char*)(long)sock);
00486 if (gs == NULL || gs != groupsock) break;
00487 sockets->Remove((char*)(long)sock);
00488
00489 if (sockets->IsEmpty()) {
00490
00491 delete sockets;
00492 (gs->env()).groupsockPriv = NULL;
00493 }
00494
00495 return True;
00496 } while (0);
00497
00498 return False;
00499 }
00500
00501 static Boolean setGroupsockBySocket(UsageEnvironment& env, int sock,
00502 Groupsock* groupsock) {
00503 do {
00504
00505 if (sock < 0) {
00506 char buf[100];
00507 sprintf(buf, "trying to use bad socket (%d)", sock);
00508 env.setResultMsg(buf);
00509 break;
00510 }
00511
00512 HashTable* sockets = getSocketTable(env);
00513 if (sockets == NULL) break;
00514
00515
00516
00517 Boolean alreadyExists
00518 = (sockets->Lookup((char*)(long)sock) != 0);
00519 if (alreadyExists) {
00520 char buf[100];
00521 sprintf(buf,
00522 "Attempting to replace an existing socket (%d",
00523 sock);
00524 env.setResultMsg(buf);
00525 break;
00526 }
00527
00528 sockets->Add((char*)(long)sock, groupsock);
00529 return True;
00530 } while (0);
00531
00532 return False;
00533 }
00534
00535 static Groupsock* getGroupsockBySocket(UsageEnvironment& env, int sock) {
00536 do {
00537
00538 if (sock < 0) break;
00539
00540 HashTable* sockets = getSocketTable(env);
00541 if (sockets == NULL) break;
00542
00543 return (Groupsock*)sockets->Lookup((char*)(long)sock);
00544 } while (0);
00545
00546 return NULL;
00547 }
00548
00549 Groupsock*
00550 GroupsockLookupTable::Fetch(UsageEnvironment& env,
00551 netAddressBits groupAddress,
00552 Port port, u_int8_t ttl,
00553 Boolean& isNew) {
00554 isNew = False;
00555 Groupsock* groupsock;
00556 do {
00557 groupsock = (Groupsock*) fTable.Lookup(groupAddress, (~0), port);
00558 if (groupsock == NULL) {
00559 groupsock = AddNew(env, groupAddress, (~0), port, ttl);
00560 if (groupsock == NULL) break;
00561 isNew = True;
00562 }
00563 } while (0);
00564
00565 return groupsock;
00566 }
00567
00568 Groupsock*
00569 GroupsockLookupTable::Fetch(UsageEnvironment& env,
00570 netAddressBits groupAddress,
00571 netAddressBits sourceFilterAddr, Port port,
00572 Boolean& isNew) {
00573 isNew = False;
00574 Groupsock* groupsock;
00575 do {
00576 groupsock
00577 = (Groupsock*) fTable.Lookup(groupAddress, sourceFilterAddr, port);
00578 if (groupsock == NULL) {
00579 groupsock = AddNew(env, groupAddress, sourceFilterAddr, port, 0);
00580 if (groupsock == NULL) break;
00581 isNew = True;
00582 }
00583 } while (0);
00584
00585 return groupsock;
00586 }
00587
00588 Groupsock*
00589 GroupsockLookupTable::Lookup(netAddressBits groupAddress, Port port) {
00590 return (Groupsock*) fTable.Lookup(groupAddress, (~0), port);
00591 }
00592
00593 Groupsock*
00594 GroupsockLookupTable::Lookup(netAddressBits groupAddress,
00595 netAddressBits sourceFilterAddr, Port port) {
00596 return (Groupsock*) fTable.Lookup(groupAddress, sourceFilterAddr, port);
00597 }
00598
00599 Groupsock* GroupsockLookupTable::Lookup(UsageEnvironment& env, int sock) {
00600 return getGroupsockBySocket(env, sock);
00601 }
00602
00603 Boolean GroupsockLookupTable::Remove(Groupsock const* groupsock) {
00604 unsetGroupsockBySocket(groupsock);
00605 return fTable.Remove(groupsock->groupAddress().s_addr,
00606 groupsock->sourceFilterAddress().s_addr,
00607 groupsock->port());
00608 }
00609
00610 Groupsock* GroupsockLookupTable::AddNew(UsageEnvironment& env,
00611 netAddressBits groupAddress,
00612 netAddressBits sourceFilterAddress,
00613 Port port, u_int8_t ttl) {
00614 Groupsock* groupsock;
00615 do {
00616 struct in_addr groupAddr; groupAddr.s_addr = groupAddress;
00617 if (sourceFilterAddress == netAddressBits(~0)) {
00618
00619 groupsock = new Groupsock(env, groupAddr, port, ttl);
00620 } else {
00621
00622 struct in_addr sourceFilterAddr;
00623 sourceFilterAddr.s_addr = sourceFilterAddress;
00624 groupsock = new Groupsock(env, groupAddr, sourceFilterAddr, port);
00625 }
00626
00627 if (groupsock == NULL || groupsock->socketNum() < 0) break;
00628
00629 if (!setGroupsockBySocket(env, groupsock->socketNum(), groupsock)) break;
00630
00631 fTable.Add(groupAddress, sourceFilterAddress, port, (void*)groupsock);
00632 } while (0);
00633
00634 return groupsock;
00635 }
00636
00637 GroupsockLookupTable::Iterator::Iterator(GroupsockLookupTable& groupsocks)
00638 : fIter(AddressPortLookupTable::Iterator(groupsocks.fTable)) {
00639 }
00640
00641 Groupsock* GroupsockLookupTable::Iterator::next() {
00642 return (Groupsock*) fIter.next();
00643 };