[mythtv] tiny code improvement.

michael at optusnet.com.au michael at optusnet.com.au
Sun May 11 14:49:42 EDT 2003


I've worked out that one of the reasons I had so much
trouble understanding RingBuffer.cpp was that it was
much larger than it needed to be. :)

There's basically two seperate cases interwoven in the
object. (Correct me if I'm wrong here).

1. A 'normalfile'. This means that when writing the file just
keeps on growing, and when reading, you get an EOF when
you reach the end.

2. A circular file (my made up name for it). A file that has a maximum
file size (RingBuffer::filesize) that when reading and writing, you
need to loop back to the beginning.

The code is significantly complicated by the need to keep
on addressing these two separate cases in many places in
RingBuffer.cpp. 

A better solution might to be say that 1 (normalfile) is just a
special case of 2 (circularfile) where the filesize happens to be
1000TB. (I.e. ridiculously large). This will shrink the code size as
many of the checks for special cases evaporate.

Here a patch that starts what I'm talking about...
See DiskLoop() and ReadAheadThread() for examples.
I've not yet changed ::Read which would similarly
shrink.

Michael.

Index: libs/libmythtv/RingBuffer.cpp
===================================================================
RCS file: /var/lib/mythcvs/mythtv/libs/libmythtv/RingBuffer.cpp,v
retrieving revision 1.57
diff -u -r1.57 RingBuffer.cpp
--- libs/libmythtv/RingBuffer.cpp	3 May 2003 00:19:13 -0000	1.57
+++ libs/libmythtv/RingBuffer.cpp	11 May 2003 03:42:57 -0000
@@ -19,8 +19,14 @@
 #include "remotefile.h"
 #include "remoteencoder.h"
 
+/* ------------------------------------------------------------- */
+/* Private class for handling write-behind */
+/* ------------------------------------------------------------- */
+
 #define TFW_BUF_SIZE (2*1024*1024)
 
+#define NOT_CIRCULAR (100000000000000LL) // A very very big size. 100TB.
+
 class ThreadedFileWriter
 {
 public:
@@ -40,9 +46,12 @@
 private:
     int fd;
     char *buf;
+    int buf_size;
+    long long filesize, filepos;
     unsigned rpos,wpos;
     pthread_mutex_t buflock;
-    int in_dtor;
+    int in_dtor;		// True if we're waiting for the
+				// thread to exit.
     pthread_t writer;
     bool no_writes;
 };
@@ -52,19 +61,25 @@
     int ret;
     unsigned tot=0;
     unsigned errcnt=0;
+
     while(tot < sz) {
 	ret=write(fd, (char *)data+tot, sz-tot);
-	if(ret<0)
+
+	if(ret<0)		// Was there a problem?
 	{
 	    perror("ERROR: file I/O problem in 'safe_write()'\n");
 	    errcnt++;
 	    if(errcnt==3) break;
+	    continue;
 	}
-	else
+	if (ret == 0)		// Should never happen on a plain
+				// file, but just in case...
 	{
-	    tot += ret;
+	    usleep(1000);	// No progress. Pause for a moment.
+	    continue;
 	}
-	if(tot < sz) usleep(1000);
+
+	tot += ret;
     }
     return tot;
 }
@@ -77,7 +92,7 @@
 }
 
 ThreadedFileWriter::ThreadedFileWriter(const char *filename,
-				       int flags, mode_t mode)
+				       int flags, mode_t mode, long long fsize)
 {
     pthread_mutex_t init = PTHREAD_MUTEX_INITIALIZER;
 
@@ -86,6 +101,7 @@
     buf = NULL;
     rpos = wpos = 0;
     in_dtor = 0;
+    filesize = fsize;
 
     fd = open(filename, flags, mode);
 
@@ -93,20 +109,30 @@
     {
 	/* oops! */
 	printf("ERROR opening file '%s' in ThreadedFileWriter.\n", filename);
-        exit(0);
+        exit(0);		// FIXME: This should clear the
+				// thread, not the entire program.
     }
-    else
-    {
-	buf = (char *)malloc(TFW_BUF_SIZE);
 
-	pthread_create(&writer, NULL, boot_writer, this);
-    }
+    buf_size = TFW_BUF_SIZE;
+    buf = (char *)malloc(buf_size);
+
+    pthread_create(&writer, NULL, boot_writer, this);
+}
+
+// Method for writing to non-circular files.
+ThreadedFileWriter::ThreadedFileWriter(const char *filename,
+				       int flags, mode_t mode)
+{
+    ThreadedFileWriter(filename,flags,mode,NOT_CIRCULAR);
 }
 
