Optimizing tail with Rx

Chris was concerned that the initial Rx implementation was inefficient: it read an entire 5 GB text file and threw everything away except the last 10 lines. To optimize I/O, I’ve modified the code to read the file in reverse. Here’s a new version that tails a 5 GB text file in less than 150 ms:

const int bufferSize = 2 << 15;
using (var stream = new FileStream (
    args[0],
    FileMode.Open,
    FileAccess.Read,
    FileShare.Read,
    bufferSize,
    FileOptions.Asynchronous | FileOptions.RandomAccess)) {
    // seek to (EOF - bufferSize) and read stream backwards, returning IObservable<byte[]>
    stream.ReverseAsyncRead (bufferSize)
        // reverse bytes inside byte[]
        .Select (bytes => bytes.Reverse ().ToArray ())
        // split byte[] on \r and \n
        .Split (new byte[] { 13 }, new byte[] { 10 })
        // take 10 of the byte[] results
        .Take (10)
        // reverse order of byte[]s, and the bytes inside the byte[]s
        .ToEnumerable ().Reverse ().Select (bytes => bytes.Reverse ().ToArray ()).ToObservable ()
        // using ASCII encoding, convert bytes to string
        .Decode (Encoding.ASCII)
        // output the IObservable<string> to the standard output
        .Run (Console.WriteLine);
}

The pipeline starts at the end of the file and reads chunks backwards, reverses the bytes in each chunk, splits on \r and \n (producing smaller byte[] results), takes the first 10, reverses them back, converts them to strings (via ASCII encoding), and sends them to standard output. This version reads from disk only the data required to tail the file, with no unnecessary disk I/O. Unfortunately, some of these extension methods didn’t exist, so I had to create three to build this:

  • ReverseAsyncRead (this Stream): just like AsyncRead (), but in reverse
  • Split<T> (this T[]): just like string.Split (), but generic, for any array, and for observable input arrays
  • Decode (this byte[]): using Encoding and a Decoder, converts an IObservable<byte[]> to an IObservable<string>
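
To make the backwards-reading idea concrete without the Rx plumbing, here is a minimal synchronous sketch. It is not the implementation of the three extension methods above: TailSketch, ReadChunksBackwards, Tail, and Decode are hypothetical names invented for illustration. It assumes a seekable stream and ASCII text, treats \n as the line delimiter, drops every \r, and does not report an empty first line.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

public static class TailSketch
{
    // Hypothetical helper: yields the stream's contents in chunks, last chunk first.
    public static IEnumerable<byte[]> ReadChunksBackwards(Stream stream, int bufferSize)
    {
        long position = stream.Length;
        while (position > 0)
        {
            int count = (int)Math.Min(bufferSize, position);
            position -= count;
            stream.Seek(position, SeekOrigin.Begin);

            // Stream.Read may return fewer bytes than requested, so loop until full.
            var buffer = new byte[count];
            int read = 0;
            while (read < count)
            {
                int n = stream.Read(buffer, read, count - read);
                if (n == 0) throw new EndOfStreamException();
                read += n;
            }
            yield return buffer;
        }
    }

    // Hypothetical helper: returns up to lineCount trailing lines, in file order.
    public static List<string> Tail(Stream stream, int lineCount, int bufferSize)
    {
        var lines = new List<string>();
        if (lineCount <= 0) return lines;

        var current = new List<byte>(); // bytes of the line being collected, last byte first
        bool first = true;              // true until the byte at EOF has been examined

        foreach (var chunk in ReadChunksBackwards(stream, bufferSize))
        {
            for (int i = chunk.Length - 1; i >= 0; i--)
            {
                byte b = chunk[i];
                bool trailingNewline = first && b == (byte)'\n';
                first = false;

                // Drop carriage returns and the newline that terminates the file.
                if (b == (byte)'\r' || trailingNewline) continue;
                if (b != (byte)'\n') { current.Add(b); continue; }

                // A newline closes the line collected so far.
                lines.Add(Decode(current));
                current.Clear();
                if (lines.Count == lineCount) { lines.Reverse(); return lines; }
            }
        }

        // Start of file reached: any remaining bytes form the first line.
        if (current.Count > 0) lines.Add(Decode(current));
        lines.Reverse();
        return lines;
    }

    // Restores byte order for one line, then decodes it as ASCII.
    private static string Decode(List<byte> reversedBytes)
    {
        var bytes = reversedBytes.ToArray();
        Array.Reverse(bytes);
        return Encoding.ASCII.GetString(bytes);
    }
}
```

Calling TailSketch.Tail (stream, 10, 2 << 15) on a FileStream opened as in the listing above would return the last 10 lines. The loop stops seeking as soon as enough newlines have been seen, so only the tail end of the file is read from disk.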