00001
00007 #include <sys/types.h>
00008 #include <sys/socket.h>
00009 #include <sys/stat.h>
00010 #include <sys/file.h>
00011 #include <errno.h>
00012 #include <fcntl.h>
00013 #include <netinet/in.h>
00014
00015 #include <QMutexLocker>
00016 #include <QStringList>
00017 #include <QUdpSocket>
00018
00019 #include "cetonrtp.h"
00020 #include "mythlogging.h"
00021 #include "mythcorecontext.h"
00022
00023 #define LOC QString("CetonRTP(%1-%2): ").arg(_ip).arg(_tuner)
00024
00025 #define RTPDATAGRAMSIZE 1328
00026
00027 CetonRTP::CetonRTP(const QString& ip, uint tuner, ushort listenPort,
00028 ushort controlPort):
00029 _ip(ip),
00030 _tuner(tuner),
00031 _listenPort(listenPort),
00032 _rtpSocket(NULL),
00033 _acceptFrom(ip),
00034 _rtsp(ip, tuner, controlPort),
00035 _lastRead(0),
00036 _thisRead(0),
00037 _lastSequenceNumber(-1)
00038 {
00039 }
00040
00041 bool CetonRTP::Init(void)
00042 {
00043 QStringList options;
00044 if (!(_rtsp.GetOptions(options) && options.contains("OPTIONS") &&
00045 options.contains("DESCRIBE") && options.contains("SETUP") &&
00046 options.contains("PLAY") && options.contains("TEARDOWN")))
00047 {
00048 LOG(VB_RECORD, LOG_ERR, LOC +
00049 "RTSP interface did not support the necessary options");
00050 return false;
00051 }
00052
00053 if (!_rtpSocket.bind(_listenPort))
00054 {
00055 LOG(VB_RECORD, LOG_ERR, LOC +
00056 QString("UDP socket failed to bind on port %1").arg(_listenPort));
00057 return false;
00058 }
00059
00060 if (!_dummySocket.bind())
00061 {
00062 LOG(VB_RECORD, LOG_ERR, LOC +
00063 "UDP dummy socket failed to bind on any port");
00064 return false;
00065 }
00066
00067 int socket = _rtpSocket.socketDescriptor();
00068 if (socket > 0)
00069 {
00070 int oldBufferSize;
00071 socklen_t bs_size = sizeof(oldBufferSize);
00072 getsockopt(socket, SOL_SOCKET, SO_RCVBUF, &oldBufferSize, &bs_size);
00073
00074 int requestedBufferSize = gCoreContext->GetNumSetting("CetonSocketBufferSize", 1024*1024*16);
00075 bs_size = sizeof(requestedBufferSize);
00076 if (setsockopt(socket, SOL_SOCKET, SO_RCVBUF, &requestedBufferSize,
00077 sizeof(requestedBufferSize)) == -1)
00078 {
00079 LOG(VB_RECORD, LOG_WARNING, LOC +
00080 "Unable to adjust socket receive buffer size");
00081 }
00082
00083 int newBufferSize;
00084 bs_size = sizeof(newBufferSize);
00085 getsockopt(socket,SOL_SOCKET, SO_RCVBUF, &newBufferSize, &bs_size);
00086 LOG(VB_RECORD, LOG_INFO, LOC +
00087 QString("Buffer size was %1 - requested %2 - now %3")
00088 .arg(oldBufferSize).arg(requestedBufferSize).arg(newBufferSize));
00089 }
00090
00091 return true;
00092 }
00093
00094 bool CetonRTP::StartStreaming(void)
00095 {
00096 return (_rtsp.Describe() &&
00097 _rtsp.Setup(_rtpSocket.localPort(), _dummySocket.localPort()) &&
00098 _rtsp.Play());
00099 }
00100
00101 bool CetonRTP::StopStreaming(void)
00102 {
00103 return _rtsp.Teardown();
00104 }
00105
00106 int CetonRTP::Read(char *buffer, int bufferSize)
00107 {
00108 ProcessIncomingData();
00109
00110 int count =_mpegBuffer.size();
00111 if (count > bufferSize) count = bufferSize;
00112
00113 char* data = _mpegBuffer.data();
00114 memcpy(buffer, data, count);
00115 _mpegBuffer.remove(0, count);
00116
00117 return count;
00118 }
00119
00120 void CetonRTP::ProcessIncomingData(void)
00121 {
00122 QHostAddress receivedFrom;
00123
00124 _lastRead = _thisRead;
00125 _thisRead = 0;
00126
00127 int bufferSize = _readBuffer.size();
00128
00129 while (_rtpSocket.hasPendingDatagrams())
00130 {
00131 int bytesWaiting = _rtpSocket.pendingDatagramSize();
00132 _readBuffer.resize(bufferSize+bytesWaiting);
00133 char *data = _readBuffer.data();
00134
00135 int bytesRead = _rtpSocket.readDatagram(data+bufferSize,
00136 bytesWaiting, &receivedFrom);
00137 if (bytesRead != bytesWaiting)
00138 {
00139 LOG(VB_RECORD, LOG_ERR, LOC +
00140 QString("RTP packet error. Expected %1 bytes, received %2")
00141 .arg(bytesWaiting).arg(bytesRead));
00142 }
00143 if (receivedFrom != _acceptFrom)
00144 {
00145 LOG(VB_RECORD, LOG_WARNING, LOC +
00146 QString("Got packet from %1 instead of %2")
00147 .arg(receivedFrom.toString(), _acceptFrom.toString()));
00148
00149 _readBuffer.resize(bufferSize);
00150 continue;
00151 }
00152
00153 bufferSize += bytesWaiting;
00154 _thisRead += bytesWaiting;
00155 }
00156
00157 char *data = _readBuffer.data();
00158 int offset = 0;
00159 while((offset+RTPDATAGRAMSIZE) <= bufferSize)
00160 {
00161 char *packet = &data[offset];
00162 char *payload;
00163 int payloadSize;
00164 int sequenceNumber;
00165
00166 if (FindRTPPayload(packet, payload, payloadSize, sequenceNumber))
00167 {
00168 _mpegBuffer.append(payload, payloadSize);
00169
00170 if (_lastSequenceNumber != -1)
00171 {
00172 int expectedSequenceNumber = (_lastSequenceNumber+1) & 0x0FFFF;
00173 if (sequenceNumber != expectedSequenceNumber)
00174 {
00175 LOG(VB_RECORD, LOG_WARNING, LOC +
00176 QString("Sequence number went from %1 to %2 "
00177 "[last read=%3, this read=%4]")
00178 .arg(_lastSequenceNumber).arg(sequenceNumber)
00179 .arg(_lastRead).arg(_thisRead));
00180 }
00181 }
00182 _lastSequenceNumber = sequenceNumber;
00183 }
00184
00185 offset += RTPDATAGRAMSIZE;
00186 }
00187
00188 _readBuffer.remove(0, offset);
00189 }
00190
00191 bool CetonRTP::FindRTPPayload(
00192 char *packet, char *&payload, int &payloadSize, int &sequenceNumber)
00193 {
00194 RtpFixedHeader *header = reinterpret_cast<RtpFixedHeader*>(packet);
00195
00196 if (header->version != 2)
00197 {
00198 LOG(VB_RECORD, LOG_ERR, LOC +
00199 QString("Unhandled RTP version %1 packet (expected ver 2)")
00200 .arg(header->version));
00201 return false;
00202 }
00203
00204 if (header->payloadtype != 33)
00205 {
00206 LOG(VB_RECORD, LOG_ERR, LOC +
00207 "Error: can only process RTP packets with content type 33");
00208 return false;
00209 }
00210
00211 int headerSize = sizeof(RtpFixedHeader) + (4*header->csrccount);
00212
00213 if (header->extension == 1)
00214 {
00215 ushort extHeaderLength = *(ushort*)(packet+headerSize+2);
00216 headerSize += 4 * (1 + extHeaderLength);
00217 }
00218
00219 payload = packet + headerSize;
00220 payloadSize = 1328 - headerSize;
00221 sequenceNumber = ntohs(header->sequencenumber);
00222
00223 if (header->padding)
00224 {
00225 int paddingSize = payload[1328];
00226 payloadSize -= paddingSize;
00227 }
00228
00229 return true;
00230 }