Async options in .NET

To demonstrate the different async options in .NET, I wrote the following samples that call the async read methods of the .NET FileStream. The following APM, TPL, and Rx code samples complete in roughly the same amount of time. I am biased to Rx, so, it’s the most elegent code sample of them all.

APM sample

int count = 0;
byte[] buffer = new byte[__bufferSize];
ManualResetEventSlim flag = new ManualResetEventSlim ();
Action read = null;
read = () => fileStream.BeginRead (buffer0__bufferSizear => {
    int bytesRead = fileStream.EndRead (ar);
    if (0 == bytesRead)
        flag.Set ();
    else {
        if (count++ > __count) {
            Console.WriteLine (stopwatch.Elapsed);
            count = 0;
        }
        read ();
    }
}, null);
read ();
flag.Wait ();

A counter, buffer, and waithandle are created. An action delegate calls the beginRead delegate, and registers an anonymous delegate as the callback. In this callback, the EndRead method is called which returns the bytesRead. If it’s zero, the waithandle is signaled and the method exits, otherwise, a count is increased (and checked against a constant to write progress), and calls itself to loop.

TPL sample

int count = 0;
byte[] buffer = new byte[__bufferSize];
ManualResetEventSlim flag = new ManualResetEventSlim ();
Func<Task<int>> readAsync = () => Task.Factory.FromAsync<byte[], intintint> (
        fileStream.BeginRead,
        fileStream.EndRead,
        buffer,
        0,
        __bufferSize,
        null);
Action read = null;
read = () => {
    readAsync ().ContinueWith (task => {
        if (0 == task.Result)
            flag.Set ();
        else {
            if (count++ > __count) {
                Console.WriteLine (stopwatch.Elapsed);
                count = 0;
            }
            read ();
        }
    });
};
read ();
flag.Wait ();

An action delegate invokes the task and hooks in a continuation delegate to handle the completion. In the continuation if the result is 0 the waithandle is set and the method exists. Otherwise the count in increased  (and checked against a constant to write progress), and calls itself to loop.

Rx sample

byte[] buffer = new byte[__bufferSize];
var readAsync = Observable.FromAsyncPattern<byte[], intintint> (
    fileStream.BeginReadfileStream.EndRead);
Observable.Repeat (0).SelectMany (_ =>
    readAsync (buffer0__bufferSize)).TakeWhile (bytesRead =>
        bytesRead > 0).Buffer (__count).ForEach (results =>
        Console.WriteLine ("{0} min:{1} max:{2} count:{3}",
        stopwatch.Elapsed,
        results.Min (),
        results.Max (),
        results.Count));

A infinite sequence of 0 is created, the value 0 is ignored by the SelectMany operator by invoking the observable factory function, constrain the sequence to only observe when bytesRead > 0, buffer the results into a sequence of IList<int> and we block with the ForEach operator to display the progress to the console.

Thanks to David Fowler for recomending the idea to compare the differences between the different aync options in .NET!