00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00012
00013 #include <algorithm>
00014
00015 #include "mythtimer.h"
00016 #include "bufferedsocketdevice.h"
00017 #include "upnputil.h"
00018 #include "mythlogging.h"
00019
00021
00023
00024 BufferedSocketDevice::BufferedSocketDevice( int nSocket )
00025 {
00026 m_pSocket = new MSocketDevice();
00027
00028 m_pSocket->setSocket ( nSocket, MSocketDevice::Stream );
00029 m_pSocket->setBlocking ( false );
00030 m_pSocket->setAddressReusable( true );
00031
00032 struct linger ling = {1, 1};
00033
00034 if ( setsockopt(socket(), SOL_SOCKET, SO_LINGER, &ling, sizeof(ling)) < 0)
00035 LOG(VB_GENERAL, LOG_ERR,
00036 "BufferedSocketDevice: setsockopt - SO_LINGER: " + ENO);
00037
00038 m_nDestPort = 0;
00039
00040 m_nMaxReadBufferSize = 0;
00041 m_nWriteSize = 0;
00042 m_nWriteIndex = 0;
00043 m_bHandleSocketDelete= true;
00044 }
00045
00047
00049
00050 BufferedSocketDevice::BufferedSocketDevice( MSocketDevice *pSocket ,
00051 bool bTakeOwnership )
00052 {
00053 m_pSocket = pSocket;
00054
00055 m_nDestPort = 0;
00056
00057 m_nMaxReadBufferSize = 0;
00058 m_nWriteSize = 0;
00059 m_nWriteIndex = 0;
00060 m_bHandleSocketDelete= bTakeOwnership;
00061
00062 }
00063
00065
00067
00068 BufferedSocketDevice::~BufferedSocketDevice()
00069 {
00070 Close();
00071 }
00072
00074
00076
00077 void BufferedSocketDevice::Close()
00078 {
00079 Flush();
00080 ReadBytes();
00081
00082 m_bufRead.clear();
00083 ClearPendingData();
00084
00085 if (m_pSocket != NULL)
00086 {
00087 if (m_pSocket->isValid())
00088 m_pSocket->close();
00089
00090 if (m_bHandleSocketDelete)
00091 delete m_pSocket;
00092
00093 m_pSocket = NULL;
00094 }
00095
00096 }
00097
00099
00101
00102 bool BufferedSocketDevice::Connect( const QHostAddress &addr, quint16 port )
00103 {
00104 if (m_pSocket == NULL)
00105 return false;
00106
00107 return m_pSocket->connect( addr, port );
00108 }
00109
00111
00113
00114 MSocketDevice *BufferedSocketDevice::SocketDevice()
00115 {
00116 return( m_pSocket );
00117 }
00118
00120
00122
00123 void BufferedSocketDevice::SetSocketDevice( MSocketDevice *pSocket )
00124 {
00125 if ((m_bHandleSocketDelete) && (m_pSocket != NULL))
00126 delete m_pSocket;
00127
00128 m_bHandleSocketDelete = false;
00129
00130 m_pSocket = pSocket;
00131 }
00132
00134
00136
00137 void BufferedSocketDevice::SetDestAddress(
00138 QHostAddress hostAddress, quint16 nPort)
00139 {
00140 m_DestHostAddress = hostAddress;
00141 m_nDestPort = nPort;
00142 }
00143
00145
00147
00148 void BufferedSocketDevice::SetReadBufferSize( qulonglong bufSize )
00149 {
00150 m_nMaxReadBufferSize = bufSize;
00151 }
00152
00154
00156
00157 qulonglong BufferedSocketDevice::ReadBufferSize(void) const
00158 {
00159 return m_nMaxReadBufferSize;
00160 }
00161
00163
00165
00166 int BufferedSocketDevice::ReadBytes()
00167 {
00168 if (m_pSocket == NULL)
00169 return m_bufRead.size();
00170
00171 qlonglong maxToRead = 0;
00172
00173 if ( m_nMaxReadBufferSize > 0 )
00174 {
00175 maxToRead = m_nMaxReadBufferSize - m_bufRead.size();
00176
00177 if ( maxToRead <= 0 )
00178 return m_bufRead.size();
00179 }
00180
00181 qlonglong nbytes = m_pSocket->bytesAvailable();
00182 qlonglong nread;
00183
00184 QByteArray *a = 0;
00185
00186 if ( nbytes > 0 )
00187 {
00188 a = new QByteArray();
00189 a->resize(nbytes);
00190
00191 nread = m_pSocket->readBlock(
00192 a->data(), maxToRead ? std::min(nbytes, maxToRead) : nbytes);
00193
00194 if (( nread > 0 ) && ( nread != (int)a->size() ))
00195 {
00196
00197 a->resize( nread );
00198 }
00199 }
00200
00201 if (a)
00202 {
00203 #if 0
00204 QString msg;
00205 for( long n = 0; n < a->count(); n++ )
00206 msg += QString("%1").arg(a->at(n));
00207 LOG(VB_GENERAL, LOG_DEBUG, msg);
00208 #endif
00209
00210 m_bufRead.append( a );
00211 }
00212
00213 return m_bufRead.size();
00214 }
00215
00217
00219
00220 bool BufferedSocketDevice::ConsumeWriteBuf( qulonglong nbytes )
00221 {
00222 if ( !nbytes || ((qlonglong)nbytes > m_nWriteSize) )
00223 return false;
00224
00225 m_nWriteSize -= nbytes;
00226
00227 for ( ;; )
00228 {
00229 QByteArray *a = m_bufWrite.front();
00230
00231 if ( m_nWriteIndex + nbytes >= (qulonglong)a->size() )
00232 {
00233 nbytes -= a->size() - m_nWriteIndex;
00234 m_bufWrite.pop_front();
00235 delete a;
00236
00237 m_nWriteIndex = 0;
00238
00239 if ( nbytes == 0 )
00240 break;
00241 }
00242 else
00243 {
00244 m_nWriteIndex += nbytes;
00245 break;
00246 }
00247 }
00248
00249 return true;
00250 }
00251
00253
00255
00256 void BufferedSocketDevice::Flush()
00257 {
00258
00259 if ((m_pSocket == NULL) || !m_pSocket->isValid())
00260 return;
00261
00262 bool osBufferFull = false;
00263 int consumed = 0;
00264
00265 while ( !osBufferFull && ( m_nWriteSize > 0 ) && m_pSocket->isValid())
00266 {
00267 deque<QByteArray*>::iterator it = m_bufWrite.begin();
00268 QByteArray *a = *it;
00269
00270 int nwritten = 0;
00271 int i = 0;
00272
00273 if ( (int)a->size() - m_nWriteIndex < 1460 )
00274 {
00275 QByteArray out;
00276 out.resize(65536);
00277
00278 int j = m_nWriteIndex;
00279 int s = a->size() - j;
00280
00281 while ( a && i+s < (int)out.size() )
00282 {
00283 memcpy( out.data()+i, a->data()+j, s );
00284 j = 0;
00285 i += s;
00286 ++it;
00287 a = *it;
00288 s = a ? a->size() : 0;
00289 }
00290
00291 if (m_nDestPort != 0)
00292 nwritten = m_pSocket->writeBlock( out.data(), i, m_DestHostAddress, m_nDestPort );
00293 else
00294 nwritten = m_pSocket->writeBlock( out.data(), i );
00295 }
00296 else
00297 {
00298
00299 i = a->size() - m_nWriteIndex;
00300
00301 if (m_nDestPort != 0)
00302 nwritten = m_pSocket->writeBlock( a->data() + m_nWriteIndex, i, m_DestHostAddress, m_nDestPort );
00303 else
00304 nwritten = m_pSocket->writeBlock( a->data() + m_nWriteIndex, i );
00305 }
00306
00307 if ( nwritten > 0 )
00308 {
00309 if ( ConsumeWriteBuf( nwritten ) )
00310
00311 consumed += nwritten;
00312 }
00313
00314 if ( nwritten < i )
00315 osBufferFull = true;
00316 }
00317 }
00318
00319
00321
00323
00324 qlonglong BufferedSocketDevice::Size()
00325 {
00326 return (qlonglong)BytesAvailable();
00327 }
00328
00330
00332
00333 qlonglong BufferedSocketDevice::At() const
00334 {
00335 return( 0 );
00336 }
00337
00339
00341
00342 bool BufferedSocketDevice::At( qlonglong index )
00343 {
00344 ReadBytes();
00345
00346 if ( index > m_bufRead.size() )
00347 return false;
00348
00349
00350 m_bufRead.consumeBytes( (qulonglong)index, 0 );
00351
00352 return true;
00353 }
00354
00356
00358
00359 bool BufferedSocketDevice::AtEnd()
00360 {
00361 if ( !m_pSocket->isValid() )
00362 return true;
00363
00364 ReadBytes();
00365
00366 return m_bufRead.size() == 0;
00367
00368 }
00369
00371
00373
00374 qulonglong BufferedSocketDevice::BytesAvailable(void)
00375 {
00376 if ( !m_pSocket->isValid() )
00377 return 0;
00378
00379 return ReadBytes();
00380 }
00381
00383
00385
00386 qulonglong BufferedSocketDevice::WaitForMore(
00387 int msecs, bool *pTimeout )
00388 {
00389 bool bTimeout = false;
00390
00391 if ( !m_pSocket->isValid() )
00392 return 0;
00393
00394 qlonglong nBytes = BytesAvailable();
00395
00396 if (nBytes == 0)
00397 {
00398
00399
00400
00401
00402
00403
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422 msecs = 1000;
00423
00424 nBytes = m_pSocket->waitForMore( msecs, &bTimeout );
00425
00426 if (pTimeout != NULL)
00427 *pTimeout = bTimeout;
00428 }
00429
00430 return nBytes;
00431 }
00432
00434
00436
00437 qulonglong BufferedSocketDevice::BytesToWrite(void) const
00438 {
00439 return m_nWriteSize;
00440
00441 }
00442
00444
00446
00447 void BufferedSocketDevice::ClearPendingData()
00448 {
00449 while (!m_bufWrite.empty())
00450 {
00451 delete m_bufWrite.back();
00452 m_bufWrite.pop_back();
00453 }
00454 m_nWriteIndex = m_nWriteSize = 0;
00455 }
00456
00458
00460
00461 void BufferedSocketDevice::ClearReadBuffer()
00462 {
00463 m_bufRead.clear();
00464 }
00465
00467
00469
00470 qlonglong BufferedSocketDevice::ReadBlock( char *data, qulonglong maxlen )
00471 {
00472 if ( data == 0 && maxlen != 0 )
00473 return -1;
00474
00475 if ( !m_pSocket->isOpen() )
00476 return -1;
00477
00478 ReadBytes();
00479
00480 if ( maxlen >= (qulonglong)m_bufRead.size() )
00481 maxlen = m_bufRead.size();
00482
00483 m_bufRead.consumeBytes( maxlen, data );
00484
00485 return maxlen;
00486
00487 }
00488
00490
00492
00493 qlonglong BufferedSocketDevice::WriteBlock( const char *data, qulonglong len )
00494 {
00495 if ( len == 0 )
00496 return 0;
00497
00498 QByteArray *a = m_bufWrite.back();
00499
00500 bool writeNow = ( (m_nWriteSize + len >= 1400) || (len > 512) );
00501
00502 if ( a && (a->size() + len < 128) )
00503 {
00504
00505 int i = a->size();
00506
00507 a->resize( i+len );
00508 memcpy( a->data()+i, data, len );
00509 }
00510 else
00511 {
00512
00513 m_bufWrite.push_back(new QByteArray(data, len));
00514 }
00515
00516 m_nWriteSize += len;
00517
00518 if ( writeNow )
00519 Flush();
00520
00521 return len;
00522 }
00523
00525
00527
00528 qlonglong BufferedSocketDevice::WriteBlockDirect(
00529 const char *data, qulonglong len)
00530 {
00531 qlonglong nWritten = 0;
00532
00533
00534
00535 Flush();
00536
00537 if (m_nDestPort != 0)
00538 nWritten = m_pSocket->writeBlock( data, len, m_DestHostAddress, m_nDestPort );
00539 else
00540 nWritten = m_pSocket->writeBlock( data, len );
00541
00542 return nWritten;
00543 }
00544
00546
00548
00549 int BufferedSocketDevice::Getch()
00550 {
00551 if ( m_pSocket->isOpen() )
00552 {
00553 ReadBytes();
00554
00555 if (m_bufRead.size() > 0 )
00556 {
00557 uchar c;
00558
00559 m_bufRead.consumeBytes( 1, (char*)&c );
00560
00561 return c;
00562 }
00563 }
00564
00565 return -1;
00566 }
00567
00569
00571
00572 int BufferedSocketDevice::Putch( int ch )
00573 {
00574 char buf[2];
00575
00576 buf[0] = ch;
00577
00578 return WriteBlock(buf, 1) == 1 ? ch : -1;
00579 }
00580
00582
00584
00585 int BufferedSocketDevice::Ungetch(int ch )
00586 {
00587 return m_bufRead.ungetch( ch );
00588 }
00589
00591
00593
00594 bool BufferedSocketDevice::CanReadLine()
00595 {
00596 ReadBytes();
00597
00598 if (( BytesAvailable() > 0 ) && m_bufRead.scanNewline( 0 ) )
00599 return true;
00600
00601 return false;
00602 }
00603
00605
00607
00608 QString BufferedSocketDevice::ReadLine()
00609 {
00610 QByteArray a;
00611 a.resize(256);
00612
00613 ReadBytes();
00614
00615 bool nl = m_bufRead.scanNewline( &a );
00616
00617 QString s;
00618
00619 if ( nl )
00620 {
00621 At( a.size() );
00622
00623 s = QString( a );
00624 }
00625
00626 return s;
00627 }
00628
00630
00632
00633 QString BufferedSocketDevice::ReadLine( int msecs )
00634 {
00635 MythTimer timer;
00636 QString sLine;
00637
00638 if ( CanReadLine() )
00639 return( ReadLine() );
00640
00641
00642
00643
00644
00645
00646 if ( msecs > 0)
00647 {
00648 bool bTimeout = false;
00649
00650 timer.start();
00651
00652 while ( !CanReadLine() && !bTimeout )
00653 {
00654 #if 0
00655 LOG(VB_UPNP, LOG_DEBUG, "Can't Read Line... Waiting for more." );
00656 #endif
00657
00658 WaitForMore( msecs, &bTimeout );
00659
00660 if ( timer.elapsed() >= msecs )
00661 {
00662 bTimeout = true;
00663 LOG(VB_UPNP, LOG_INFO, "Exceeded Total Elapsed Wait Time." );
00664 }
00665 }
00666
00667 if (CanReadLine())
00668 sLine = ReadLine();
00669 }
00670
00671 return( sLine );
00672 }
00673
00675
00677
00678 quint16 BufferedSocketDevice::Port(void) const
00679 {
00680 if (m_pSocket)
00681 return( m_pSocket->port() );
00682
00683 return 0;
00684 }
00685
00687
00689
00690 quint16 BufferedSocketDevice::PeerPort(void) const
00691 {
00692 if (m_pSocket)
00693 return( m_pSocket->peerPort() );
00694
00695 return 0;
00696 }
00697
00699
00701
00702 QHostAddress BufferedSocketDevice::Address() const
00703 {
00704 if (m_pSocket)
00705 return( m_pSocket->address() );
00706
00707 QHostAddress tmp;
00708
00709 return tmp;
00710 }
00711
00713
00715
00716 QHostAddress BufferedSocketDevice::PeerAddress() const
00717 {
00718 if (m_pSocket)
00719 return( m_pSocket->peerAddress() );
00720
00721 QHostAddress tmp;
00722
00723 return tmp;
00724 }
00725