+
 ThreadedFileWriter::~ThreadedFileWriter()
 {
     no_writes = true;
-    while(BufUsed() > 0)
+
+    while(BufUsed() > 0)	// If there's data, sleep,
+				// re-checking every 5ms.
         usleep(5000);
 
     in_dtor = 1; /* tells child thread to exit */
@@ -125,25 +151,23 @@
 
 unsigned ThreadedFileWriter::Write(const void *data, unsigned count)
 {
-    if (count == 0)
+    if (count == 0 || no_writes)
         return 0;
 
-    int first = 1;
+    int loops = 0;
 
     while(count > BufFree())
     {
-	if(first)
+	if(!loops++)
 	    printf("IOBOUND - blocking in ThreadedFileWriter::Write()\n");
-	first = 0;
+
 	usleep(5000);
     }
    
-    if (no_writes)
-        return 0;
  
-    if((wpos + count) > TFW_BUF_SIZE)
+    if((wpos + count) > buf_size)
     {
-	int first_chunk_size = TFW_BUF_SIZE - wpos;
+	int first_chunk_size = buf_size - wpos;
 	int second_chunk_size = count - first_chunk_size;
 	memcpy(buf + wpos, data, first_chunk_size );
 	memcpy(buf,        (char *)data+first_chunk_size, second_chunk_size );
@@ -153,7 +177,7 @@
 	memcpy(buf + wpos, data, count);
     }
     pthread_mutex_lock(&buflock);
-    wpos = (wpos + count) % TFW_BUF_SIZE;
+    wpos = (wpos + count) % buf_size;
     pthread_mutex_unlock(&buflock);
 
     return count;
@@ -167,6 +191,15 @@
     while(BufUsed() > 0)
 	usleep(5000);
 
+				// FIXME: 
+    if (whence == SEEK_SET)
+	filepos = pos;
+    else if (whence == SEEK_CUR)
+	filepos = (pos + filepos) % filesize;
+    else {			// SEEK_END
+	filepos = filesize + pos;
+    }
+
     return lseek(fd, pos, whence);
 }
 
@@ -188,23 +221,35 @@
 	   buffer is valid, and we try to write all of it at once which
 	   takes a long time. During this time, the other thread fills up
 	   the 10% that was free... */
-	if(size > (TFW_BUF_SIZE/4))
-	    size = TFW_BUF_SIZE/4;
+	if(size > (buf_size/4))
+	    size = buf_size/4;
 
-	if((rpos + size) > TFW_BUF_SIZE)
+
+	/* Shrink size to an inflection points that are in range. */
+	if((rpos + size) > buf_size)
 	{
-	    int first_chunk_size  = TFW_BUF_SIZE - rpos;
-	    int second_chunk_size = size - first_chunk_size;
-	    safe_write(fd, buf+rpos, first_chunk_size);
-	    safe_write(fd, buf,      second_chunk_size);
+	    size = buf_size - rpos;
 	}
-	else
+
+	if ((filepos + size) > filesize) 
 	{
-	    safe_write(fd, buf+rpos, size);
+	    size = filesize - filepos;
 	}
+
+	safe_write(fd, buf+rpos, size);
+
 	pthread_mutex_lock(&buflock);
-	rpos = (rpos + size) % TFW_BUF_SIZE;
+	rpos = (rpos + size) % buf_size;
 	pthread_mutex_unlock(&buflock);
+
+				// Have we reached the end of a
+				// circular file? If so, re-position
+				// at the beginning.
+	if ((filepos + size) >= filesize) {
+	    filepos -= filesize; // Should be zero.
+	    lseek(fd, filepos, SEEK_SET);
+	}
+
     }
 }
 
