ReverseAsyncRead ()

Rx includes the AsyncRead () method, which reads a stream from the beginning and returns an IObservable<byte[]>. ReverseAsyncRead () does the same, but from the end of the stream toward the beginning. The public overloads accept a stream and, optionally, a bufferSize (default 2 << 15, i.e. 65,536 bytes) and a startingPosition.

public static IObservable<byte[]> ReverseAsyncRead (this Stream stream) {
    return ReverseAsyncRead (stream, 2 << 15);
}

public static IObservable<byte[]> ReverseAsyncRead (
    this Stream stream, 
    int bufferSize) {

    ValidateParameters (stream, bufferSize);

    return Observable.Iterate<byte[]> (observer =>
        ReverseAsyncRead (stream, bufferSize, observer, stream.Length));
}

public static IObservable<byte[]> ReverseAsyncRead (
    this Stream stream, 
    int bufferSize, 
    long startingPosition) {

    ValidateParameters (stream, bufferSize);
    if (0 > startingPosition)
        throw new ArgumentOutOfRangeException ("startingPosition");

    return Observable.Iterate<byte[]> (observer => 
        ReverseAsyncRead (stream, bufferSize, observer, startingPosition));
}

Using IEnumerable<IObservable<object>>, yield, Observable.FromAsyncPattern (), and Observable.Iterate (), we’re able to read a stream in blocks (of bufferSize) from the end to the front. The public methods validate their arguments with ValidateParameters () and then call Iterate (), which passes an observer instance to the private ReverseAsyncRead () iterator. Both helpers follow:

static void ValidateParameters (Stream stream, int bufferSize) {
    if (null == stream)
        throw new ArgumentNullException ("stream");
    if (!stream.CanSeek)
        throw new ArgumentOutOfRangeException ("stream", "stream.CanSeek is false");
    if (1 > bufferSize)
        throw new ArgumentOutOfRangeException ("bufferSize");
}

static IEnumerable<IObservable<object>> ReverseAsyncRead (
    Stream stream, 
    int bufferSize,
    IObserver<byte[]> observer, 
    long startingPosition) {


    Func<byte[], int, int, IObservable<int>> asyncRead =
        Observable.FromAsyncPattern<byte[], int, int, int> (
            stream.BeginRead, 
            stream.EndRead);

    byte[] buffer = new byte[bufferSize];
    long position = startingPosition;
    int read = bufferSize;

    do {
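        // move back by the amount just read
        position -= read;
        if (position < 0) {
            // fewer bytes remain before the front of the stream than were
            // just read; position is negative here, so this shrinks the
            // final read to exactly the bytes that are left
            bufferSize = (int)(position + read);
            position = 0;
        }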
        // set stream's position
        stream.Position = position;

        // call stream's Begin/End Read
        {
            ListObservable<int> asyncReadResult;
            try {
                asyncReadResult = asyncRead (buffer, 0, bufferSize).Start ();
            }
            catch (Exception exception) {
                observer.OnError (exception);
                yield break;
            }

            yield return asyncReadResult;
            read = asyncReadResult.Value;
        }

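        // send the block to the observer: pass the buffer itself only when
        // it was filled completely, otherwise copy just the bytes read.
        // Note that the same buffer instance is reused for the next read.
        try {
            if (buffer.Length == read)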
                observer.OnNext (buffer);
            else
                observer.OnNext (buffer.Take (read).ToArray ());
        }
        catch (Exception exception) {
            observer.OnError (exception);
            yield break;
        }

        // continue until we can't read anymore
    } while (read > 0 && position > 0);

    observer.OnCompleted ();
}

Using yield and Stream’s asynchronous Begin/End read calls, we’re able to observe every byte[] block in the stream, reading from the end to the front.
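To check the position arithmetic in isolation, here is a minimal synchronous sketch of the same block layout. It is not the Rx implementation above: ReverseReadSketch and ReadBlocksInReverse are hypothetical names introduced for illustration, it uses blocking Stream.Read calls instead of Begin/End reads, and it allocates a new array for each block rather than reusing one buffer.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class ReverseReadSketch {

    // Hypothetical helper (not from the article): yields blocks of up to
    // bufferSize bytes, starting at the end of the stream and moving
    // toward the front, using blocking reads.
    public static IEnumerable<byte[]> ReadBlocksInReverse (
        Stream stream,
        int bufferSize) {

        long position = stream.Length;

        while (position > 0) {
            // the block nearest the front may be shorter than bufferSize
            int count = (int)Math.Min (bufferSize, position);
            position -= count;
            stream.Position = position;

            // Stream.Read may return fewer bytes than requested,
            // so keep reading until the block is full
            byte[] block = new byte[count];
            int total = 0;
            while (total < count) {
                int read = stream.Read (block, total, count - total);
                if (read == 0)
                    throw new EndOfStreamException ();
                total += read;
            }

            yield return block;
        }
    }

    public static void Main () {
        byte[] data = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

        using (var stream = new MemoryStream (data)) {
            foreach (byte[] block in ReadBlocksInReverse (stream, 4))
                Console.WriteLine (BitConverter.ToString (block));
        }
        // prints:
        // 07-08-09-0A
        // 03-04-05-06
        // 00-01-02
    }
}
```

With 11 bytes and a block size of 4, the last block covers the 3 bytes at the front of the stream. This is the case in which the asynchronous version shortens its final read.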