00001 #include <cstdlib>
00002 #include <cerrno>
00003
00004
00005 #include <sys/types.h>
00006 #include <sys/time.h>
00007 #include <unistd.h>
00008 #include <fcntl.h>
00009
00010 #include <QFileInfo>
00011 #include <QDir>
00012
00013 #include "ThreadedFileWriter.h"
00014 #include "fileringbuffer.h"
00015 #include "mythcontext.h"
00016 #include "remotefile.h"
00017 #include "mythconfig.h"
00018 #include "compat.h"
00019 #include "mythmiscutil.h"
00020
00021 #if HAVE_POSIX_FADVISE < 1
00022 static int posix_fadvise(int, off_t, off_t, int) { return 0; }
00023 #define POSIX_FADV_SEQUENTIAL 0
00024 #define POSIX_FADV_WILLNEED 0
00025 #define POSIX_FADV_DONTNEED 0
00026 #endif
00027
00028 #ifndef O_STREAMING
00029 #define O_STREAMING 0
00030 #endif
00031
00032 #ifndef O_LARGEFILE
00033 #define O_LARGEFILE 0
00034 #endif
00035
00036 #ifndef O_BINARY
00037 #define O_BINARY 0
00038 #endif
00039
00040 #define LOC QString("FileRingBuf(%1): ").arg(filename)
00041
00042 FileRingBuffer::FileRingBuffer(const QString &lfilename,
00043 bool write, bool readahead, int timeout_ms)
00044 : RingBuffer(kRingBuffer_File)
00045 {
00046 startreadahead = readahead;
00047 safefilename = lfilename;
00048 filename = lfilename;
00049
00050 if (write)
00051 {
00052 if (filename.startsWith("myth://"))
00053 {
00054 remotefile = new RemoteFile(filename, true);
00055 if (!remotefile->isOpen())
00056 {
00057 LOG(VB_GENERAL, LOG_ERR,
00058 QString("RingBuffer::RingBuffer(): Failed to open "
00059 "remote file (%1) for write").arg(filename));
00060 delete remotefile;
00061 remotefile = NULL;
00062 }
00063 else
00064 writemode = true;
00065 }
00066 else
00067 {
00068 tfw = new ThreadedFileWriter(
00069 filename, O_WRONLY|O_TRUNC|O_CREAT|O_LARGEFILE, 0644);
00070
00071 if (!tfw->Open())
00072 {
00073 delete tfw;
00074 tfw = NULL;
00075 }
00076 else
00077 writemode = true;
00078 }
00079 }
00080 else if (timeout_ms >= 0)
00081 {
00082 OpenFile(filename, timeout_ms);
00083 }
00084 }
00085
00086 FileRingBuffer::~FileRingBuffer()
00087 {
00088 rwlock.lockForWrite();
00089
00090 if (remotefile)
00091 {
00092 delete remotefile;
00093 remotefile = NULL;
00094 }
00095
00096 if (tfw)
00097 {
00098 delete tfw;
00099 tfw = NULL;
00100 }
00101
00102 if (fd2 >= 0)
00103 {
00104 close(fd2);
00105 fd2 = -1;
00106 }
00107
00108 rwlock.unlock();
00109 }
00110
00115 static bool check_permissions(const QString &filename)
00116 {
00117 QFileInfo fileInfo(filename);
00118 if (fileInfo.exists() && !fileInfo.isReadable())
00119 {
00120 LOG(VB_GENERAL, LOG_ERR, LOC +
00121 "File exists but is not readable by MythTV!");
00122 return false;
00123 }
00124 return true;
00125 }
00126
00127 static bool is_subtitle_possible(const QString &extension)
00128 {
00129 QMutexLocker locker(&RingBuffer::subExtLock);
00130 bool no_subtitle = false;
00131 for (uint i = 0; i < (uint)RingBuffer::subExtNoCheck.size(); i++)
00132 {
00133 if (extension.contains(RingBuffer::subExtNoCheck[i].right(3)))
00134 {
00135 no_subtitle = true;
00136 break;
00137 }
00138 }
00139 return !no_subtitle;
00140 }
00141
00142 static QString local_sub_filename(QFileInfo &fileInfo)
00143 {
00144
00145 QString vidFileName = fileInfo.fileName();
00146 QString dirName = fileInfo.absolutePath();
00147
00148 QString baseName = vidFileName;
00149 int suffixPos = vidFileName.lastIndexOf(QChar('.'));
00150 if (suffixPos > 0)
00151 baseName = vidFileName.left(suffixPos);
00152
00153 QStringList el;
00154 {
00155
00156
00157 const QString findBaseName = baseName
00158 .replace("[", "?")
00159 .replace("]", "?")
00160 .replace("(", "?")
00161 .replace(")", "?");
00162
00163 QMutexLocker locker(&RingBuffer::subExtLock);
00164 QStringList::const_iterator eit = RingBuffer::subExt.begin();
00165 for (; eit != RingBuffer::subExt.end(); ++eit)
00166 el += findBaseName + *eit;
00167 }
00168
00169
00170
00171 QDir dir;
00172 dir.setPath(dirName);
00173
00174 const QStringList candidates = dir.entryList(el);
00175
00176 QStringList::const_iterator cit = candidates.begin();
00177 for (; cit != candidates.end(); ++cit)
00178 {
00179 QFileInfo fi(dirName + "/" + *cit);
00180 if (fi.exists() && (fi.size() >= kReadTestSize))
00181 return fi.absoluteFilePath();
00182 }
00183
00184 return QString::null;
00185 }
00186
00187 bool FileRingBuffer::OpenFile(const QString &lfilename, uint retry_ms)
00188 {
00189 LOG(VB_PLAYBACK, LOG_INFO, LOC + QString("OpenFile(%1, %2 ms)")
00190 .arg(lfilename).arg(retry_ms));
00191
00192 rwlock.lockForWrite();
00193
00194 filename = lfilename;
00195 safefilename = lfilename;
00196
00197 if (remotefile)
00198 {
00199 delete remotefile;
00200 remotefile = NULL;
00201 }
00202
00203 if (fd2 >= 0)
00204 {
00205 close(fd2);
00206 fd2 = -1;
00207 }
00208
00209 bool is_local =
00210 (filename.left(4) != "/dev") &&
00211 ((filename.left(1) == "/") || QFile::exists(filename));
00212
00213 if (is_local)
00214 {
00215 char buf[kReadTestSize];
00216 int lasterror = 0;
00217
00218 MythTimer openTimer;
00219 openTimer.start();
00220
00221 uint openAttempts = 0;
00222 do
00223 {
00224 openAttempts++;
00225 lasterror = 0;
00226
00227 fd2 = open(filename.toLocal8Bit().constData(),
00228 O_RDONLY|O_LARGEFILE|O_STREAMING|O_BINARY);
00229
00230 if (fd2 < 0)
00231 {
00232 if (!check_permissions(filename))
00233 {
00234 lasterror = 3;
00235 break;
00236 }
00237
00238 lasterror = 1;
00239 usleep(10 * 1000);
00240 }
00241 else
00242 {
00243 int ret = read(fd2, buf, kReadTestSize);
00244 if (ret != (int)kReadTestSize)
00245 {
00246 lasterror = 2;
00247 close(fd2);
00248 fd2 = -1;
00249 if (oldfile)
00250 break;
00251 usleep(10 * 1000);
00252 }
00253 else
00254 {
00255 if (0 == lseek(fd2, 0, SEEK_SET))
00256 {
00257 posix_fadvise(fd2, 0, 0, POSIX_FADV_SEQUENTIAL);
00258 posix_fadvise(fd2, 0, 128*1024, POSIX_FADV_WILLNEED);
00259 lasterror = 0;
00260 break;
00261 }
00262 lasterror = 4;
00263 close(fd2);
00264 fd2 = -1;
00265 }
00266 }
00267 }
00268 while ((uint)openTimer.elapsed() < retry_ms);
00269
00270 switch (lasterror)
00271 {
00272 case 0:
00273 {
00274 QFileInfo fi(filename);
00275 oldfile = fi.lastModified()
00276 .secsTo(QDateTime::currentDateTime()) > 60;
00277 QString extension = fi.completeSuffix().toLower();
00278 if (is_subtitle_possible(extension))
00279 subtitlefilename = local_sub_filename(fi);
00280 break;
00281 }
00282 case 1:
00283 LOG(VB_GENERAL, LOG_ERR, LOC +
00284 QString("OpenFile(): Could not open."));
00285 break;
00286 case 2:
00287 LOG(VB_GENERAL, LOG_ERR, LOC +
00288 QString("OpenFile(): File too small (%1B).")
00289 .arg(QFileInfo(filename).size()));
00290 break;
00291 case 3:
00292 LOG(VB_GENERAL, LOG_ERR, LOC +
00293 "OpenFile(): Improper permissions.");
00294 break;
00295 case 4:
00296 LOG(VB_GENERAL, LOG_ERR, LOC +
00297 "OpenFile(): Cannot seek in file.");
00298 break;
00299 default:
00300 break;
00301 }
00302 LOG(VB_FILE, LOG_INFO,
00303 LOC + QString("OpenFile() made %1 attempts in %2 ms")
00304 .arg(openAttempts).arg(openTimer.elapsed()));
00305
00306 }
00307 else
00308 {
00309 QString tmpSubName = filename;
00310 QString dirName = ".";
00311
00312 int dirPos = filename.lastIndexOf(QChar('/'));
00313 if (dirPos > 0)
00314 {
00315 tmpSubName = filename.mid(dirPos + 1);
00316 dirName = filename.left(dirPos);
00317 }
00318
00319 QString baseName = tmpSubName;
00320 QString extension = tmpSubName;
00321 QStringList auxFiles;
00322
00323 int suffixPos = tmpSubName.lastIndexOf(QChar('.'));
00324 if (suffixPos > 0)
00325 {
00326 baseName = tmpSubName.left(suffixPos);
00327 extension = tmpSubName.right(suffixPos-1);
00328 if (is_subtitle_possible(extension))
00329 {
00330 QMutexLocker locker(&subExtLock);
00331 QStringList::const_iterator eit = subExt.begin();
00332 for (; eit != subExt.end(); ++eit)
00333 auxFiles += baseName + *eit;
00334 }
00335 }
00336
00337 remotefile = new RemoteFile(filename, false, true,
00338 retry_ms, &auxFiles);
00339 if (!remotefile->isOpen())
00340 {
00341 LOG(VB_GENERAL, LOG_ERR, LOC +
00342 QString("RingBuffer::RingBuffer(): Failed to open remote "
00343 "file (%1)").arg(filename));
00344 delete remotefile;
00345 remotefile = NULL;
00346 }
00347 else
00348 {
00349 QStringList aux = remotefile->GetAuxiliaryFiles();
00350 if (aux.size())
00351 subtitlefilename = dirName + "/" + aux[0];
00352 }
00353 }
00354
00355 setswitchtonext = false;
00356 ateof = false;
00357 commserror = false;
00358 numfailures = 0;
00359
00360 rawbitrate = 8000;
00361 CalcReadAheadThresh();
00362
00363 bool ok = fd2 >= 0 || remotefile;
00364
00365 rwlock.unlock();
00366
00367 return ok;
00368 }
00369
00370 bool FileRingBuffer::ReOpen(QString newFilename)
00371 {
00372 if (!writemode)
00373 {
00374 LOG(VB_GENERAL, LOG_ERR, LOC + "Tried to ReOpen a read only file.");
00375 return false;
00376 }
00377
00378 bool result = false;
00379
00380 rwlock.lockForWrite();
00381
00382 if (tfw && tfw->ReOpen(newFilename))
00383 result = true;
00384 else if (remotefile && remotefile->ReOpen(newFilename))
00385 result = true;
00386
00387 if (result)
00388 {
00389 filename = newFilename;
00390 poslock.lockForWrite();
00391 writepos = 0;
00392 poslock.unlock();
00393 }
00394
00395 rwlock.unlock();
00396 return result;
00397 }
00398
00399 bool FileRingBuffer::IsOpen(void) const
00400 {
00401 rwlock.lockForRead();
00402 bool ret = tfw || (fd2 > -1) || remotefile;
00403 rwlock.unlock();
00404 return ret;
00405 }
00406
00418 int FileRingBuffer::safe_read(int fd, void *data, uint sz)
00419 {
00420 int ret;
00421 unsigned tot = 0;
00422 unsigned errcnt = 0;
00423 unsigned zerocnt = 0;
00424
00425 if (fd2 < 0)
00426 {
00427 LOG(VB_GENERAL, LOG_ERR, LOC +
00428 "Invalid file descriptor in 'safe_read()'");
00429 return 0;
00430 }
00431
00432 if (stopreads)
00433 return 0;
00434
00435 while (tot < sz)
00436 {
00437 ret = read(fd2, (char *)data + tot, sz - tot);
00438 if (ret < 0)
00439 {
00440 if (errno == EAGAIN)
00441 continue;
00442
00443 LOG(VB_GENERAL, LOG_ERR,
00444 LOC + "File I/O problem in 'safe_read()'" + ENO);
00445
00446 errcnt++;
00447 numfailures++;
00448 if (errcnt == 3)
00449 break;
00450 }
00451 else if (ret > 0)
00452 {
00453 tot += ret;
00454 }
00455
00456 if (oldfile)
00457 break;
00458
00459 if (ret == 0)
00460 {
00461 if (tot > 0)
00462 break;
00463
00464 zerocnt++;
00465
00466
00467
00468 if (zerocnt >= (livetvchain ? 6 : 40))
00469 {
00470 break;
00471 }
00472 }
00473 if (stopreads)
00474 break;
00475 if (tot < sz)
00476 usleep(60000);
00477 }
00478 return tot;
00479 }
00480
00489 int FileRingBuffer::safe_read(RemoteFile *rf, void *data, uint sz)
00490 {
00491 int ret = rf->Read(data, sz);
00492 if (ret < 0)
00493 {
00494 LOG(VB_GENERAL, LOG_ERR, LOC +
00495 "safe_read(RemoteFile* ...): read failed");
00496
00497 poslock.lockForRead();
00498 rf->Seek(internalreadpos - readAdjust, SEEK_SET);
00499 poslock.unlock();
00500 numfailures++;
00501 }
00502 else if (ret == 0)
00503 {
00504 LOG(VB_FILE, LOG_INFO, LOC +
00505 "safe_read(RemoteFile* ...): at EOF");
00506 }
00507
00508 return ret;
00509 }
00510
00511 long long FileRingBuffer::GetReadPosition(void) const
00512 {
00513 poslock.lockForRead();
00514 long long ret = readpos;
00515 poslock.unlock();
00516 return ret;
00517 }
00518
00519 long long FileRingBuffer::GetRealFileSize(void) const
00520 {
00521 rwlock.lockForRead();
00522 long long ret = -1;
00523 if (remotefile)
00524 ret = remotefile->GetFileSize();
00525 else
00526 ret = QFileInfo(filename).size();
00527 rwlock.unlock();
00528 return ret;
00529 }
00530
00531 long long FileRingBuffer::Seek(long long pos, int whence, bool has_lock)
00532 {
00533 LOG(VB_FILE, LOG_INFO, LOC + QString("Seek(%1,%2,%3)")
00534 .arg(pos).arg((SEEK_SET==whence)?"SEEK_SET":
00535 ((SEEK_CUR==whence)?"SEEK_CUR":"SEEK_END"))
00536 .arg(has_lock?"locked":"unlocked"));
00537
00538 long long ret = -1;
00539
00540 StopReads();
00541
00542
00543
00544 if (!has_lock)
00545 rwlock.lockForWrite();
00546
00547 StartReads();
00548
00549 if (writemode)
00550 {
00551 ret = WriterSeek(pos, whence, true);
00552 if (!has_lock)
00553 rwlock.unlock();
00554 return ret;
00555 }
00556
00557 poslock.lockForWrite();
00558
00559
00560 if (readaheadrunning &&
00561 ((whence == SEEK_SET && pos == readpos) ||
00562 (whence == SEEK_CUR && pos == 0)))
00563 {
00564 ret = readpos;
00565
00566 poslock.unlock();
00567 if (!has_lock)
00568 rwlock.unlock();
00569
00570 return ret;
00571 }
00572
00573
00574 long long new_pos = (SEEK_SET==whence) ? pos : readpos + pos;
00575
00576 #if 1
00577
00578
00579 if (readaheadrunning &&
00580 (SEEK_SET==whence || SEEK_CUR==whence))
00581 {
00582 rbrlock.lockForWrite();
00583 rbwlock.lockForRead();
00584 LOG(VB_FILE, LOG_INFO, LOC +
00585 QString("Seek(): rbrpos: %1 rbwpos: %2"
00586 "\n\t\t\treadpos: %3 internalreadpos: %4")
00587 .arg(rbrpos).arg(rbwpos)
00588 .arg(readpos).arg(internalreadpos));
00589 bool used_opt = false;
00590 if ((new_pos < readpos))
00591 {
00592 int min_safety = max(fill_min, readblocksize);
00593 int free = ((rbwpos >= rbrpos) ?
00594 rbrpos + bufferSize : rbrpos) - rbwpos;
00595 int internal_backbuf =
00596 (rbwpos >= rbrpos) ? rbrpos : rbrpos - rbwpos;
00597 internal_backbuf = min(internal_backbuf, free - min_safety);
00598 long long sba = readpos - new_pos;
00599 LOG(VB_FILE, LOG_INFO, LOC +
00600 QString("Seek(): internal_backbuf: %1 sba: %2")
00601 .arg(internal_backbuf).arg(sba));
00602 if (internal_backbuf >= sba)
00603 {
00604 rbrpos = (rbrpos>=sba) ? rbrpos - sba :
00605 bufferSize + rbrpos - sba;
00606 used_opt = true;
00607 LOG(VB_FILE, LOG_INFO, LOC +
00608 QString("Seek(): OPT1 rbrpos: %1 rbwpos: %2"
00609 "\n\t\t\treadpos: %3 internalreadpos: %4")
00610 .arg(rbrpos).arg(rbwpos)
00611 .arg(new_pos).arg(internalreadpos));
00612 }
00613 }
00614 else if ((new_pos >= readpos) && (new_pos <= internalreadpos))
00615 {
00616 rbrpos = (rbrpos + (new_pos - readpos)) % bufferSize;
00617 used_opt = true;
00618 LOG(VB_FILE, LOG_INFO, LOC +
00619 QString("Seek(): OPT2 rbrpos: %1 sba: %2")
00620 .arg(rbrpos).arg(readpos - new_pos));
00621 }
00622 rbwlock.unlock();
00623 rbrlock.unlock();
00624
00625 if (used_opt)
00626 {
00627 if (ignorereadpos >= 0)
00628 {
00629
00630 int ret;
00631 if (remotefile)
00632 ret = remotefile->Seek(internalreadpos, SEEK_SET);
00633 else
00634 {
00635 ret = lseek64(fd2, internalreadpos, SEEK_SET);
00636 posix_fadvise(fd2, internalreadpos,
00637 128*1024, POSIX_FADV_WILLNEED);
00638 }
00639 LOG(VB_FILE, LOG_INFO, LOC +
00640 QString("Seek to %1 from ignore pos %2 returned %3")
00641 .arg(internalreadpos).arg(ignorereadpos).arg(ret));
00642 ignorereadpos = -1;
00643 }
00644
00645
00646 if (new_pos > readpos)
00647 {
00648 ateof = false;
00649 readsallowed = false;
00650 }
00651 readpos = new_pos;
00652 poslock.unlock();
00653 generalWait.wakeAll();
00654 if (!has_lock)
00655 rwlock.unlock();
00656 return new_pos;
00657 }
00658 }
00659 #endif
00660
00661 #if 1
00662
00663
00664
00665
00666
00667
00668
00669
00670
00671
00672
00673
00674
00675
00676 if ((remotefile || fd2 >= 0) && (ignorereadpos < 0))
00677 {
00678 long long off_end = 0xDEADBEEF;
00679 if (SEEK_END == whence)
00680 {
00681 off_end = pos;
00682 if (remotefile)
00683 {
00684 new_pos = remotefile->GetFileSize() - off_end;
00685 }
00686 else
00687 {
00688 QFileInfo fi(filename);
00689 new_pos = fi.size() - off_end;
00690 }
00691 }
00692 else
00693 {
00694 if (remotefile)
00695 {
00696 off_end = remotefile->GetFileSize() - new_pos;
00697 }
00698 else
00699 {
00700 QFileInfo fi(filename);
00701 off_end = fi.size() - new_pos;
00702 }
00703 }
00704
00705 if (off_end != 0xDEADBEEF)
00706 {
00707 LOG(VB_FILE, LOG_INFO, LOC +
00708 QString("Seek(): Offset from end: %1").arg(off_end));
00709 }
00710
00711 if (off_end == 250000)
00712 {
00713 LOG(VB_FILE, LOG_INFO, LOC +
00714 QString("Seek(): offset from end: %1").arg(off_end) +
00715 "\n\t\t\t -- ignoring read ahead thread until next seek.");
00716
00717 ignorereadpos = new_pos;
00718 errno = EINVAL;
00719 long long ret;
00720 if (remotefile)
00721 ret = remotefile->Seek(ignorereadpos, SEEK_SET);
00722 else
00723 ret = lseek64(fd2, ignorereadpos, SEEK_SET);
00724
00725 if (ret < 0)
00726 {
00727 int tmp_eno = errno;
00728 QString cmd = QString("Seek(%1, SEEK_SET) ign ")
00729 .arg(ignorereadpos);
00730
00731 ignorereadpos = -1;
00732
00733 LOG(VB_GENERAL, LOG_ERR, LOC + cmd + " Failed" + ENO);
00734
00735
00736 if (remotefile)
00737 ret = remotefile->Seek(internalreadpos, SEEK_SET);
00738 else
00739 ret = lseek64(fd2, internalreadpos, SEEK_SET);
00740 if (ret < 0)
00741 {
00742 QString cmd = QString("Seek(%1, SEEK_SET) int ")
00743 .arg(internalreadpos);
00744 LOG(VB_GENERAL, LOG_ERR, LOC + cmd + " Failed" + ENO);
00745 }
00746 else
00747 {
00748 QString cmd = QString("Seek(%1, %2) int ")
00749 .arg(internalreadpos)
00750 .arg((SEEK_SET == whence) ? "SEEK_SET" :
00751 ((SEEK_CUR == whence) ?"SEEK_CUR" : "SEEK_END"));
00752 LOG(VB_GENERAL, LOG_ERR, LOC + cmd + " succeeded");
00753 }
00754 ret = -1;
00755 errno = tmp_eno;
00756 }
00757 else
00758 {
00759 ateof = false;
00760 readsallowed = false;
00761 }
00762
00763 poslock.unlock();
00764
00765 generalWait.wakeAll();
00766
00767 if (!has_lock)
00768 rwlock.unlock();
00769
00770 return ret;
00771 }
00772 }
00773 #endif
00774
00775
00776
00777
00778 if (remotefile)
00779 {
00780 ret = remotefile->Seek(pos, whence, readpos);
00781 if (ret<0)
00782 errno = EINVAL;
00783 }
00784 else
00785 {
00786 ret = lseek64(fd2, pos, whence);
00787 }
00788
00789 if (ret >= 0)
00790 {
00791 readpos = ret;
00792
00793 ignorereadpos = -1;
00794
00795 if (readaheadrunning)
00796 ResetReadAhead(readpos);
00797
00798 readAdjust = 0;
00799 }
00800 else
00801 {
00802 QString cmd = QString("Seek(%1, %2)").arg(pos)
00803 .arg((whence == SEEK_SET) ? "SEEK_SET" :
00804 ((whence == SEEK_CUR) ? "SEEK_CUR" : "SEEK_END"));
00805 LOG(VB_GENERAL, LOG_ERR, LOC + cmd + " Failed" + ENO);
00806 }
00807
00808 poslock.unlock();
00809
00810 generalWait.wakeAll();
00811
00812 if (!has_lock)
00813 rwlock.unlock();
00814
00815 return ret;
00816 }