@@ -216,7 +261,7 @@
     if(wpos >= rpos)
 	ret = wpos - rpos;
     else
-	ret = TFW_BUF_SIZE - rpos + wpos;
+	ret = buf_size - rpos + wpos;
 
     pthread_mutex_unlock(&buflock);
     return ret;
@@ -228,7 +273,7 @@
     pthread_mutex_lock(&buflock);
 
     if(wpos >= rpos)
-	ret = rpos + TFW_BUF_SIZE - wpos - 1;
+	ret = rpos + buf_size - wpos - 1;
     else
 	ret = rpos - wpos - 1;
 
@@ -249,6 +294,8 @@
     recorder_num = 0;   
  
     normalfile = true;
+    filesize = NOT_CIRCULAR;
+
     filename = (QString)lfilename;
     tfw = NULL;
     remotefile = NULL;
@@ -301,10 +348,22 @@
     stopreads = false;
     dumpfw = NULL;
     dumpwritepos = 0;
+    requested = 0;
+
+    fetch_ahead = 300000;	// By default, fetch-ahead up to 300k
+				// of data from remote files.
 
     pthread_rwlock_init(&rwlock, NULL);
 }
 
+//
+// We're creating a new circular buffer file, as used by LiveTV and
+// similar things. There's a long of assumed behaviour here.
+//
+// In particular, it seems that this means we're ALWAYS writable,
+// and that the file size can't get more than 'size'.
+// I don't know what 'smudge' is for.
+
 RingBuffer::RingBuffer(const QString &lfilename, long long size, 
                        long long smudge, RemoteEncoder *enc)
 {
@@ -352,6 +411,10 @@
     stopreads = false;
     dumpfw = NULL;
     dumpwritepos = 0;
+    requested = 0;
+
+    fetch_ahead = 300000;	// By default, fetch-ahead up to 300k
+				// of data from remote files.
 
     pthread_rwlock_init(&rwlock, NULL);
 }
