00001
00002
00003
00004 #include <fcntl.h>
00005 #include <unistd.h>
00006 #ifndef USING_MINGW
00007 #include <sys/select.h>
00008 #include <sys/ioctl.h>
00009 #endif
00010
00011
00012 #include <QString>
00013 #include <QFile>
00014
00015
00016 #include "asistreamhandler.h"
00017 #include "asichannel.h"
00018 #include "ThreadedFileWriter.h"
00019 #include "dtvsignalmonitor.h"
00020 #include "streamlisteners.h"
00021 #include "mpegstreamdata.h"
00022 #include "cardutil.h"
00023
00024
00025 #include <dveo/asi.h>
00026 #include <dveo/master.h>
00027
00028 #define LOC QString("ASISH(%1): ").arg(_device)
00029
00030 QMap<QString,ASIStreamHandler*> ASIStreamHandler::_handlers;
00031 QMap<QString,uint> ASIStreamHandler::_handlers_refcnt;
00032 QMutex ASIStreamHandler::_handlers_lock;
00033
00034 ASIStreamHandler *ASIStreamHandler::Get(const QString &devname)
00035 {
00036 QMutexLocker locker(&_handlers_lock);
00037
00038 QString devkey = devname;
00039
00040 QMap<QString,ASIStreamHandler*>::iterator it = _handlers.find(devkey);
00041
00042 if (it == _handlers.end())
00043 {
00044 ASIStreamHandler *newhandler = new ASIStreamHandler(devname);
00045 newhandler->Open();
00046 _handlers[devkey] = newhandler;
00047 _handlers_refcnt[devkey] = 1;
00048
00049 LOG(VB_RECORD, LOG_INFO,
00050 QString("ASISH: Creating new stream handler %1 for %2")
00051 .arg(devkey).arg(devname));
00052 }
00053 else
00054 {
00055 _handlers_refcnt[devkey]++;
00056 uint rcount = _handlers_refcnt[devkey];
00057 LOG(VB_RECORD, LOG_INFO,
00058 QString("ASISH: Using existing stream handler %1 for %2")
00059 .arg(devkey)
00060 .arg(devname) + QString(" (%1 in use)").arg(rcount));
00061 }
00062
00063 return _handlers[devkey];
00064 }
00065
00066 void ASIStreamHandler::Return(ASIStreamHandler * & ref)
00067 {
00068 QMutexLocker locker(&_handlers_lock);
00069
00070 QString devname = ref->_device;
00071
00072 QMap<QString,uint>::iterator rit = _handlers_refcnt.find(devname);
00073 if (rit == _handlers_refcnt.end())
00074 return;
00075
00076 if (*rit > 1)
00077 {
00078 ref = NULL;
00079 (*rit)--;
00080 return;
00081 }
00082
00083 QMap<QString,ASIStreamHandler*>::iterator it = _handlers.find(devname);
00084 if ((it != _handlers.end()) && (*it == ref))
00085 {
00086 LOG(VB_RECORD, LOG_INFO, QString("ASISH: Closing handler for %1")
00087 .arg(devname));
00088 ref->Close();
00089 delete *it;
00090 _handlers.erase(it);
00091 }
00092 else
00093 {
00094 LOG(VB_GENERAL, LOG_ERR,
00095 QString("ASISH Error: Couldn't find handler for %1")
00096 .arg(devname));
00097 }
00098
00099 _handlers_refcnt.erase(rit);
00100 ref = NULL;
00101 }
00102
00103 ASIStreamHandler::ASIStreamHandler(const QString &device) :
00104 StreamHandler(device),
00105 _device_num(-1), _buf_size(-1), _fd(-1),
00106 _packet_size(TSPacket::kSize), _clock_source(kASIInternalClock),
00107 _rx_mode(kASIRXSyncOnActualConvertTo188), _drb(NULL), _mpts(NULL)
00108 {
00109 setObjectName("ASISH");
00110 }
00111
00112 void ASIStreamHandler::SetClockSource(ASIClockSource cs)
00113 {
00114 _clock_source = cs;
00115
00116
00117 }
00118
00119 void ASIStreamHandler::SetRXMode(ASIRXMode m)
00120 {
00121 _rx_mode = m;
00122
00123
00124 }
00125
00126 void ASIStreamHandler::SetRunningDesired(bool desired)
00127 {
00128 if (_drb && _running_desired && !desired)
00129 _drb->Stop();
00130 StreamHandler::SetRunningDesired(desired);
00131 }
00132
00133 void ASIStreamHandler::run(void)
00134 {
00135 RunProlog();
00136
00137 LOG(VB_RECORD, LOG_INFO, LOC + "run(): begin");
00138
00139 if (!Open())
00140 {
00141 LOG(VB_GENERAL, LOG_ERR, LOC + QString("Failed to open device %1 : %2")
00142 .arg(_device).arg(strerror(errno)));
00143 _error = true;
00144 return;
00145 }
00146
00147 DeviceReadBuffer *drb = new DeviceReadBuffer(this, true, false);
00148 bool ok = drb->Setup(_device, _fd, _packet_size, _buf_size);
00149 if (!ok)
00150 {
00151 LOG(VB_GENERAL, LOG_ERR, LOC + "Failed to allocate DRB buffer");
00152 delete drb;
00153 drb = NULL;
00154 Close();
00155 _error = true;
00156 RunEpilog();
00157 return;
00158 }
00159
00160 uint buffer_size = _packet_size * 15000;
00161 unsigned char *buffer = new unsigned char[buffer_size];
00162 if (!buffer)
00163 {
00164 LOG(VB_GENERAL, LOG_ERR, LOC + "Failed to allocate buffer");
00165 delete drb;
00166 drb = NULL;
00167 Close();
00168 _error = true;
00169 RunEpilog();
00170 return;
00171 }
00172 memset(buffer, 0, buffer_size);
00173
00174 SetRunning(true, true, false);
00175
00176 drb->Start();
00177
00178 {
00179 QMutexLocker locker(&_start_stop_lock);
00180 _drb = drb;
00181 }
00182
00183 int remainder = 0;
00184 while (_running_desired && !_error)
00185 {
00186 UpdateFiltersFromStreamData();
00187
00188 ssize_t len = 0;
00189
00190 len = drb->Read(
00191 &(buffer[remainder]), buffer_size - remainder);
00192
00193 if (!_running_desired)
00194 break;
00195
00196
00197 if (drb->IsErrored())
00198 {
00199 LOG(VB_GENERAL, LOG_ERR, LOC + "Device error detected");
00200 _error = true;
00201 }
00202
00203 if (drb->IsEOF())
00204 {
00205 LOG(VB_GENERAL, LOG_ERR, LOC + "Device EOF detected");
00206 _error = true;
00207 }
00208
00209 if ((0 == len) || (-1 == len))
00210 {
00211 usleep(100);
00212 continue;
00213 }
00214
00215 len += remainder;
00216
00217 if (len < 10)
00218 {
00219 remainder = len;
00220 continue;
00221 }
00222
00223 if (!_listener_lock.tryLock())
00224 {
00225 remainder = len;
00226 continue;
00227 }
00228
00229 if (_stream_data_list.empty())
00230 {
00231 _listener_lock.unlock();
00232 continue;
00233 }
00234
00235 StreamDataList::const_iterator sit = _stream_data_list.begin();
00236 for (; sit != _stream_data_list.end(); ++sit)
00237 remainder = sit.key()->ProcessData(buffer, len);
00238
00239 if (_mpts != NULL)
00240 _mpts->Write(buffer, len - remainder);
00241
00242 _listener_lock.unlock();
00243
00244 if (remainder > 0 && (len > remainder))
00245 memmove(buffer, &(buffer[len - remainder]), remainder);
00246 }
00247 LOG(VB_RECORD, LOG_INFO, LOC + "run(): " + "shutdown");
00248
00249 RemoveAllPIDFilters();
00250
00251 {
00252 QMutexLocker locker(&_start_stop_lock);
00253 _drb = NULL;
00254 }
00255
00256 if (drb->IsRunning())
00257 drb->Stop();
00258
00259 delete drb;
00260 delete[] buffer;
00261 Close();
00262
00263 LOG(VB_RECORD, LOG_INFO, LOC + "run(): " + "end");
00264
00265 SetRunning(false, true, false);
00266 RunEpilog();
00267 }
00268
00269 bool ASIStreamHandler::Open(void)
00270 {
00271 if (_fd >= 0)
00272 return true;
00273
00274 QString error;
00275 _device_num = CardUtil::GetASIDeviceNumber(_device, &error);
00276 if (_device_num < 0)
00277 {
00278 LOG(VB_GENERAL, LOG_ERR, LOC + error);
00279 return false;
00280 }
00281
00282 _buf_size = CardUtil::GetASIBufferSize(_device_num, &error);
00283 if (_buf_size <= 0)
00284 {
00285 LOG(VB_GENERAL, LOG_ERR, LOC + error);
00286 return false;
00287 }
00288
00289 if (!CardUtil::SetASIMode(_device_num, (uint)_rx_mode, &error))
00290 {
00291 LOG(VB_GENERAL, LOG_ERR, LOC + "Failed to set RX Mode: " + error);
00292 return false;
00293 }
00294
00295
00296 _fd = open(_device.toLocal8Bit().constData(), O_RDONLY, 0);
00297 if (_fd < 0)
00298 {
00299 LOG(VB_GENERAL, LOG_ERR, LOC +
00300 QString("Failed to open '%1'").arg(_device) + ENO);
00301 return false;
00302 }
00303
00304
00305 unsigned int cap;
00306 if (ioctl(_fd, ASI_IOC_RXGETCAP, &cap) < 0)
00307 {
00308 LOG(VB_GENERAL, LOG_ERR, LOC +
00309 QString("Failed to query capabilities '%1'").arg(_device) + ENO);
00310 Close();
00311 return false;
00312 }
00313
00314
00315
00316 switch (_rx_mode)
00317 {
00318 case kASIRXRawMode:
00319 case kASIRXSyncOnActualSize:
00320 _packet_size = TSPacket::kDVBEmissionSize * TSPacket::kSize;
00321 break;
00322 case kASIRXSyncOn204:
00323 _packet_size = TSPacket::kDVBEmissionSize;
00324 break;
00325 case kASIRXSyncOn188:
00326 case kASIRXSyncOnActualConvertTo188:
00327 case kASIRXSyncOn204ConvertTo188:
00328 _packet_size = TSPacket::kSize;
00329 break;
00330 }
00331
00332
00333
00334 return _fd >= 0;
00335 }
00336
00337 void ASIStreamHandler::Close(void)
00338 {
00339 if (_fd >= 0)
00340 {
00341 close(_fd);
00342 _fd = -1;
00343 }
00344 }
00345
00346 typedef ThreadedFileWriter* ThreadedFileWriterP;
00347 static bool named_output_file_common(
00348 const QString &_device, ThreadedFileWriterP &tfw, QMap<QString,int> &files)
00349 {
00350 if (tfw)
00351 {
00352 delete tfw;
00353 tfw = NULL;
00354 }
00355
00356 QMap<QString,int>::iterator it = files.begin();
00357 if (it == files.end())
00358 return true;
00359
00360 for (it = files.begin(); it != files.end(); ++it)
00361 (*it)++;
00362
00363 QString fn = QString("%1.%2.raw")
00364 .arg(files.begin().key()).arg(*files.begin());
00365
00366 tfw = new ThreadedFileWriter(
00367 fn, O_WRONLY|O_TRUNC|O_CREAT|O_LARGEFILE, 0644);
00368 if (!tfw->Open())
00369 {
00370 delete tfw;
00371 tfw = NULL;
00372 return false;
00373 }
00374
00375 bool ok = true;
00376 const QByteArray ba = fn.toLocal8Bit();
00377 for (; ok && it != files.end(); ++it)
00378 {
00379 int ret = link(ba.constData(), it.key().toLocal8Bit().constData());
00380 if (ret < 0)
00381 {
00382 LOG(VB_GENERAL, LOG_ERR, LOC +
00383 QString("Failed to link '%1' to '%2'").arg(it.key()).arg(fn) +
00384 ENO);
00385 }
00386 ok &= ret >= 0;
00387 }
00388
00389 return ok;
00390 }
00391 void ASIStreamHandler::AddNamedOutputFile(const QString &file)
00392 {
00393 _mpts_files[file] = -1;
00394 named_output_file_common(_device, _mpts, _mpts_files);
00395 }
00396
00397 void ASIStreamHandler::RemoveNamedOutputFile(const QString &file)
00398 {
00399 QMap<QString,int>::iterator it = _mpts_files.find(file);
00400 if (it != _mpts_files.end())
00401 {
00402 _mpts_files.erase(it);
00403 named_output_file_common(_device, _mpts, _mpts_files);
00404 }
00405 }
00406
00407 void ASIStreamHandler::PriorityEvent(int fd)
00408 {
00409
00410 }