00001
00002
00003
00004 #include "streamhandler.h"
00005
00006 #define LOC QString("SH(%1): ").arg(_device)
00007
00008 StreamHandler::StreamHandler(const QString &device) :
00009 MThread("StreamHandler"),
00010 _device(device),
00011 _needs_buffering(false),
00012 _allow_section_reader(false),
00013
00014 _running_desired(false),
00015 _error(false),
00016 _running(false),
00017 _using_buffering(false),
00018 _using_section_reader(false),
00019
00020 _pid_lock(QMutex::Recursive),
00021 _open_pid_filters(0),
00022
00023 _listener_lock(QMutex::Recursive)
00024 {
00025 }
00026
00027 StreamHandler::~StreamHandler()
00028 {
00029 if (!_stream_data_list.empty())
00030 {
00031 LOG(VB_GENERAL, LOG_ERR, LOC + "dtor & _stream_data_list not empty");
00032 }
00033
00034
00035 if (_running)
00036 Stop();
00037 }
00038
00039 void StreamHandler::AddListener(MPEGStreamData *data,
00040 bool allow_section_reader,
00041 bool needs_buffering,
00042 QString output_file)
00043 {
00044 LOG(VB_RECORD, LOG_INFO, LOC + QString("AddListener(0x%1) -- begin")
00045 .arg((uint64_t)data,0,16));
00046 if (!data)
00047 {
00048 LOG(VB_GENERAL, LOG_ERR, LOC +
00049 QString("AddListener(0x%1) -- null data")
00050 .arg((uint64_t)data,0,16));
00051 return;
00052 }
00053
00054 _listener_lock.lock();
00055
00056 LOG(VB_RECORD, LOG_INFO, LOC + QString("AddListener(0x%1) -- locked")
00057 .arg((uint64_t)data,0,16));
00058
00059 if (_stream_data_list.empty())
00060 {
00061 QMutexLocker locker(&_start_stop_lock);
00062 _allow_section_reader = allow_section_reader;
00063 _needs_buffering = needs_buffering;
00064 }
00065 else
00066 {
00067 QMutexLocker locker(&_start_stop_lock);
00068 _allow_section_reader &= allow_section_reader;
00069 _needs_buffering |= needs_buffering;
00070 }
00071
00072 StreamDataList::iterator it = _stream_data_list.find(data);
00073 if (it != _stream_data_list.end())
00074 {
00075 LOG(VB_GENERAL, LOG_ERR, LOC + "Programmer Error, attempted "
00076 "to add a listener which is already being listened to.");
00077 }
00078 else
00079 {
00080 _stream_data_list[data] = output_file;
00081 }
00082
00083 if (!output_file.isEmpty())
00084 AddNamedOutputFile(output_file);
00085
00086 _listener_lock.unlock();
00087
00088 Start();
00089
00090 LOG(VB_RECORD, LOG_INFO, LOC + QString("AddListener(0x%1) -- end")
00091 .arg((uint64_t)data,0,16));
00092 }
00093
00094 void StreamHandler::RemoveListener(MPEGStreamData *data)
00095 {
00096 LOG(VB_RECORD, LOG_INFO, LOC + QString("RemoveListener(0x%1) -- begin")
00097 .arg((uint64_t)data,0,16));
00098 if (!data)
00099 {
00100 LOG(VB_GENERAL, LOG_ERR, LOC +
00101 QString("RemoveListener(0x%1) -- null data")
00102 .arg((uint64_t)data,0,16));
00103 return;
00104 }
00105
00106 _listener_lock.lock();
00107
00108 LOG(VB_RECORD, LOG_INFO, LOC + QString("RemoveListener(0x%1) -- locked")
00109 .arg((uint64_t)data,0,16));
00110
00111 StreamDataList::iterator it = _stream_data_list.find(data);
00112
00113 if (it != _stream_data_list.end())
00114 {
00115 if (!(*it).isEmpty())
00116 RemoveNamedOutputFile(*it);
00117 _stream_data_list.erase(it);
00118 }
00119
00120 if (_stream_data_list.empty())
00121 {
00122 _listener_lock.unlock();
00123 Stop();
00124 }
00125 else
00126 {
00127 _listener_lock.unlock();
00128 }
00129
00130 LOG(VB_RECORD, LOG_INFO, LOC + QString("RemoveListener(0x%1) -- end")
00131 .arg((uint64_t)data,0,16));
00132 }
00133
00134 void StreamHandler::Start(void)
00135 {
00136 QMutexLocker locker(&_start_stop_lock);
00137
00138 if (_running)
00139 {
00140 if ((_using_section_reader && !_allow_section_reader) ||
00141 (_needs_buffering && !_using_buffering))
00142 {
00143 SetRunningDesired(false);
00144 while (!_running_desired && _running)
00145 _running_state_changed.wait(&_start_stop_lock, 100);
00146 }
00147 }
00148
00149 if (_running)
00150 return;
00151
00152 _eit_pids.clear();
00153
00154 _error = false;
00155 SetRunningDesired(true);
00156 MThread::start();
00157
00158 while (!_running && !_error && _running_desired)
00159 _running_state_changed.wait(&_start_stop_lock, 100);
00160
00161 if (!_running_desired)
00162 {
00163 LOG(VB_GENERAL, LOG_WARNING, LOC +
00164 "Programmer Error: Stop called before Start finished");
00165 }
00166
00167 if (_error)
00168 {
00169 LOG(VB_GENERAL, LOG_WARNING, LOC + "Start failed");
00170 SetRunningDesired(false);
00171 }
00172 }
00173
00174 void StreamHandler::Stop(void)
00175 {
00176 QMutexLocker locker(&_start_stop_lock);
00177
00178 do
00179 {
00180 SetRunningDesired(false);
00181 while (!_running_desired && _running)
00182 _running_state_changed.wait(&_start_stop_lock, 100);
00183 if (_running_desired)
00184 {
00185 LOG(VB_GENERAL, LOG_WARNING, LOC +
00186 "Programmer Error: Start called before Stop finished");
00187 }
00188 } while (_running_desired);
00189
00190 wait();
00191 }
00192
00193 bool StreamHandler::IsRunning(void) const
00194 {
00195 QMutexLocker locker(&_start_stop_lock);
00196 return _running;
00197 }
00198
00199 void StreamHandler::SetRunning(bool is_running,
00200 bool is_using_buffering,
00201 bool is_using_section_reader)
00202 {
00203 QMutexLocker locker(&_start_stop_lock);
00204 _running = is_running;
00205 _using_buffering = is_using_buffering;
00206 _using_section_reader = is_using_section_reader;
00207 _running_state_changed.wakeAll();
00208 }
00209
00210 bool StreamHandler::AddPIDFilter(PIDInfo *info)
00211 {
00212 #ifdef DEBUG_PID_FILTERS
00213 LOG(VB_RECORD, LOG_DEBUG, LOC + QString("AddPIDFilter(0x%1)")
00214 .arg(info->_pid, 0, 16));
00215 #endif // DEBUG_PID_FILTERS
00216
00217 QMutexLocker writing_locker(&_pid_lock);
00218 _pid_info[info->_pid] = info;
00219
00220 CycleFiltersByPriority();
00221
00222 return true;
00223 }
00224
00225 bool StreamHandler::RemovePIDFilter(uint pid)
00226 {
00227 #ifdef DEBUG_PID_FILTERS
00228 LOG(VB_RECORD, LOG_DEBUG, LOC +
00229 QString("RemovePIDFilter(0x%1)").arg(pid, 0, 16));
00230 #endif // DEBUG_PID_FILTERS
00231
00232 QMutexLocker write_locker(&_pid_lock);
00233
00234 PIDInfoMap::iterator it = _pid_info.find(pid);
00235 if (it == _pid_info.end())
00236 return false;
00237
00238 PIDInfo *tmp = *it;
00239 _pid_info.erase(it);
00240
00241 bool ok = true;
00242 if (tmp->IsOpen())
00243 {
00244 ok = tmp->Close(_device);
00245 _open_pid_filters--;
00246
00247 CycleFiltersByPriority();
00248 }
00249
00250 delete tmp;
00251
00252 return ok;
00253 }
00254
00255 bool StreamHandler::RemoveAllPIDFilters(void)
00256 {
00257 QMutexLocker write_locker(&_pid_lock);
00258
00259 #ifdef DEBUG_PID_FILTERS
00260 LOG(VB_RECORD, LOG_DEBUG, LOC + "RemoveAllPIDFilters()");
00261 #endif // DEBUG_PID_FILTERS
00262
00263 vector<int> del_pids;
00264 PIDInfoMap::iterator it = _pid_info.begin();
00265 for (; it != _pid_info.end(); ++it)
00266 del_pids.push_back(it.key());
00267
00268 bool ok = true;
00269 vector<int>::iterator dit = del_pids.begin();
00270 for (; dit != del_pids.end(); ++dit)
00271 ok &= RemovePIDFilter(*dit);
00272
00273 return UpdateFilters() && ok;
00274 }
00275
00276 void StreamHandler::UpdateListeningForEIT(void)
00277 {
00278 vector<uint> add_eit, del_eit;
00279
00280 QMutexLocker read_locker(&_listener_lock);
00281
00282 StreamDataList::const_iterator it = _stream_data_list.begin();
00283 for (; it != _stream_data_list.end(); ++it)
00284 {
00285 MPEGStreamData *sd = it.key();
00286 if (sd->HasEITPIDChanges(_eit_pids) &&
00287 sd->GetEITPIDChanges(_eit_pids, add_eit, del_eit))
00288 {
00289 for (uint i = 0; i < del_eit.size(); i++)
00290 {
00291 uint_vec_t::iterator it;
00292 it = find(_eit_pids.begin(), _eit_pids.end(), del_eit[i]);
00293 if (it != _eit_pids.end())
00294 _eit_pids.erase(it);
00295 sd->RemoveListeningPID(del_eit[i]);
00296 }
00297
00298 for (uint i = 0; i < add_eit.size(); i++)
00299 {
00300 _eit_pids.push_back(add_eit[i]);
00301 sd->AddListeningPID(add_eit[i]);
00302 }
00303 }
00304 }
00305 }
00306
00307 bool StreamHandler::UpdateFiltersFromStreamData(void)
00308 {
00309 UpdateListeningForEIT();
00310
00311 pid_map_t pids;
00312
00313 {
00314 QMutexLocker read_locker(&_listener_lock);
00315 StreamDataList::const_iterator it = _stream_data_list.begin();
00316 for (; it != _stream_data_list.end(); ++it)
00317 it.key()->GetPIDs(pids);
00318 }
00319
00320 QMap<uint, PIDInfo*> add_pids;
00321 vector<uint> del_pids;
00322
00323 {
00324 QMutexLocker read_locker(&_pid_lock);
00325
00326
00327 pid_map_t::const_iterator lit = pids.constBegin();
00328 for (; lit != pids.constEnd(); ++lit)
00329 {
00330 if (*lit && (_pid_info.find(lit.key()) == _pid_info.end()))
00331 {
00332 add_pids[lit.key()] = CreatePIDInfo(
00333 lit.key(), StreamID::PrivSec, 0);
00334 }
00335 }
00336
00337
00338 PIDInfoMap::const_iterator fit = _pid_info.begin();
00339 for (; fit != _pid_info.end(); ++fit)
00340 {
00341 bool in_pids = pids.find(fit.key()) != pids.end();
00342 if (!in_pids)
00343 del_pids.push_back(fit.key());
00344 }
00345 }
00346
00347
00348 bool ok = true;
00349 vector<uint>::iterator dit = del_pids.begin();
00350 for (; dit != del_pids.end(); ++dit)
00351 ok &= RemovePIDFilter(*dit);
00352
00353
00354 QMap<uint, PIDInfo*>::iterator ait = add_pids.begin();
00355 for (; ait != add_pids.end(); ++ait)
00356 ok &= AddPIDFilter(*ait);
00357
00358
00359 if (_cycle_timer.elapsed() > 1000)
00360 CycleFiltersByPriority();
00361
00362 return ok;
00363 }
00364
00365 PIDPriority StreamHandler::GetPIDPriority(uint pid) const
00366 {
00367 QMutexLocker reading_locker(&_listener_lock);
00368
00369 PIDPriority tmp = kPIDPriorityNone;
00370
00371 StreamDataList::const_iterator it = _stream_data_list.begin();
00372 for (; it != _stream_data_list.end(); ++it)
00373 tmp = max(tmp, it.key()->GetPIDPriority(pid));
00374
00375 return tmp;
00376 }