@@ -364,6 +427,7 @@
     if (remotefile)
     {
         delete remotefile;
+	requested = 0;
     }
     if (tfw)
     {
@@ -383,8 +447,10 @@
 
 void RingBuffer::Start(void)
 {
-    if (remotefile)
+    if (remotefile) {
         remotefile->Start();
+	requested = 0;
+    }
     if ((normalfile && writemode) || (!normalfile && !recorder_num))
     {
     }
@@ -411,35 +477,37 @@
 
 void RingBuffer::Reset(void)
 {
+
+    if (normalfile)
+	return;
+
     wantseek = true;
     pthread_rwlock_wrlock(&rwlock);
     wantseek = false;
 
-    if (!normalfile)
+    if (remotefile)
     {
-        if (remotefile)
-        {
-            remotefile->Reset();
-        }
-        else
-        {
-	    delete tfw;
-            close(fd2);
-
-            tfw = new ThreadedFileWriter(filename.ascii(),
-                                         O_WRONLY|O_CREAT|O_TRUNC|O_LARGEFILE, 
-                                         0644);
-            fd2 = open(filename.ascii(), O_RDONLY|O_LARGEFILE);
-        }
+	remotefile->Reset();
+	requested = 0;
+    }
+    else
+    {
+	delete tfw;
+	close(fd2);
 
-        totalwritepos = writepos = 0;
-        totalreadpos = readpos = 0;
+	tfw = new ThreadedFileWriter(filename.ascii(),
+				     O_WRONLY|O_CREAT|O_TRUNC|O_LARGEFILE, 
+				     0644);
+	fd2 = open(filename.ascii(), O_RDONLY|O_LARGEFILE);
+    }
 
-        wrapcount = 0;
+    totalwritepos = writepos = 0;
+    totalreadpos = readpos = 0;
+    
+    wrapcount = 0;
 
-        if (readaheadrunning)
-            ResetReadAhead(0);
-    }
+    if (readaheadrunning)
+	ResetReadAhead(0);
     
     pthread_rwlock_unlock(&rwlock);
 }
@@ -489,44 +557,39 @@
 
     QSocket *sock = rf->getSocket();
 
-    qApp->lock();
-    unsigned int available = sock->bytesAvailable();
-    qApp->unlock();
-
-    while (available < sz) 
+    do 
     {
         int reqsize = 128000;
 
-        if (rf->RequestBlock(reqsize))
-            break;
+	// Maintain a window of pre-fetched data.
+	while (requested < fetch_ahead) {
+	    if (rf->RequestBlock(reqsize))
+                break; // error. get out of here.
+	    requested += reqsize;
+	}
 
-        zerocnt++;
-        if (zerocnt >= 10)
-        {
-            break;
-        }
-        if (stopreads)
+        if (zerocnt++ >= 20)
+	    break;  		// Stop infinite loops?
+
+        if (stopreads)		// If someone has told us to stop
+				// reading, then get out of here.
             break;
 
-        usleep(100);
 
-        qApp->lock();
-        available = sock->bytesAvailable();
-        qApp->unlock();
-    }
+				// Try and read some data from the socket.
+        ret = sock->readBlock(((char *)data) + tot, sz - tot);
+	if (ret < 0)		// error?
+	    break;
 
-    qApp->lock();
-    available = sock->bytesAvailable();
-    qApp->unlock();
+	if (ret < 1) {		// No data ready?
+	    sock->waitForMore(1); // Wait up to 1 msec for some to arrive.
+	    continue;
+	}
 
-    if (available > 0)
-    {
-        qApp->lock();
-        ret = sock->readBlock(((char *)data) + tot, sz - tot);
-        qApp->unlock();
+	tot += ret;
+	requested -= ret;
+    } while (tot < sz);
 
-        tot += ret;
-    }
     return tot;
 }
 
