00001 #include <cstdio>
00002 #include <cstdlib>
00003 #include <unistd.h>
00004 #include <fcntl.h>
00005 #include <cassert>
00006 #include <cerrno>
00007 #include <sys/time.h>
00008 #include <sys/types.h>
00009 #include <sys/stat.h>
00010 #include <ctime>
00011 #include <cmath>
00012
00013 #include "fifowriter.h"
00014 #include "compat.h"
00015 #include "mythlogging.h"
00016
00017 #include "mythconfig.h"
00018 #if CONFIG_DARWIN
00019 #include <sys/aio.h>
00020 #endif
00021
00022 #include <iostream>
00023 using namespace std;
00024
00025 FIFOWriter::FIFOWriter(int count, bool sync) :
00026 fifo_buf(NULL),
00027 fb_inptr(NULL),
00028 fb_outptr(NULL),
00029 fifothrds(NULL),
00030 fifo_lock(NULL),
00031 full_cond(NULL),
00032 empty_cond(NULL),
00033 filename(NULL),
00034 fbdesc(NULL),
00035 maxblksize(NULL),
00036 killwr(NULL),
00037 fbcount(NULL),
00038 num_fifos(count),
00039 usesync(sync)
00040 {
00041 if (count <= 0)
00042 return;
00043
00044 fifo_buf = new struct fifo_buf *[count];
00045 fb_inptr = new struct fifo_buf *[count];
00046 fb_outptr = new struct fifo_buf *[count];
00047 fifothrds = new FIFOThread[count];
00048 fifo_lock = new QMutex[count];
00049 full_cond = new QWaitCondition[count];
00050 empty_cond = new QWaitCondition[count];
00051 filename = new QString [count];
00052 fbdesc = new QString [count];
00053 maxblksize = new long[count];
00054 killwr = new int[count];
00055 fbcount = new int[count];
00056 }
00057
00058 FIFOWriter::~FIFOWriter()
00059 {
00060 if (num_fifos <= 0)
00061 return;
00062
00063 for (int i = 0; i < num_fifos; i++)
00064 {
00065 QMutexLocker flock(&fifo_lock[i]);
00066 killwr[i] = 1;
00067 empty_cond[i].wakeAll();
00068 }
00069
00070 for (int i = 0; i < num_fifos; i++)
00071 {
00072 fifothrds[i].wait();
00073 }
00074
00075 num_fifos = 0;
00076
00077 delete [] maxblksize;
00078 delete [] fifo_buf;
00079 delete [] fb_inptr;
00080 delete [] fb_outptr;
00081 delete [] fifothrds;
00082 delete [] full_cond;
00083 delete [] empty_cond;
00084 delete [] fifo_lock;
00085 delete [] filename;
00086 delete [] fbdesc;
00087 delete [] killwr;
00088 delete [] fbcount;
00089 }
00090
00091 int FIFOWriter::FIFOInit(int id, QString desc, QString name, long size,
00092 int num_bufs)
00093 {
00094 if (id < 0 || id >= num_fifos)
00095 return false;
00096
00097 QByteArray fname = name.toAscii();
00098 const char *aname = fname.constData();
00099 if (mkfifo(aname, S_IREAD | S_IWRITE | S_IRGRP | S_IROTH) == -1)
00100 {
00101 LOG(VB_GENERAL, LOG_ERR, QString("Couldn't create fifo for file: '%1'")
00102 .arg(name) + ENO);
00103 return false;
00104 }
00105 LOG(VB_GENERAL, LOG_INFO, QString("Created %1 fifo: %2")
00106 .arg(desc).arg(name));
00107 maxblksize[id] = size;
00108 filename[id] = name;
00109 fbdesc[id] = desc;
00110 killwr[id] = 0;
00111 fbcount[id] = (usesync) ? 2 : num_bufs;
00112 fifo_buf[id] = new struct fifo_buf;
00113 struct fifo_buf *fifoptr = fifo_buf[id];
00114 for (int i = 0; i < fbcount[id]; i++)
00115 {
00116 fifoptr->data = new unsigned char[maxblksize[id]];
00117 if (i == fbcount[id] - 1)
00118 fifoptr->next = fifo_buf[id];
00119 else
00120 fifoptr->next = new struct fifo_buf;
00121 fifoptr = fifoptr->next;
00122 }
00123 fb_inptr[id] = fifo_buf[id];
00124 fb_outptr[id] = fifo_buf[id];
00125
00126 fifothrds[id].SetParent(this);
00127 fifothrds[id].SetId(id);
00128 fifothrds[id].start();
00129
00130 while (0 == killwr[id] && !fifothrds[id].isRunning())
00131 usleep(1000);
00132
00133 return fifothrds[id].isRunning();
00134 }
00135
00136 void FIFOThread::run(void)
00137 {
00138 RunProlog();
00139 if (m_parent && m_id != -1)
00140 m_parent->FIFOWriteThread(m_id);
00141 RunEpilog();
00142 }
00143
00144 void FIFOWriter::FIFOWriteThread(int id)
00145 {
00146 int fd = -1;
00147
00148 QMutexLocker flock(&fifo_lock[id]);
00149 while (1)
00150 {
00151 if ((fb_inptr[id] == fb_outptr[id]) && (0 == killwr[id]))
00152 empty_cond[id].wait(flock.mutex());
00153 flock.unlock();
00154 if (killwr[id])
00155 break;
00156 if (fd < 0)
00157 {
00158 QByteArray fname = filename[id].toAscii();
00159 fd = open(fname.constData(), O_WRONLY| O_SYNC);
00160 }
00161 if (fd >= 0)
00162 {
00163 int written = 0;
00164 while (written < fb_outptr[id]->blksize)
00165 {
00166 int ret = write(fd, fb_outptr[id]->data+written,
00167 fb_outptr[id]->blksize-written);
00168 if (ret < 0)
00169 {
00170 LOG(VB_GENERAL, LOG_ERR,
00171 QString("FIFOW: write failed with %1")
00172 .arg(strerror(errno)));
00174 break;
00175 }
00176 else
00177 {
00178 written += ret;
00179 }
00180 }
00181 }
00182 flock.relock();
00183 fb_outptr[id] = fb_outptr[id]->next;
00184 full_cond[id].wakeAll();
00185 }
00186
00187 if (fd != -1)
00188 close(fd);
00189
00190 unlink(filename[id].toLocal8Bit().constData());
00191
00192 while (fifo_buf[id]->next != fifo_buf[id])
00193 {
00194 struct fifo_buf *tmpfifo = fifo_buf[id]->next->next;
00195 delete [] fifo_buf[id]->next->data;
00196 delete fifo_buf[id]->next;
00197 fifo_buf[id]->next = tmpfifo;
00198 }
00199 delete [] fifo_buf[id]->data;
00200 delete fifo_buf[id];
00201 }
00202
00203 void FIFOWriter::FIFOWrite(int id, void *buffer, long blksize)
00204 {
00205 QMutexLocker flock(&fifo_lock[id]);
00206 while (fb_inptr[id]->next == fb_outptr[id])
00207 {
00208 bool blocking = false;
00209 if (!usesync)
00210 {
00211 for(int i = 0; i < num_fifos; i++)
00212 {
00213 if (i == id)
00214 continue;
00215 if (fb_inptr[i] == fb_outptr[i])
00216 blocking = true;
00217 }
00218 }
00219
00220 if (blocking)
00221 {
00222 struct fifo_buf *tmpfifo;
00223 tmpfifo = fb_inptr[id]->next;
00224 fb_inptr[id]->next = new struct fifo_buf;
00225 fb_inptr[id]->next->data = new unsigned char[maxblksize[id]];
00226 fb_inptr[id]->next->next = tmpfifo;
00227 QString msg = QString("allocating additonal buffer for : %1(%2)")
00228 .arg(fbdesc[id]).arg(++fbcount[id]);
00229 LOG(VB_FILE, LOG_INFO, msg);
00230 }
00231 else
00232 {
00233 full_cond[id].wait(flock.mutex(), 1000);
00234 }
00235 }
00236 if (blksize > maxblksize[id])
00237 {
00238 delete [] fb_inptr[id]->data;
00239 fb_inptr[id]->data = new unsigned char[blksize];
00240 }
00241 memcpy(fb_inptr[id]->data,buffer,blksize);
00242 fb_inptr[id]->blksize = blksize;
00243 fb_inptr[id] = fb_inptr[id]->next;
00244 empty_cond[id].wakeAll();
00245 }
00246
00247 void FIFOWriter::FIFODrain(void)
00248 {
00249 int count = 0;
00250 while (count < num_fifos)
00251 {
00252 count = 0;
00253 for (int i = 0; i < num_fifos; i++)
00254 {
00255 QMutexLocker flock(&fifo_lock[i]);
00256 if (fb_inptr[i] == fb_outptr[i])
00257 {
00258 killwr[i] = 1;
00259 empty_cond[i].wakeAll();
00260 count++;
00261 }
00262 }
00263 usleep(1000);
00264 }
00265 }