00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "DelayQueue.hh"
00022 #include "GroupsockHelper.hh"
00023
// Number of microseconds in one second; used below to carry/borrow between
// the microsecond and second fields.
static const int MILLION = 1000 * 1000;
00025
00027
00028 int Timeval::operator>=(const Timeval& arg2) const {
00029 return seconds() > arg2.seconds()
00030 || (seconds() == arg2.seconds()
00031 && useconds() >= arg2.useconds());
00032 }
00033
00034 void Timeval::operator+=(const DelayInterval& arg2) {
00035 secs() += arg2.seconds(); usecs() += arg2.useconds();
00036 if (usecs() >= MILLION) {
00037 usecs() -= MILLION;
00038 ++secs();
00039 }
00040 }
00041
00042 void Timeval::operator-=(const DelayInterval& arg2) {
00043 secs() -= arg2.seconds(); usecs() -= arg2.useconds();
00044 if (usecs() < 0) {
00045 usecs() += MILLION;
00046 --secs();
00047 }
00048 if (secs() < 0)
00049 secs() = usecs() = 0;
00050 }
00051
00052 DelayInterval operator-(const Timeval& arg1, const Timeval& arg2) {
00053 time_base_seconds secs = arg1.seconds() - arg2.seconds();
00054 time_base_seconds usecs = arg1.useconds() - arg2.useconds();
00055
00056 if (usecs < 0) {
00057 usecs += MILLION;
00058 --secs;
00059 }
00060 if (secs < 0)
00061 return DELAY_ZERO;
00062 else
00063 return DelayInterval(secs, usecs);
00064 }
00065
00066
00068
00069 DelayInterval operator*(short arg1, const DelayInterval& arg2) {
00070 time_base_seconds result_seconds = arg1*arg2.seconds();
00071 time_base_seconds result_useconds = arg1*arg2.useconds();
00072
00073 time_base_seconds carry = result_useconds/MILLION;
00074 result_useconds -= carry*MILLION;
00075 result_seconds += carry;
00076
00077 return DelayInterval(result_seconds, result_useconds);
00078 }
00079
00080 #ifndef INT_MAX
00081 #define INT_MAX 0x7FFFFFFF
00082 #endif
00083 const DelayInterval DELAY_ZERO(0, 0);
00084 const DelayInterval DELAY_SECOND(1, 0);
00085 const DelayInterval ETERNITY(INT_MAX, MILLION-1);
00086
00087
00088
00090
00091 long DelayQueueEntry::tokenCounter = 0;
00092
00093 DelayQueueEntry::DelayQueueEntry(DelayInterval delay)
00094 : fDeltaTimeRemaining(delay) {
00095 fNext = fPrev = this;
00096 fToken = ++tokenCounter;
00097 }
00098
00099 DelayQueueEntry::~DelayQueueEntry() {
00100 }
00101
00102 void DelayQueueEntry::handleTimeout() {
00103 delete this;
00104 }
00105
00106
00108
00109 DelayQueue::DelayQueue()
00110 : DelayQueueEntry(ETERNITY) {
00111 fLastSyncTime = TimeNow();
00112 }
00113
00114 DelayQueue::~DelayQueue() {
00115 while (fNext != this) removeEntry(fNext);
00116 }
00117
00118 void DelayQueue::addEntry(DelayQueueEntry* newEntry) {
00119 synchronize();
00120
00121 DelayQueueEntry* cur = head();
00122 while (newEntry->fDeltaTimeRemaining >= cur->fDeltaTimeRemaining) {
00123 newEntry->fDeltaTimeRemaining -= cur->fDeltaTimeRemaining;
00124 cur = cur->fNext;
00125 }
00126
00127 cur->fDeltaTimeRemaining -= newEntry->fDeltaTimeRemaining;
00128
00129
00130 newEntry->fNext = cur;
00131 newEntry->fPrev = cur->fPrev;
00132 cur->fPrev = newEntry->fPrev->fNext = newEntry;
00133 }
00134
00135 void DelayQueue::updateEntry(DelayQueueEntry* entry, DelayInterval newDelay) {
00136 if (entry == NULL) return;
00137
00138 removeEntry(entry);
00139 entry->fDeltaTimeRemaining = newDelay;
00140 addEntry(entry);
00141 }
00142
00143 void DelayQueue::updateEntry(long tokenToFind, DelayInterval newDelay) {
00144 DelayQueueEntry* entry = findEntryByToken(tokenToFind);
00145 updateEntry(entry, newDelay);
00146 }
00147
00148 void DelayQueue::removeEntry(DelayQueueEntry* entry) {
00149 if (entry == NULL || entry->fNext == NULL) return;
00150
00151 entry->fNext->fDeltaTimeRemaining += entry->fDeltaTimeRemaining;
00152 entry->fPrev->fNext = entry->fNext;
00153 entry->fNext->fPrev = entry->fPrev;
00154 entry->fNext = entry->fPrev = NULL;
00155
00156 }
00157
00158 DelayQueueEntry* DelayQueue::removeEntry(long tokenToFind) {
00159 DelayQueueEntry* entry = findEntryByToken(tokenToFind);
00160 removeEntry(entry);
00161 return entry;
00162 }
00163
00164 DelayInterval const& DelayQueue::timeToNextAlarm() {
00165 if (head()->fDeltaTimeRemaining == DELAY_ZERO) return DELAY_ZERO;
00166
00167 synchronize();
00168 return head()->fDeltaTimeRemaining;
00169 }
00170
00171 void DelayQueue::handleAlarm() {
00172 if (head()->fDeltaTimeRemaining != DELAY_ZERO) synchronize();
00173
00174 if (head()->fDeltaTimeRemaining == DELAY_ZERO) {
00175
00176 DelayQueueEntry* toRemove = head();
00177 removeEntry(toRemove);
00178
00179 toRemove->handleTimeout();
00180 }
00181 }
00182
00183 DelayQueueEntry* DelayQueue::findEntryByToken(long tokenToFind) {
00184 DelayQueueEntry* cur = head();
00185 while (cur != this) {
00186 if (cur->token() == tokenToFind) return cur;
00187 cur = cur->fNext;
00188 }
00189
00190 return NULL;
00191 }
00192
00193 void DelayQueue::synchronize() {
00194
00195 EventTime timeNow = TimeNow();
00196 DelayInterval timeSinceLastSync = timeNow - fLastSyncTime;
00197 fLastSyncTime = timeNow;
00198
00199
00200 DelayQueueEntry* curEntry = head();
00201 while (timeSinceLastSync >= curEntry->fDeltaTimeRemaining) {
00202 timeSinceLastSync -= curEntry->fDeltaTimeRemaining;
00203 curEntry->fDeltaTimeRemaining = DELAY_ZERO;
00204 curEntry = curEntry->fNext;
00205 }
00206 curEntry->fDeltaTimeRemaining -= timeSinceLastSync;
00207 }
00208
00209
00211
00212 EventTime TimeNow() {
00213 struct timeval tvNow;
00214
00215 gettimeofday(&tvNow, NULL);
00216
00217 return EventTime(tvNow.tv_sec, tvNow.tv_usec);
00218 }
00219
00220 DelayInterval TimeRemainingUntil(const EventTime& futureEvent) {
00221 return futureEvent - TimeNow();
00222 }
00223
00224 const EventTime THE_END_OF_TIME(INT_MAX);