00001
00002 #include <cstdio>
00003 #include <cstdlib>
00004 #include <cerrno>
00005
00006
00007 #include <sys/types.h>
00008 #include <sys/stat.h>
00009 #include <unistd.h>
00010 #include <signal.h>
00011 #include <fcntl.h>
00012 #include <string.h>
00013
00014
00015 #include <QString>
00016
00017
00018 #include "ThreadedFileWriter.h"
00019 #include "mythlogging.h"
00020
00021 #include "mythtimer.h"
00022 #include "compat.h"
00023
00024 #define LOC QString("TFW(%1:%2): ").arg(filename).arg(fd)
00025
00027 void TFWWriteThread::run(void)
00028 {
00029 RunProlog();
00030 m_parent->DiskLoop();
00031 RunEpilog();
00032 }
00033
00035 void TFWSyncThread::run(void)
00036 {
00037 RunProlog();
00038 m_parent->SyncLoop();
00039 RunEpilog();
00040 }
00041
00042 const uint ThreadedFileWriter::kMaxBufferSize = 128 * 1024 * 1024;
00043 const uint ThreadedFileWriter::kMinWriteSize = 64 * 1024;
00044
00059 ThreadedFileWriter::ThreadedFileWriter(const QString &fname,
00060 int pflags, mode_t pmode) :
00061
00062 filename(fname), flags(pflags),
00063 mode(pmode), fd(-1),
00064
00065 flush(false), in_dtor(false),
00066 ignore_writes(false), tfw_min_write_size(kMinWriteSize),
00067 totalBufferUse(0),
00068
00069 writeThread(NULL), syncThread(NULL)
00070 {
00071 filename.detach();
00072 }
00073
00080 bool ThreadedFileWriter::ReOpen(QString newFilename)
00081 {
00082 Flush();
00083
00084 buflock.lock();
00085
00086 if (fd >= 0)
00087 {
00088 close(fd);
00089 fd = -1;
00090 }
00091
00092 if (!newFilename.isEmpty())
00093 filename = newFilename;
00094
00095 buflock.unlock();
00096
00097 return Open();
00098 }
00099
00104 bool ThreadedFileWriter::Open(void)
00105 {
00106 ignore_writes = false;
00107
00108 if (filename == "-")
00109 fd = fileno(stdout);
00110 else
00111 {
00112 QByteArray fname = filename.toLocal8Bit();
00113 fd = open(fname.constData(), flags, mode);
00114 }
00115
00116 if (fd < 0)
00117 {
00118 LOG(VB_GENERAL, LOG_ERR, LOC +
00119 QString("Opening file '%1'.").arg(filename) + ENO);
00120 return false;
00121 }
00122 else
00123 {
00124 LOG(VB_FILE, LOG_INFO, LOC + "Open() successful");
00125
00126 #ifdef USING_MINGW
00127 _setmode(fd, _O_BINARY);
00128 #endif
00129 if (!writeThread)
00130 {
00131 writeThread = new TFWWriteThread(this);
00132 writeThread->start();
00133 }
00134
00135 if (!syncThread)
00136 {
00137 syncThread = new TFWSyncThread(this);
00138 syncThread->start();
00139 }
00140
00141 return true;
00142 }
00143 }
00144
00148 ThreadedFileWriter::~ThreadedFileWriter()
00149 {
00150 Flush();
00151
00152 {
00153 QMutexLocker locker(&buflock);
00154 in_dtor = true;
00155 bufferSyncWait.wakeAll();
00156 bufferHasData.wakeAll();
00157 }
00158
00159 if (writeThread)
00160 {
00161 writeThread->wait();
00162 delete writeThread;
00163 writeThread = NULL;
00164 }
00165
00166 while (!writeBuffers.empty())
00167 {
00168 delete writeBuffers.front();
00169 writeBuffers.pop_front();
00170 }
00171
00172 while (!emptyBuffers.empty())
00173 {
00174 delete emptyBuffers.front();
00175 emptyBuffers.pop_front();
00176 }
00177
00178 if (syncThread)
00179 {
00180 syncThread->wait();
00181 delete syncThread;
00182 syncThread = NULL;
00183 }
00184
00185 if (fd >= 0)
00186 {
00187 close(fd);
00188 fd = -1;
00189 }
00190 }
00191
00198 uint ThreadedFileWriter::Write(const void *data, uint count)
00199 {
00200 if (count == 0)
00201 return 0;
00202
00203 QMutexLocker locker(&buflock);
00204
00205 if (ignore_writes)
00206 return count;
00207
00208 if (totalBufferUse + count > kMaxBufferSize)
00209 {
00210 LOG(VB_GENERAL, LOG_ERR, LOC +
00211 "Maximum buffer size exceeded."
00212 "\n\t\t\tfile will be truncated, no further writing "
00213 "will be done."
00214 "\n\t\t\tThis generally indicates your disk performance "
00215 "\n\t\t\tis insufficient to deal with the number of on-going "
00216 "\n\t\t\trecordings, or you have a disk failure.");
00217 ignore_writes = true;
00218 return count;
00219 }
00220
00221 TFWBuffer *buf = NULL;
00222
00223 if (!writeBuffers.empty() &&
00224 (writeBuffers.back()->data.size() + count) < kMinWriteSize)
00225 {
00226 buf = writeBuffers.back();
00227 writeBuffers.pop_back();
00228 }
00229 else
00230 {
00231 if (!emptyBuffers.empty())
00232 {
00233 buf = emptyBuffers.front();
00234 emptyBuffers.pop_front();
00235 buf->data.clear();
00236 }
00237 else
00238 {
00239 buf = new TFWBuffer();
00240 }
00241 }
00242
00243 totalBufferUse += count;
00244 const char *cdata = (const char*) data;
00245 buf->data.insert(buf->data.end(), cdata, cdata+count);
00246 buf->lastUsed = QDateTime::currentDateTime();
00247
00248 writeBuffers.push_back(buf);
00249
00250 bufferHasData.wakeAll();
00251
00252 LOG(VB_FILE, LOG_DEBUG, LOC + QString("Write(*, %1) total %2 cnt %3")
00253 .arg(count,4).arg(totalBufferUse).arg(writeBuffers.size()));
00254
00255 return count;
00256 }
00257
00269 long long ThreadedFileWriter::Seek(long long pos, int whence)
00270 {
00271 QMutexLocker locker(&buflock);
00272 flush = true;
00273 while (!writeBuffers.empty())
00274 {
00275 bufferHasData.wakeAll();
00276 if (!bufferEmpty.wait(locker.mutex(), 2000))
00277 {
00278 LOG(VB_GENERAL, LOG_WARNING, LOC +
00279 QString("Taking a long time to flush.. buffer size %1")
00280 .arg(totalBufferUse));
00281 }
00282 }
00283 flush = false;
00284 return lseek(fd, pos, whence);
00285 }
00286
00290 void ThreadedFileWriter::Flush(void)
00291 {
00292 QMutexLocker locker(&buflock);
00293 flush = true;
00294 while (!writeBuffers.empty())
00295 {
00296 bufferHasData.wakeAll();
00297 if (!bufferEmpty.wait(locker.mutex(), 2000))
00298 {
00299 LOG(VB_GENERAL, LOG_WARNING, LOC +
00300 QString("Taking a long time to flush.. buffer size %1")
00301 .arg(totalBufferUse));
00302 }
00303 }
00304 flush = false;
00305 }
00306
00327 void ThreadedFileWriter::Sync(void)
00328 {
00329 if (fd >= 0)
00330 {
00331 #if defined(_POSIX_SYNCHRONIZED_IO) && _POSIX_SYNCHRONIZED_IO > 0
00332
00333
00334
00335 fdatasync(fd);
00336 #else
00337 fsync(fd);
00338 #endif
00339 }
00340 }
00341
00346 void ThreadedFileWriter::SetWriteBufferMinWriteSize(uint newMinSize)
00347 {
00348 QMutexLocker locker(&buflock);
00349 if (newMinSize > 0)
00350 tfw_min_write_size = newMinSize;
00351 bufferHasData.wakeAll();
00352 }
00353
00357 void ThreadedFileWriter::SyncLoop(void)
00358 {
00359 QMutexLocker locker(&buflock);
00360 while (!in_dtor)
00361 {
00362 locker.unlock();
00363
00364 Sync();
00365
00366 locker.relock();
00367 bufferSyncWait.wait(&buflock, 1000);
00368 }
00369 }
00370
00374 void ThreadedFileWriter::DiskLoop(void)
00375 {
00376 #ifndef USING_MINGW
00377
00378 signal(SIGXFSZ, SIG_IGN);
00379 #endif
00380
00381 QMutexLocker locker(&buflock);
00382
00383
00384
00385
00386 MythTimer minWriteTimer;
00387 minWriteTimer.start();
00388
00389 while (!in_dtor)
00390 {
00391 if (ignore_writes)
00392 {
00393 while (!writeBuffers.empty())
00394 {
00395 delete writeBuffers.front();
00396 writeBuffers.pop_front();
00397 }
00398 while (!emptyBuffers.empty())
00399 {
00400 delete emptyBuffers.front();
00401 emptyBuffers.pop_front();
00402 }
00403 bufferEmpty.wakeAll();
00404 bufferHasData.wait(locker.mutex());
00405 continue;
00406 }
00407
00408 if (writeBuffers.empty())
00409 {
00410 bufferEmpty.wakeAll();
00411 bufferHasData.wait(locker.mutex(), 1000);
00412 TrimEmptyBuffers();
00413 continue;
00414 }
00415
00416 int mwte = minWriteTimer.elapsed();
00417 if (!flush && (mwte < 250) && (totalBufferUse < kMinWriteSize))
00418 {
00419 bufferHasData.wait(locker.mutex(), 250 - mwte);
00420 TrimEmptyBuffers();
00421 continue;
00422 }
00423
00424 if (fd == -1)
00425 {
00426 bufferHasData.wait(locker.mutex(), 200);
00427 TrimEmptyBuffers();
00428 continue;
00429 }
00430
00431 TFWBuffer *buf = writeBuffers.front();
00432 writeBuffers.pop_front();
00433 totalBufferUse -= buf->data.size();
00434 minWriteTimer.start();
00435
00437
00438 const void *data = &(buf->data[0]);
00439 uint sz = buf->data.size();
00440
00441 bool write_ok = true;
00442 uint tot = 0;
00443 uint errcnt = 0;
00444
00445 LOG(VB_FILE, LOG_DEBUG, LOC + QString("write(%1) cnt %2 total %3")
00446 .arg(sz).arg(writeBuffers.size())
00447 .arg(totalBufferUse));
00448
00449 MythTimer writeTimer;
00450 writeTimer.start();
00451
00452 while ((tot < sz) && !in_dtor)
00453 {
00454 locker.unlock();
00455
00456 int ret = write(fd, (char *)data + tot, sz - tot);
00457
00458 if (ret < 0)
00459 {
00460 if (errno == EAGAIN)
00461 {
00462 LOG(VB_GENERAL, LOG_WARNING, LOC + "Got EAGAIN.");
00463 }
00464 else
00465 {
00466 errcnt++;
00467 LOG(VB_GENERAL, LOG_ERR, LOC + "File I/O " +
00468 QString(" errcnt: %1").arg(errcnt) + ENO);
00469 }
00470
00471 if ((errcnt >= 3) || (ENOSPC == errno) || (EFBIG == errno))
00472 {
00473 locker.relock();
00474 write_ok = false;
00475 break;
00476 }
00477 }
00478 else
00479 {
00480 tot += ret;
00481 }
00482
00483 locker.relock();
00484
00485 if (!in_dtor)
00486 bufferHasData.wait(locker.mutex(), 50);
00487 }
00488
00490
00491 buf->lastUsed = QDateTime::currentDateTime();
00492 emptyBuffers.push_back(buf);
00493
00494 if (writeTimer.elapsed() > 1000)
00495 {
00496 LOG(VB_GENERAL, LOG_WARNING, LOC +
00497 QString("write(%1) cnt %2 total %3 -- took a long time, %4 ms")
00498 .arg(sz).arg(writeBuffers.size())
00499 .arg(totalBufferUse).arg(writeTimer.elapsed()));
00500 }
00501
00502 if (!write_ok && ((EFBIG == errno) || (ENOSPC == errno)))
00503 {
00504 QString msg;
00505 switch (errno)
00506 {
00507 case EFBIG:
00508 msg =
00509 "Maximum file size exceeded by '%1'"
00510 "\n\t\t\t"
00511 "You must either change the process ulimits, configure"
00512 "\n\t\t\t"
00513 "your operating system with \"Large File\" support, "
00514 "or use"
00515 "\n\t\t\t"
00516 "a filesystem which supports 64-bit or 128-bit files."
00517 "\n\t\t\t"
00518 "HINT: FAT32 is a 32-bit filesystem.";
00519 break;
00520 case ENOSPC:
00521 msg =
00522 "No space left on the device for file '%1'"
00523 "\n\t\t\t"
00524 "file will be truncated, no further writing "
00525 "will be done.";
00526 break;
00527 }
00528
00529 LOG(VB_GENERAL, LOG_ERR, LOC + msg.arg(filename));
00530 ignore_writes = true;
00531 }
00532 }
00533 }
00534
00535 void ThreadedFileWriter::TrimEmptyBuffers(void)
00536 {
00537 QDateTime cur = QDateTime::currentDateTime();
00538 QDateTime cur_m_60 = cur.addSecs(-60);
00539
00540 QList<TFWBuffer*>::iterator it = emptyBuffers.begin();
00541 while (it != emptyBuffers.end())
00542 {
00543 if (((*it)->lastUsed < cur_m_60) ||
00544 ((*it)->data.capacity() > 3 * (*it)->data.size() &&
00545 (*it)->data.capacity() > 64 * 1024))
00546 {
00547 delete *it;
00548 it = emptyBuffers.erase(it);
00549 continue;
00550 }
00551 ++it;
00552 }
00553 }