@@ -661,7 +724,7 @@
 
 void RingBuffer::ReadAheadThread(void)
 {
-    long long totfree = 0;
+    long long toread = 0;
     int ret = -1;
     int used = 0;
 
@@ -670,7 +733,15 @@
     readAheadBuffer = new char[READ_AHEAD_SIZE + 256000];
     
     ResetReadAhead(0);
-    totfree = ReadBufFree();
+    toread = ReadBufFree();
+    
+    if (normalfile && writemode) { // Very bad! Just get out of here!
+	delete [] readAheadBuffer;
+	readAheadBuffer = NULL;
+	rbrpos = 0;
+	rbwpos = 0;
+	return;
+    }
 
     readaheadrunning = true;
     while (readaheadrunning)
@@ -686,81 +757,44 @@
         readaheadpaused = false;
 
         pthread_rwlock_rdlock(&rwlock);
-        if (totfree > READ_AHEAD_BLOCK_SIZE)
-        {
-            // limit the read size
-            totfree = READ_AHEAD_BLOCK_SIZE;
-
-            if (rbwpos + totfree > READ_AHEAD_SIZE)
-                totfree = READ_AHEAD_SIZE - rbwpos;
-
-            if (normalfile)
-            {
-                if (!writemode)
-                {
-                    if (remotefile)
-                    {
-                        ret = safe_read(remotefile, readAheadBuffer + rbwpos,
-                                        totfree);
-                        internalreadpos += ret;
-                    }
-                    else
-                    {
-                        ret = safe_read(fd2, readAheadBuffer + rbwpos, totfree);
-                        internalreadpos += ret;
-                    }
-                }
-            }
-            else
-            {
-                if (remotefile)
-                {
-                    ret = safe_read(remotefile, readAheadBuffer + rbwpos, 
-                                    totfree);
-
-                    if (internalreadpos + totfree > filesize)
-                    {
-                        int toread = filesize - readpos;
-                        int left = totfree - toread;
- 
-                        internalreadpos = left;
-                    }
-                    else
-                        internalreadpos += ret;
-                }
-                else if (internalreadpos + totfree > filesize)
-                {
-                    int toread = filesize - internalreadpos;
 
-                    ret = safe_read(fd2, readAheadBuffer + rbwpos, toread);
-
-                    int left = totfree - toread;
-                    lseek(fd2, 0, SEEK_SET);
-
-                    ret = safe_read(fd2, readAheadBuffer + rbwpos + toread, 
-                                    left);
-                    ret += toread;
-
-                    internalreadpos = left;
-                }
-                else
-                {
-                    ret = safe_read(fd2, readAheadBuffer + rbwpos, totfree);
-                    internalreadpos += ret;
-                }
-            }
+				// Don't read past inflection points
+				// in the range.
+        if (toread > READ_AHEAD_BLOCK_SIZE)
+            toread = READ_AHEAD_BLOCK_SIZE;
+
+	if (rbwpos + totfree > READ_AHEAD_SIZE)
+	    totfree = READ_AHEAD_SIZE - rbwpos;
+
+	if (internalreadpos + toread > filesize)
+	    toread = filesize - internalreadpos;
+
+	if (remotefile)
+	    ret = safe_read(remotefile, readAheadBuffer + rbwpos,
+			    toread);
+	else
+	    ret = safe_read(fd2, readAheadBuffer + rbwpos, toread);
 
-            pthread_mutex_lock(&readAheadLock);
-            rbwpos = (rbwpos + ret) % READ_AHEAD_SIZE;
-            pthread_mutex_unlock(&readAheadLock);
+				// Did we reach the end of a circular file?
+	if ((ret +  internalreadpos) >= filesize) { // Never happens
+						    // for normal
+						    // files.
+		internalreadpos = 0;
+		if (!remotefile)
+		    lseek(fd2, 0, SEEK_SET);
+	    }
+	}
 
-            if (ret != totfree && normalfile)
-            {
-                ateof = true;
-            }
-        }
+	pthread_mutex_lock(&readAheadLock);
+	rbwpos = (rbwpos + ret) % READ_AHEAD_SIZE;
+	pthread_mutex_unlock(&readAheadLock);
+	
+	if (ret == 0 && normalfile)
+        {
+	    ateof = true;
+	}
 
-        totfree = ReadBufFree();
+        int totfree = ReadBufFree();
         used = READ_AHEAD_SIZE - totfree;
 
         if (ateof)
@@ -971,6 +1005,11 @@
 {
     int ret = -1;
 
+    if (normalfile && !writemode) {
+	fprintf(stderr, "Attempt to write to a read only file\n");
+	return -1;
+    }
+
     pthread_rwlock_rdlock(&rwlock);
 
     if (!tfw)
@@ -979,50 +1018,14 @@
         return ret;
     }
 
