Async options in .NET

To demonstrate the different async options in .NET, I wrote the following samples that call the async read methods of the .NET FileStream. The APM, TPL, and Rx code samples complete in roughly the same amount of time. I am biased toward Rx, so to me it’s the most elegant code sample of them all.

APM sample

int count = 0;
byte[] buffer = new byte[__bufferSize];
ManualResetEventSlim flag = new ManualResetEventSlim ();
Action read = null;
read = () => fileStream.BeginRead (buffer, 0, __bufferSize, ar => {
    int bytesRead = fileStream.EndRead (ar);
    if (0 == bytesRead)
        flag.Set ();
    else {
        if (count++ > __count) {
            Console.WriteLine (stopwatch.Elapsed);
            count = 0;
        }
        read ();
    }
}, null);
read ();
flag.Wait ();

A counter, buffer, and wait handle are created. An Action delegate calls BeginRead and registers an anonymous delegate as the callback. In this callback, EndRead is called, which returns the number of bytes read. If it’s zero, the wait handle is signaled and the method exits; otherwise, the count is incremented (and checked against a constant to write progress), and the delegate calls itself to loop.
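
The sample above relies on fields defined elsewhere (fileStream, __bufferSize, __count, stopwatch). As a rough, self-contained sketch of the same callback loop, the version below takes any Stream and counts the bytes read instead of printing progress. The names ApmReadSketch and CountBytes are made up for illustration, and the error handling is an addition that the original sample does not have.

```csharp
using System;
using System.IO;
using System.Threading;

public static class ApmReadSketch {
    // Hypothetical helper: reads the whole stream with BeginRead/EndRead
    // and returns the total number of bytes read.
    public static long CountBytes (Stream stream, int bufferSize) {
        long total = 0;
        Exception error = null;
        byte[] buffer = new byte[bufferSize];
        ManualResetEventSlim flag = new ManualResetEventSlim ();
        Action read = null;
        read = () => stream.BeginRead (buffer, 0, bufferSize, ar => {
            try {
                int bytesRead = stream.EndRead (ar);
                if (0 == bytesRead)
                    flag.Set ();          // end of stream: release the waiting caller
                else {
                    total += bytesRead;
                    read ();              // issue the next read from inside the callback
                }
            }
            catch (Exception exception) {
                error = exception;        // report failures instead of blocking forever
                flag.Set ();
            }
        }, null);
        read ();
        flag.Wait ();
        if (error != null)
            throw error;
        return total;
    }
}
```

Calling ApmReadSketch.CountBytes (new MemoryStream (new byte[10000]), 4096) should walk the stream in three reads plus a final zero-byte read that signals the end.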

TPL sample

int count = 0;
byte[] buffer = new byte[__bufferSize];
ManualResetEventSlim flag = new ManualResetEventSlim ();
Func<Task<int>> readAsync = () => Task.Factory.FromAsync<byte[], int, int, int> (
        fileStream.BeginRead,
        fileStream.EndRead,
        buffer,
        0,
        __bufferSize,
        null);
Action read = null;
read = () => {
    readAsync ().ContinueWith (task => {
        if (0 == task.Result)
            flag.Set ();
        else {
            if (count++ > __count) {
                Console.WriteLine (stopwatch.Elapsed);
                count = 0;
            }
            read ();
        }
    });
};
read ();
flag.Wait ();

An Action delegate invokes the task and hooks in a continuation delegate to handle the completion. In the continuation, if the result is 0, the wait handle is set and the method exits. Otherwise, the count is incremented (and checked against a constant to write progress), and the delegate calls itself to loop.
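
A self-contained variation on the TPL loop might look like the sketch below. It swaps the wait handle for a TaskCompletionSource so the caller gets a Task back instead of blocking; that swap is my assumption about what a reusable version could look like, not part of the original sample. TplReadSketch and CountBytesAsync are hypothetical names.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class TplReadSketch {
    // Hypothetical helper: returns a task that completes with the total
    // number of bytes read from the stream.
    public static Task<long> CountBytesAsync (Stream stream, int bufferSize) {
        var completion = new TaskCompletionSource<long> ();
        byte[] buffer = new byte[bufferSize];
        long total = 0;

        // wrap the Begin/End pair in a task factory, as in the sample above
        Func<Task<int>> readAsync = () => Task.Factory.FromAsync<byte[], int, int, int> (
            stream.BeginRead, stream.EndRead, buffer, 0, bufferSize, null);

        Action read = null;
        read = () => readAsync ().ContinueWith (task => {
            if (task.IsFaulted) {
                // forward read failures to the caller's task
                completion.SetException (task.Exception.InnerExceptions);
                return;
            }
            if (0 == task.Result)
                completion.SetResult (total);   // end of stream
            else {
                total += task.Result;
                read ();                        // chain the next read
            }
        });
        read ();
        return completion.Task;
    }
}
```

The caller can block with .Result, as the original does with flag.Wait (), or attach a further continuation.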

Rx sample

byte[] buffer = new byte[__bufferSize];
var readAsync = Observable.FromAsyncPattern<byte[], int, int, int> (
    fileStream.BeginRead, fileStream.EndRead);
Observable.Repeat (0).SelectMany (_ =>
    readAsync (buffer, 0, __bufferSize)).TakeWhile (bytesRead =>
        bytesRead > 0).Buffer (__count).ForEach (results =>
        Console.WriteLine ("{0} min:{1} max:{2} count:{3}",
        stopwatch.Elapsed,
        results.Min (),
        results.Max (),
        results.Count));

An infinite sequence of 0s is created. SelectMany ignores the value 0 and invokes the observable factory function for each element, TakeWhile constrains the sequence to reads where bytesRead > 0, Buffer groups the results into a sequence of IList<int>, and ForEach blocks while writing the progress to the console.

Thanks to David Fowler for recommending the idea of comparing the different async options in .NET!

IEnumerable & IObservable Split ()

Another IObservable<> extension method I needed was Split – just like string.Split (), but generic, for any array and for an IObservable<T[]>. I started with a generic IEnumerable<T[]> Split<T> ():

/// <summary>
/// Returns one or more arrays that contains the subarrays in this
/// instance that are delimited by elements of a specified array.
/// </summary>
/// <typeparam name="T">type of the array</typeparam>
/// <param name="value">value to split</param>
/// <param name="separator">one or more separators</param>
/// <remarks>not efficient with separator lengths > 1</remarks>
/// <returns></returns>
public static IEnumerable<T[]> Split<T> (this T[] value, params T[] separator) {
    int separatorLength = separator.Length;
    int startingIndex = 0;
    int index = -1;
    int length;

    do {
        /* loop through each item in the separator
            * array verifying it exists in the source array */
        for (int i = 0; i < separatorLength; i++) {
            index = Array.IndexOf (value, separator[i], startingIndex);
            if (-1 == index)
                break;
        }
        if (-1 < index) {
            // delimiter matched successfully
            length = index - startingIndex;
            T[] output = new T[length];
            Array.Copy (value, startingIndex, output, 0, length);
            yield return output;
            startingIndex = index + 1;
        }
    } while (-1 < index);

    if (0 == startingIndex)
        // no match, send entire value
        yield return value;
    else {
        // no more matches, send items left
        length = value.Length - startingIndex;
        T[] output = new T[length];
        Array.Copy (value, startingIndex, output, 0, length);
        yield return output;
    }
}
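
To make the intended behavior concrete, here is a simplified sketch that splits on a single separator element rather than a params array. SplitSketch, SplitOn, and Slice are hypothetical names, and this is a pared-down illustration, not a drop-in replacement for the method above.

```csharp
using System;
using System.Collections.Generic;

public static class SplitSketch {
    // Simplified assumption: one separator element instead of params T[].
    public static IEnumerable<T[]> SplitOn<T> (this T[] value, T separator) {
        int start = 0;
        int index;
        while ((index = Array.IndexOf (value, separator, start)) >= 0) {
            // everything between the previous separator and this one
            yield return Slice (value, start, index - start);
            start = index + 1;
        }
        // whatever follows the last separator (possibly an empty array)
        yield return Slice (value, start, value.Length - start);
    }

    static T[] Slice<T> (T[] source, int offset, int length) {
        T[] output = new T[length];
        Array.Copy (source, offset, output, 0, length);
        return output;
    }
}
```

With this sketch, new[] { 1, 2, 0, 3, 0 }.SplitOn (0) yields three arrays: { 1, 2 }, { 3 }, and an empty array for the position after the trailing separator, which mirrors how string.Split () treats a trailing delimiter.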

Now that I can split an array, I need an IObservable<T[]> Split () as well:

/// <summary>
/// Splits the arrays from the source observable by the separator
/// </summary>
/// <typeparam name="TSource">type of the source array</typeparam>
/// <param name="source">source array</param>
/// <param name="separator">the separator(s)</param>
/// <returns>An observable containing the same number of items (if the
/// separator(s) are not found) or additional items by splitting the incoming
/// arrays</returns>
public static IObservable<TSource[]> Split<TSource> (
    this IObservable<TSource[]> source, params TSource[][] separator) {

    if (source == null)
        throw new ArgumentNullException ("source", "source is null.");
    if (separator.Length < 1)
        throw new ArgumentOutOfRangeException ("separator");

    IObservable<TSource[]> value = source;
    /* foreach of the delimiter arrays passed in
     * chain the call to Split () */
    separator.Run (item => value = value.Split (item));
    return value;
}

/// <summary>
/// Splits the arrays from the source observable by the delimiter
/// </summary>
/// <typeparam name="TSource">type of the source array</typeparam>
/// <param name="source">source array</param>
/// <param name="separator">the delimiter</param>
/// <returns>An observable containing the same number of items (if the
/// delimiter is not found) or additional items by splitting the incoming
/// arrays</returns>
public static IObservable<TSource[]> Split<TSource> (
    this IObservable<TSource[]> source, params TSource[] separator) {

    if (source == null)
        throw new ArgumentNullException ("source");
    if (separator.Length < 1)
        throw new ArgumentOutOfRangeException ("separator");

    return Observable.CreateWithDisposable<TSource[]> (observer =>
        source.Subscribe<TSource[]> (value =>
                value.Split (separator).Run (observer.OnNext),
                observer.OnError, observer.OnCompleted)
    );
}

Of course these extension methods can be further optimized and handle more use cases – especially by using IList<T> instead of array.

ReverseAsyncRead ()

Rx includes the AsyncRead () method that reads a stream from the beginning returning an IObservable<byte[]>. This method does the same, but in reverse. The public method overloads accept a stream and optionally a bufferSize and startingPosition.

public static IObservable<byte[]> ReverseAsyncRead (this Stream stream) {
    return ReverseAsyncRead (stream, 2 << 15);
}

public static IObservable<byte[]> ReverseAsyncRead (
    this Stream stream, 
    int bufferSize) {

    ValidateParameters (stream, bufferSize);

    return Observable.Iterate<byte[]> (observer =>
        ReverseAsyncRead (stream, bufferSize, observer, stream.Length));
}

public static IObservable<byte[]> ReverseAsyncRead (
    this Stream stream, 
    int bufferSize, 
    long startingPosition) {

    ValidateParameters (stream, bufferSize);
    if (0 > startingPosition)
        throw new ArgumentOutOfRangeException ("startingPosition");

    return Observable.Iterate<byte[]> (observer => 
        ReverseAsyncRead (stream, bufferSize, observer, startingPosition));
}

Using IEnumerable<IObservable<object>>, yield, Observable.FromAsyncPattern (), and Observable.Iterate (), we’re able to read a stream in blocks (of bufferSize) from the end to the front. The public method calls Iterate () which passes an observer instance to the following private method:

static void ValidateParameters (Stream stream, int bufferSize) {
    if (null == stream)
        throw new ArgumentNullException ("stream");
    if (!stream.CanSeek)
        throw new ArgumentOutOfRangeException ("stream", "stream.CanSeek is false");
    if (1 > bufferSize)
        throw new ArgumentOutOfRangeException ("bufferSize");
}

static IEnumerable<IObservable<object>> ReverseAsyncRead (
    Stream stream, 
    int bufferSize,
    IObserver<byte[]> observer, 
    long startingPosition) {


    Func<byte[], int, int, IObservable<int>> asyncRead =
        Observable.FromAsyncPattern<byte[], int, int, int> (
            stream.BeginRead, 
            stream.EndRead);

    byte[] buffer = new byte[bufferSize];
    long position = startingPosition;
    int read = bufferSize;

    do {
        // decrement position by the amount read
        position -= read;
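        if (position < 0) {
            // fewer than bufferSize bytes remain in front of the previous
            // block: shrink the read to exactly those bytes
            bufferSize = (int)(position + read);
            position = 0;
        }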
        // set stream's position
        stream.Position = position;

        // call stream's Begin/End Read
        {
            ListObservable<int> asyncReadResult;
            try {
                asyncReadResult = asyncRead (buffer, 0, bufferSize).Start ();
            }
            catch (Exception exception) {
                observer.OnError (exception); yield break;
            }

            yield return asyncReadResult;
            read = asyncReadResult.Value;
        }

        // send value read to observer
        try {
            // only hand out the whole buffer when the read filled it completely
            if (buffer.Length == read)
                observer.OnNext (buffer);
            else
                observer.OnNext (buffer.Take (read).ToArray ());
        }
        catch (Exception exception) {
            observer.OnError (exception);
            yield break;
        }

        // continue until we can't read anymore
    } while (read > 0 && position > 0);

    observer.OnCompleted ();
}

Using yield and Stream’s async method calls, we’re able to observe all byte[] blocks in the stream reading backwards.
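
The position arithmetic is the easy part to get wrong, so here is a small sketch of just that piece, with no I/O and no Rx: it lists the (position, count) pairs a backwards reader would visit for a stream of a given length. ReverseBlocksSketch and Blocks are hypothetical names, and this is only a model of the loop above, not the code it uses.

```csharp
using System;
using System.Collections.Generic;

public static class ReverseBlocksSketch {
    // Yields (position, count) pairs that cover [0, length) from the end
    // of the stream to the front, in blocks of at most bufferSize bytes.
    public static IEnumerable<KeyValuePair<long, int>> Blocks (long length, int bufferSize) {
        long position = length;
        while (position > 0) {
            // the block nearest the start of the stream may be shorter
            int count = (int)Math.Min (bufferSize, position);
            position -= count;
            yield return new KeyValuePair<long, int> (position, count);
        }
    }
}
```

For a 10-byte stream and a 4-byte buffer this yields (6, 4), (2, 4), and (0, 2): the last block is clamped so it never overlaps bytes that were already read.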

Optimizing tail with Rx

Chris was concerned that the initial Rx implementation inefficiently read an entire 5 GB text file, throwing everything away except for the last 10 lines. To optimize I/O, I’ve modified the code to read the file in reverse. Here’s a new version that tails a 5 GB text file in less than 150 ms:

const int bufferSize = 2 << 15;
using (var stream = new FileStream (
        args[0],
        FileMode.Open,
        FileAccess.Read,
        FileShare.Read,
        bufferSize,
        FileOptions.Asynchronous | FileOptions.RandomAccess)) {

    // seek to (EOF - bufferSize) and read stream backwards, returning IObservable<byte[]>
    stream.ReverseAsyncRead (bufferSize)
        // reverse bytes inside byte[]
        .Select (bytes => bytes.Reverse ().ToArray ())
        // split byte[] on \r and \n
        .Split (new byte[] { 13 }, new byte[] { 10 })
        // take 10 of the byte[] results
        .Take (10)
        // reverse order of byte[]s, and the bytes inside the byte[]s
        .ToEnumerable ().Reverse ().Select (bytes => bytes.Reverse ().ToArray ()).ToObservable ()
        // using ASCII encoding, convert bytes to string
        .Decode (Encoding.ASCII)
        // output the IObservable<string> to the standard output
        .Run (Console.WriteLine);
}

Starting from the end of the file, the pipeline reads chunks backwards, reverses each byte[] chunk, splits on \r and \n (producing smaller byte[] results), takes the first 10, reverses them back, converts them to strings (via ASCII encoding), and sends them to standard output. This version reads from disk only the data required to tail the file, with no unnecessary disk I/O. Unfortunately some of these extension methods didn’t exist, so I had to create three to build this:

  • ReverseAsyncRead (this Stream): just like AsyncRead (), but in reverse
  • Split<T> (this T[]): just like string.Split (), but generic, for any array, and for observable input arrays
  • Decode (this byte[]): using Encoding and a Decoder, converts an IObservable<byte[]> to IObservable<string>
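
For readers without Rx at hand, the same idea can be sketched synchronously with only the base class library: read blocks from the end, stop as soon as enough line breaks have been seen, and decode what was collected. This is a plain Stream.Read loop swapped in for the Rx pipeline, not the implementation described above. TailSketch and LastLines are hypothetical names, and it assumes ASCII text and a lineCount of at least 1.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

public static class TailSketch {
    // Returns up to lineCount trailing lines of a seekable stream.
    public static string[] LastLines (Stream stream, int lineCount, int bufferSize) {
        var reversed = new List<byte> ();   // collected bytes, last byte first
        byte[] buffer = new byte[bufferSize];
        long length = stream.Length;
        long position = length;
        int newlines = 0;
        bool done = false;

        while (position > 0 && !done) {
            // step one block towards the front, clamped at the start
            int count = (int)Math.Min (bufferSize, position);
            position -= count;
            stream.Position = position;
            int read = 0;
            while (read < count) {
                int n = stream.Read (buffer, read, count - read);
                if (n == 0) break;
                read += n;
            }
            // walk the block backwards, counting line breaks
            for (int i = read - 1; i >= 0 && !done; i--) {
                bool isNewline = buffer[i] == (byte)'\n';
                // a newline as the file's final byte ends the last line
                bool isLastByte = position + i == length - 1;
                if (isNewline && !isLastByte && ++newlines == lineCount)
                    done = true;
                else
                    reversed.Add (buffer[i]);
            }
        }
        if (reversed.Count == 0)
            return new string[0];

        reversed.Reverse ();
        string text = Encoding.ASCII.GetString (reversed.ToArray ());
        // drop the single trailing line terminator, if any
        if (text.EndsWith ("\n", StringComparison.Ordinal))
            text = text.Substring (0, text.Length - 1);
        if (text.EndsWith ("\r", StringComparison.Ordinal))
            text = text.Substring (0, text.Length - 1);
        return text.Split (new[] { "\r\n", "\n" }, StringSplitOptions.None);
    }
}
```

Like the Rx version, this touches only the blocks at the end of the file that contain the requested lines, so the cost depends on the length of those lines and not on the size of the file.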