00001
00002 #include <cstdlib>
00003
00004
00005 #include <algorithm>
00006
00007
00008 #include "compat.h"
00009
00010
00011 #ifndef USING_MINGW
00012 #include <sys/select.h>
00013 #endif
00014 #include <sys/types.h>
00015 #include <fcntl.h>
00016 #include <errno.h>
00017
00018 #ifndef O_NONBLOCK
00019 #define O_NONBLOCK 0
00020 #endif
00021
00022
00023 #include <QTime>
00024
00025
00026 #include "mythsocketthread.h"
00027 #include "mythbaseutil.h"
00028 #include "mythlogging.h"
00029 #include "mythsocket.h"
00030
00031 #define SLOC(a) QString("MythSocketThread(sock 0x%1:%2): ")\
00032 .arg((uint64_t)a, 0, 16).arg(a->socket())
00033 #define LOC QString("MythSocketThread: ")
00034
00035 const uint MythSocketThread::kShortWait = 100;
00036
00037 MythSocketThread::MythSocketThread()
00038 : MThread("Socket"), m_readyread_run(false)
00039 {
00040 for (int i = 0; i < 2; i++)
00041 {
00042 m_readyread_pipe[i] = -1;
00043 m_readyread_pipe_flags[i] = 0;
00044 }
00045 }
00046
00047 void ShutdownRRT(void)
00048 {
00049 QMutexLocker locker(&MythSocket::s_readyread_thread_lock);
00050 if (MythSocket::s_readyread_thread)
00051 {
00052 MythSocket::s_readyread_thread->ShutdownReadyReadThread();
00053 MythSocket::s_readyread_thread->wait();
00054 }
00055 }
00056
00057 void MythSocketThread::ShutdownReadyReadThread(void)
00058 {
00059 {
00060 QMutexLocker locker(&m_readyread_lock);
00061 m_readyread_run = false;
00062 }
00063
00064 WakeReadyReadThread();
00065
00066 wait();
00067
00068 CloseReadyReadPipe();
00069 }
00070
00071 void MythSocketThread::CloseReadyReadPipe(void) const
00072 {
00073 for (uint i = 0; i < 2; i++)
00074 {
00075 if (m_readyread_pipe[i] >= 0)
00076 {
00077 ::close(m_readyread_pipe[i]);
00078 m_readyread_pipe[i] = -1;
00079 m_readyread_pipe_flags[i] = 0;
00080 }
00081 }
00082 }
00083
00084 void MythSocketThread::StartReadyReadThread(void)
00085 {
00086 QMutexLocker locker(&m_readyread_lock);
00087 if (!m_readyread_run)
00088 {
00089 atexit(ShutdownRRT);
00090 setup_pipe(m_readyread_pipe, m_readyread_pipe_flags);
00091 m_readyread_run = true;
00092 start();
00093 m_readyread_started_wait.wait(&m_readyread_lock);
00094 }
00095 }
00096
00097 void MythSocketThread::AddToReadyRead(MythSocket *sock)
00098 {
00099 if (sock->socket() == -1)
00100 {
00101 LOG(VB_SOCKET, LOG_ERR, SLOC(sock) +
00102 "attempted to insert invalid socket to ReadyRead");
00103 return;
00104 }
00105 StartReadyReadThread();
00106
00107 sock->UpRef();
00108
00109 {
00110 QMutexLocker locker(&m_readyread_lock);
00111 m_readyread_addlist.push_back(sock);
00112 }
00113
00114 WakeReadyReadThread();
00115 }
00116
00117 void MythSocketThread::RemoveFromReadyRead(MythSocket *sock)
00118 {
00119 {
00120 QMutexLocker locker(&m_readyread_lock);
00121 m_readyread_dellist.push_back(sock);
00122 }
00123 WakeReadyReadThread();
00124 }
00125
00126 void MythSocketThread::WakeReadyReadThread(void) const
00127 {
00128 if (!isRunning())
00129 return;
00130
00131 QMutexLocker locker(&m_readyread_lock);
00132 m_readyread_wait.wakeAll();
00133
00134 if (m_readyread_pipe[1] < 0)
00135 return;
00136
00137 char buf[1] = { '0' };
00138 ssize_t wret = 0;
00139 while (wret <= 0)
00140 {
00141 wret = ::write(m_readyread_pipe[1], &buf, 1);
00142 if ((wret < 0) && (EAGAIN != errno) && (EINTR != errno))
00143 {
00144 LOG(VB_GENERAL, LOG_ERR, LOC +
00145 "Failed to write to readyread pipe, closing pipe.");
00146
00147
00148
00149
00150 CloseReadyReadPipe();
00151 break;
00152 }
00153 }
00154 }
00155
00156 void MythSocketThread::ReadyToBeRead(MythSocket *sock)
00157 {
00158 LOG(VB_SOCKET, LOG_DEBUG, SLOC(sock) + "socket is readable");
00159 int bytesAvail = sock->bytesAvailable();
00160
00161 if (bytesAvail == 0 && sock->closedByRemote())
00162 {
00163 LOG(VB_SOCKET, LOG_INFO, SLOC(sock) + "socket closed");
00164 sock->close();
00165 }
00166 else if (bytesAvail > 0 && sock->m_cb && sock->m_useReadyReadCallback)
00167 {
00168 sock->m_notifyread = true;
00169 LOG(VB_SOCKET, LOG_DEBUG, SLOC(sock) + "calling m_cb->readyRead()");
00170 sock->m_cb->readyRead(sock);
00171 }
00172 }
00173
00174 void MythSocketThread::ProcessAddRemoveQueues(void)
00175 {
00176 while (!m_readyread_dellist.empty())
00177 {
00178 MythSocket *sock = m_readyread_dellist.front();
00179 m_readyread_dellist.pop_front();
00180
00181 if (m_readyread_list.removeAll(sock))
00182 m_readyread_downref_list.push_back(sock);
00183 }
00184
00185 while (!m_readyread_addlist.empty())
00186 {
00187 MythSocket *sock = m_readyread_addlist.front();
00188 m_readyread_addlist.pop_front();
00189 m_readyread_list.push_back(sock);
00190 }
00191 }
00192
00193 void MythSocketThread::run(void)
00194 {
00195 RunProlog();
00196 LOG(VB_SOCKET, LOG_DEBUG, LOC + "readyread thread start");
00197
00198 QMutexLocker locker(&m_readyread_lock);
00199 m_readyread_started_wait.wakeAll();
00200 while (m_readyread_run)
00201 {
00202 LOG(VB_SOCKET, LOG_DEBUG, LOC + "ProcessAddRemoveQueues");
00203
00204 ProcessAddRemoveQueues();
00205
00206 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Construct FD_SET");
00207
00208
00209 int maxfd = -1;
00210 fd_set rfds;
00211 FD_ZERO(&rfds);
00212
00213 QList<MythSocket*>::const_iterator it = m_readyread_list.begin();
00214 for (; it != m_readyread_list.end(); ++it)
00215 {
00216 if (!(*it)->TryLock(false))
00217 continue;
00218
00219 if ((*it)->state() == MythSocket::Connected &&
00220 !(*it)->m_notifyread)
00221 {
00222 FD_SET((*it)->socket(), &rfds);
00223 maxfd = std::max((*it)->socket(), maxfd);
00224 }
00225 (*it)->Unlock(false);
00226 }
00227
00228
00229 if (maxfd < 0)
00230 {
00231 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Empty FD_SET, sleeping");
00232 if (m_readyread_wait.wait(&m_readyread_lock))
00233 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Empty FD_SET, woken up");
00234 else
00235 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Empty FD_SET, timed out");
00236 continue;
00237 }
00238
00239 int rval = 0;
00240
00241 if (m_readyread_pipe[0] >= 0)
00242 {
00243
00244
00245 char dummy[128];
00246 if (m_readyread_pipe_flags[0] & O_NONBLOCK)
00247 {
00248 rval = ::read(m_readyread_pipe[0], dummy, 128);
00249 FD_SET(m_readyread_pipe[0], &rfds);
00250 maxfd = std::max(m_readyread_pipe[0], maxfd);
00251 }
00252
00253
00254 fd_set efds;
00255 memcpy(&efds, &rfds, sizeof(fd_set));
00256
00257
00258
00259
00260
00261 m_readyread_lock.unlock();
00262 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Waiting on select..");
00263 rval = select(maxfd + 1, &rfds, NULL, &efds, NULL);
00264 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Got data on select");
00265 m_readyread_lock.lock();
00266
00267 if (rval > 0 && FD_ISSET(m_readyread_pipe[0], &rfds))
00268 {
00269 int ret = ::read(m_readyread_pipe[0], dummy, 128);
00270 if (ret < 0)
00271 {
00272 LOG(VB_SOCKET, LOG_ERR, LOC +
00273 "Strange.. failed to read event pipe");
00274 }
00275 }
00276 }
00277 else
00278 {
00279 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Waiting on select.. (no pipe)");
00280
00281 fd_set savefds;
00282 memcpy(&savefds, &rfds, sizeof(fd_set));
00283
00284
00285
00286
00287 while (!rval)
00288 {
00289
00290 fd_set efds;
00291 memcpy(&efds, &savefds, sizeof(fd_set));
00292
00293 struct timeval timeout;
00294 timeout.tv_sec = 0;
00295 timeout.tv_usec = kShortWait * 1000;
00296 rval = select(maxfd + 1, &rfds, NULL, &efds, &timeout);
00297 if (!rval)
00298 {
00299 m_readyread_wait.wait(&m_readyread_lock, kShortWait);
00300 memcpy(&rfds, &savefds, sizeof(fd_set));
00301 }
00302 }
00303
00304 if (rval > 0)
00305 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Got data on select (no pipe)");
00306 }
00307
00308 if (rval <= 0)
00309 {
00310 if (rval == 0)
00311 {
00312
00313
00314 LOG(VB_SOCKET, LOG_DEBUG, LOC + "select timeout");
00315 }
00316 else
00317 LOG(VB_SOCKET, LOG_ERR, LOC + "select returned error" + ENO);
00318
00319 m_readyread_wait.wait(&m_readyread_lock, kShortWait);
00320 continue;
00321 }
00322
00323
00324
00325
00326 m_readyread_lock.unlock();
00327
00328
00329
00330
00331 uint downref_tm = 0;
00332 if (!m_readyread_downref_list.empty())
00333 {
00334 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Deleting stale sockets");
00335
00336 QTime tm = QTime::currentTime();
00337 for (it = m_readyread_downref_list.begin();
00338 it != m_readyread_downref_list.end(); ++it)
00339 {
00340 (*it)->DownRef();
00341 }
00342 m_readyread_downref_list.clear();
00343 downref_tm = tm.elapsed();
00344 }
00345
00346 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Processing ready reads");
00347
00348 QMap<uint,uint> timers;
00349 QTime tm = QTime::currentTime();
00350 it = m_readyread_list.begin();
00351
00352 for (; it != m_readyread_list.end() && m_readyread_run; ++it)
00353 {
00354 if (!(*it)->TryLock(false))
00355 continue;
00356
00357 int socket = (*it)->socket();
00358
00359 if (socket >= 0 &&
00360 (*it)->state() == MythSocket::Connected &&
00361 FD_ISSET(socket, &rfds))
00362 {
00363 QTime rrtm = QTime::currentTime();
00364 ReadyToBeRead(*it);
00365 timers[socket] = rrtm.elapsed();
00366 }
00367 (*it)->Unlock(false);
00368 }
00369
00370 if (VERBOSE_LEVEL_CHECK(VB_SOCKET, LOG_DEBUG))
00371 {
00372 QString rep = QString("Total read time: %1ms, on sockets")
00373 .arg(tm.elapsed());
00374 QMap<uint,uint>::const_iterator it = timers.begin();
00375 for (; it != timers.end(); ++it)
00376 rep += QString(" {%1,%2ms}").arg(it.key()).arg(*it);
00377 if (downref_tm)
00378 rep += QString(" {downref, %1ms}").arg(downref_tm);
00379
00380 LOG(VB_SOCKET, LOG_DEBUG, LOC + rep);
00381 }
00382
00383 m_readyread_lock.lock();
00384 LOG(VB_SOCKET, LOG_DEBUG, LOC + "Reacquired ready read lock");
00385 }
00386
00387 LOG(VB_SOCKET, LOG_DEBUG, LOC + "readyread thread exit");
00388 RunEpilog();
00389 }