00001
00002 #include <cstdio>
00003 #include <cstdlib>
00004 #include <cerrno>
00005
00006
00007 #include <sys/types.h>
00008 #include <sys/stat.h>
00009 #include <unistd.h>
00010 #include <signal.h>
00011 #include <fcntl.h>
00012
00013
00014 #include <qstring.h>
00015 #include <qdeepcopy.h>
00016
00017
00018 #include "ThreadedFileWriter.h"
00019 #include "mythcontext.h"
00020 #include "compat.h"
00021
00022 #if defined(_POSIX_SYNCHRONIZED_IO) && _POSIX_SYNCHRONIZED_IO > 0
00023 #define HAVE_FDATASYNC
00024 #endif
00025
00026 #define LOC QString("TFW: ")
00027 #define LOC_ERR QString("TFW, Error: ")
00028
00029 const uint ThreadedFileWriter::TFW_DEF_BUF_SIZE = 2*1024*1024;
00030 const uint ThreadedFileWriter::TFW_MAX_WRITE_SIZE = TFW_DEF_BUF_SIZE / 4;
00031 const uint ThreadedFileWriter::TFW_MIN_WRITE_SIZE = TFW_DEF_BUF_SIZE / 32;
00032
00055 static uint safe_write(int fd, const void *data, uint sz, bool &ok)
00056 {
00057 int ret;
00058 uint tot = 0;
00059 uint errcnt = 0;
00060
00061 while (tot < sz)
00062 {
00063 ret = write(fd, (char *)data + tot, sz - tot);
00064 if (ret < 0)
00065 {
00066 if (errno == EAGAIN)
00067 {
00068 VERBOSE(VB_IMPORTANT, LOC + "safe_write(): Got EAGAIN.");
00069 continue;
00070 }
00071
00072 errcnt++;
00073 VERBOSE(VB_IMPORTANT, LOC_ERR + "safe_write(): File I/O " +
00074 QString(" errcnt: %1").arg(errcnt) + ENO);
00075
00076 if (errcnt == 3)
00077 break;
00078 }
00079 else
00080 {
00081 tot += ret;
00082 }
00083
00084 if (tot < sz)
00085 {
00086 VERBOSE(VB_IMPORTANT, LOC + "safe_write(): funky usleep");
00087 usleep(1000);
00088 }
00089 }
00090 ok = (errcnt < 3);
00091 return tot;
00092 }
00093
00097 void *ThreadedFileWriter::boot_writer(void *wotsit)
00098 {
00099 #ifndef USING_MINGW
00100 signal(SIGXFSZ, SIG_IGN);
00101 #endif
00102 ThreadedFileWriter *fw = (ThreadedFileWriter *)wotsit;
00103 fw->DiskLoop();
00104 return NULL;
00105 }
00106
00110 void *ThreadedFileWriter::boot_syncer(void *wotsit)
00111 {
00112 ThreadedFileWriter *fw = (ThreadedFileWriter *)wotsit;
00113 fw->SyncLoop();
00114 return NULL;
00115 }
00116
00120 ThreadedFileWriter::ThreadedFileWriter(const QString &fname,
00121 int pflags, mode_t pmode) :
00122
00123 filename(QDeepCopy<QString>(fname)), flags(pflags),
00124 mode(pmode), fd(-1),
00125
00126 no_writes(false), flush(false),
00127 in_dtor(false), ignore_writes(false),
00128 tfw_min_write_size(0),
00129
00130 rpos(0), wpos(0),
00131 written(0),
00132
00133 buf(NULL), tfw_buf_size(0)
00134 {
00135 }
00136
00141 bool ThreadedFileWriter::Open(void)
00142 {
00143 ignore_writes = false;
00144 fd = open(filename.ascii(), flags, mode);
00145
00146 if (fd < 0)
00147 {
00148 VERBOSE(VB_IMPORTANT, LOC_ERR +
00149 QString("Opening file '%1'.").arg(filename) + ENO);
00150 return false;
00151 }
00152 else
00153 {
00154 buf = new char[TFW_DEF_BUF_SIZE + 1024];
00155 bzero(buf, TFW_DEF_BUF_SIZE + 64);
00156
00157 tfw_buf_size = TFW_DEF_BUF_SIZE;
00158 tfw_min_write_size = TFW_MIN_WRITE_SIZE;
00159 pthread_create(&writer, NULL, boot_writer, this);
00160 pthread_create(&syncer, NULL, boot_syncer, this);
00161 return true;
00162 }
00163 }
00164
00168 ThreadedFileWriter::~ThreadedFileWriter()
00169 {
00170 no_writes = true;
00171
00172 if (fd >= 0)
00173 {
00174 Flush();
00175 in_dtor = true;
00176
00177 bufferSyncWait.wakeAll();
00178 pthread_join(syncer, NULL);
00179
00180 bufferHasData.wakeAll();
00181 pthread_join(writer, NULL);
00182 close(fd);
00183 fd = -1;
00184 }
00185
00186 if (buf)
00187 {
00188 delete [] buf;
00189 buf = NULL;
00190 }
00191 }
00192
00201 uint ThreadedFileWriter::Write(const void *data, uint count)
00202 {
00203 if (count == 0)
00204 return 0;
00205
00206 bool first = true;
00207
00208 while (count > BufFree())
00209 {
00210 if (first)
00211 {
00212 VERBOSE(VB_IMPORTANT, LOC_ERR + "Write() -- IOBOUND begin " +
00213 QString("cnt(%1) free(%2)").arg(count).arg(BufFree()));
00214 first = false;
00215 }
00216
00217 bufferWroteData.wait(100);
00218 }
00219 if (!first)
00220 VERBOSE(VB_IMPORTANT, LOC_ERR + "Write() -- IOBOUND end");
00221
00222 if (no_writes)
00223 return 0;
00224
00225 if ((wpos + count) > tfw_buf_size)
00226 {
00227 int first_chunk_size = tfw_buf_size - wpos;
00228 int second_chunk_size = count - first_chunk_size;
00229 memcpy(buf + wpos, data, first_chunk_size );
00230 memcpy(buf, (char *)data + first_chunk_size, second_chunk_size );
00231 }
00232 else
00233 {
00234 memcpy(buf + wpos, data, count);
00235 }
00236
00237 buflock.lock();
00238 wpos = (wpos + count) % tfw_buf_size;
00239 buflock.unlock();
00240
00241 bufferHasData.wakeAll();
00242
00243 return count;
00244 }
00245
00257 long long ThreadedFileWriter::Seek(long long pos, int whence)
00258 {
00259 Flush();
00260
00261 return lseek(fd, pos, whence);
00262 }
00263
00267 void ThreadedFileWriter::Flush(void)
00268 {
00269 flush = true;
00270 while (BufUsed() > 0)
00271 {
00272 if (!bufferEmpty.wait(2000))
00273 VERBOSE(VB_IMPORTANT, LOC + "Taking a long time to flush..");
00274 }
00275 flush = false;
00276 }
00277
00291 void ThreadedFileWriter::Sync(void)
00292 {
00293 if (fd >= 0)
00294 {
00295 #ifdef HAVE_FDATASYNC
00296 fdatasync(fd);
00297 #else
00298 fsync(fd);
00299 #endif
00300 }
00301 }
00302
00307 void ThreadedFileWriter::SetWriteBufferSize(uint newSize)
00308 {
00309 if (newSize <= 0)
00310 return;
00311
00312 Flush();
00313
00314 buflock.lock();
00315 delete [] buf;
00316 rpos = wpos = 0;
00317 buf = new char[newSize + 1024];
00318 bzero(buf, newSize + 64);
00319 tfw_buf_size = newSize;
00320 buflock.unlock();
00321 }
00322
00327 void ThreadedFileWriter::SetWriteBufferMinWriteSize(uint newMinSize)
00328 {
00329 if (newMinSize <= 0)
00330 return;
00331
00332 tfw_min_write_size = newMinSize;
00333 }
00334
00338 void ThreadedFileWriter::SyncLoop(void)
00339 {
00340 while (!in_dtor)
00341 {
00342 bufferSyncWait.wait(written > tfw_min_write_size ? 1000 : 100);
00343 Sync();
00344 }
00345 }
00346
00350 void ThreadedFileWriter::DiskLoop(void)
00351 {
00352 uint size = 0;
00353 written = 0;
00354
00355 while (!in_dtor || BufUsed() > 0)
00356 {
00357 size = BufUsed();
00358
00359 if (size == 0)
00360 bufferEmpty.wakeAll();
00361
00362 if (!size || (!in_dtor && !flush &&
00363 ((size < tfw_min_write_size) &&
00364 (written >= tfw_min_write_size))))
00365 {
00366 bufferHasData.wait(100);
00367 continue;
00368 }
00369
00370
00371
00372
00373
00374 size = (size > TFW_MAX_WRITE_SIZE) ? TFW_MAX_WRITE_SIZE : size;
00375
00376 bool write_ok;
00377 if (ignore_writes)
00378 ;
00379 else if ((rpos + size) > tfw_buf_size)
00380 {
00381 int first_chunk_size = tfw_buf_size - rpos;
00382 int second_chunk_size = size - first_chunk_size;
00383 size = safe_write(fd, buf+rpos, first_chunk_size, write_ok);
00384 if ((int)size == first_chunk_size && write_ok)
00385 size += safe_write(fd, buf, second_chunk_size, write_ok);
00386 }
00387 else
00388 {
00389 size = safe_write(fd, buf+rpos, size, write_ok);
00390 }
00391
00392
00393 if (!ignore_writes && !write_ok && (EFBIG == errno))
00394 {
00395 QString msg =
00396 "Maximum file size exceeded by '%1'"
00397 "\n\t\t\t"
00398 "You must either change the process ulimits, configure"
00399 "\n\t\t\t"
00400 "your operating system with \"Large File\" support, or use"
00401 "\n\t\t\t"
00402 "a filesystem which supports 64-bit or 128-bit files."
00403 "\n\t\t\t"
00404 "HINT: FAT32 is a 32-bit filesystem.";
00405
00406 VERBOSE(VB_IMPORTANT, msg.arg(filename));
00407 ignore_writes = true;
00408 }
00409
00410 if (written <= tfw_min_write_size)
00411 {
00412 written += size;
00413 }
00414
00415 buflock.lock();
00416 rpos = (rpos + size) % tfw_buf_size;
00417 buflock.unlock();
00418
00419 bufferWroteData.wakeAll();
00420 }
00421 }
00422
00426 uint ThreadedFileWriter::BufUsed(void)
00427 {
00428 QMutexLocker locker(&buflock);
00429 return (wpos >= rpos) ? wpos - rpos : tfw_buf_size - rpos + wpos;
00430 }
00431
00435 uint ThreadedFileWriter::BufFree(void)
00436 {
00437 QMutexLocker locker(&buflock);
00438 return ((wpos >= rpos) ? (rpos + tfw_buf_size) : rpos) - wpos - 1;
00439 }
00440