-    if (normalfile)
-    {
-        if (writemode)
-        {
-	    ret = tfw->Write(buf, count);
-	    totalwritepos += ret;
-            writepos += ret;
-        }
-        else
-        {
-            fprintf(stderr, "Attempt to write to a read only file\n");
-        }
-    }
-    else
-    {
-//cout << "write: " << totalwritepos << " " << writepos << " " << count << " " << filesize << endl;
-        if (writepos + count > filesize)
-        {
-            int towrite = filesize - writepos;
-            ret = tfw->Write(buf, towrite);
-
-            int left = count - towrite;
-            tfw->Seek(0, SEEK_SET);
-
-            ret = tfw->Write((char *)buf + towrite, left);
-            writepos = left;
-
-            ret += towrite;
-
-            totalwritepos += ret;
-            wrapcount++;
-        }
-        else
-        {
-            ret = tfw->Write(buf, count);
-            writepos += ret;
-            totalwritepos += ret;
-        }
+    ret = tfw->Write(buf, count);
+    totalwritepos += ret;
+    writepos += ret;
 
-        if (dumpfw)
-        {
-            int ret2 = dumpfw->Write(buf, count);
-            dumpwritepos += ret2;
-        }
+    if (!normalfile && dumpfw)
+    {
+	int ret2 = dumpfw->Write(buf, count);
+	dumpwritepos += ret2;
     }
 
     pthread_rwlock_unlock(&rwlock);
@@ -1031,13 +1034,10 @@
 
 long long RingBuffer::GetFileWritePosition(void)
 {
-    long long ret = -1;
     if (dumpfw)
-        ret = dumpwritepos;
-    else
-        ret = totalwritepos;
+        return dumpwritepos;
 
-    return ret;
+    return totalwritepos;
 }
 
 long long RingBuffer::Seek(long long pos, int whence)
@@ -1114,41 +1114,40 @@
 
 long long RingBuffer::WriterSeek(long long pos, int whence)
 {
-    long long ret = -1;
 
     if (dumpfw)
-	ret = dumpfw->Seek(pos, whence);
-    else if (tfw)
-	ret = tfw->Seek(pos, whence);
+	return dumpfw->Seek(pos, whence);
 
-    return ret;
+    if (tfw)
+	return tfw->Seek(pos, whence);
+
+    return -1;
 }
 
+
+// Return the amount of available space in a circular file. Returns -1
+// if the file isn't circular.
+
 long long RingBuffer::GetFreeSpace(void)
 {
-    if (!normalfile)
-    {
-        if (remoteencoder)
-            return remoteencoder->GetFreeSpace(totalreadpos);
-        return totalreadpos + filesize - totalwritepos - smudgeamount;
-    }
-    else
-        return -1;
+    if (normalfile)
+	return -1;
+
+    if (remoteencoder)
+	return remoteencoder->GetFreeSpace(totalreadpos);
+
+    return totalreadpos + filesize - totalwritepos - smudgeamount;
 }
 
 long long RingBuffer::GetFreeSpaceWithReadChange(long long readchange)
 {
-    if (!normalfile)
-    {
-        if (readchange > 0)
-            readchange = 0 - (filesize - readchange);
-
-	return GetFreeSpace() + readchange; 
-    }
-    else
-    {
+    if (normalfile)
         return readpos + readchange;
-    }
+
+    if (readchange > 0)
+	readchange = 0 - (filesize - readchange);
+
+    return GetFreeSpace() + readchange; 
 }
 
 long long RingBuffer::GetReadPosition(void)
Index: libs/libmythtv/RingBuffer.h
===================================================================
RCS file: /var/lib/mythcvs/mythtv/libs/libmythtv/RingBuffer.h,v
retrieving revision 1.30
diff -u -r1.30 RingBuffer.h
--- libs/libmythtv/RingBuffer.h	26 Apr 2003 05:15:31 -0000	1.30
+++ libs/libmythtv/RingBuffer.h	11 May 2003 03:42:58 -0000
@@ -79,8 +79,8 @@
     ThreadedFileWriter *tfw, *dumpfw;
     int fd2;
  
-    bool normalfile;
-    bool writemode;
+    bool normalfile;		/* Is this a circular file? */
+    bool writemode;		/* Are we writing to this file? */
     
     long long writepos;
     long long totalwritepos;
@@ -95,6 +95,8 @@
     long long wrapcount;
 
     bool stopreads;
+    int requested;		/* How much data has been requested that hasn't been received */
+    int fetch_ahead;		/* How much data we should pre-fetch from remote files. */
 
     pthread_rwlock_t rwlock;
 


More information about the mythtv-dev mailing list