Navigation
Links

Powered by Squarespace
Sunday
Nov252012

I'm lucky

Today I received an email stating I'm entitled to $1.5 million dollars. The only decision I need to make is if I should receive an ATM card to use that money, or, if I should opt for a check. There are benefits to either decision. Recommendations?

US-FBI-ShadedSeal.svg

Federal Bureau of Investigation

FBI Seattle Division
1110 Third Avenue
Seattle. 98101-2904

Attn:

This is Robert S. Mueller III of Federal Bureau of Investigation (FBI) The FBI in alliance with the Economic Community of West Asia States (ECOWAS), with their Head Office there in West Africa has been working towards the total eradication of fraudsters and scam in the Western part of West Africa With the help of the United States Government and the (U.N) United Nations. We have been able to track down so many of this scammers in various parts of African Region, which includes ( REPUBLIC OF BENIN, TOGO, GHANA CAMEROON, SENEGAL AND NIGERIA) and they are all in our custody over there, meanwhile during the process of arrest, We were able to recover some funds from the prosecuted scammer.

Regarding this great achievement, the (U.N) United Nations Anti-crime commission and the United State Government have ordered that the money recovered from these Scammers be shared among 100 Lucky people around the globe. This email is been directed to you because your email address was found in one of the file and computer hard disk recovered from the prosecuted scammers there in our custody. You are therefore being compensated with the sum of $1.5 Million Dollars. We have also arrested all those who claimed to be Barristers, Bank Officials, Lottery Agents, Person or group of Person's who claim to have funds in their custody that belonged to you or want you to be the Next of kin of such funds which do not exist.

Now how would you like to receive your payment? Because we have two method of payment which is by Check or by ATM card?

ATM Card: We will be issuing you a custom pin based ATM card which you will use to withdraw up to $3,000 per day from any ATM machine that has the Master Card Logo on it and the card have to be renewed in 4 years time which is 2016. Also with the ATM card you will be able to transfer your funds to your local bank account. The ATM card comes with a handbook or manual to enlighten you about how to use it, even if you do not have a bank account.

Check: To be deposited in your bank for it to be cleared within three working days. Your payment would be sent to you via any of your preferred option and would be mailed to you via UPS. Because we have signed a contract with UPS which should expire in next three weeks, you will only need to pay $205 instead of $420 saving you $215 So if you pay before the three weeks in October 25, 2012 you save $215 Take note that anyone asking you for some kind of money above the usual fee of $205usd is definitely a fraudsters and you will have to stop communication with every other person if you have been in contact with any.

Enclosure

Sunday
Aug282011

Different async options in .NET

To demonstrate the different async options in .NET, I wrote the following samples that call the async read methods of the .NET FileStream. The following APM, TPL, and Rx code samples complete in roughly the same amount of time. I am biased to Rx, so, it's the most elegent code sample of them all.

APM sample

int count = 0;
byte[] buffer = new byte[__bufferSize];
ManualResetEventSlim flag = new ManualResetEventSlim ();
Action read = null;
read = () => fileStream.BeginRead (buffer0__bufferSizear => {
    int bytesRead = fileStream.EndRead (ar);
    if (0 == bytesRead)
        flag.Set ();
    else {
        if (count++ > __count) {
            Console.WriteLine (stopwatch.Elapsed);
            count = 0;
        }
        read ();
    }
}, null);
 
read ();
flag.Wait ();

A counter, buffer, and waithandle are created. An action delegate calls the beginRead delegate, and registers an anonymous delegate as the callback. In this callback, the EndRead method is called which returns the bytesRead. If it's zero, the waithandle is signaled and the method exits, otherwise, a count is increased (and checked against a constant to write progress), and calls itself to loop.

TPL sample

int count = 0;
byte[] buffer = new byte[__bufferSize];
ManualResetEventSlim flag = new ManualResetEventSlim ();
 
Func<Task<int>> readAsync = () => Task.Factory.FromAsync<byte[], intintint> (
        fileStream.BeginRead,
        fileStream.EndRead,
        buffer,
        0,
        __bufferSize,
        null);
 
Action read = null;
read = () => {
    readAsync ().ContinueWith (task => {
        if (0 == task.Result)
            flag.Set ();
        else {
            if (count++ > __count) {
                Console.WriteLine (stopwatch.Elapsed);
                count = 0;
            }
            read ();
        }
    });
};
 
read ();
flag.Wait ();

An action delegate invokes the task and hooks in a continuation delegate to handle the completion. In the continuation if the result is 0 the waithandle is set and the method exists. Otherwise the count in increased  (and checked against a constant to write progress), and calls itself to loop.

Rx sample

byte[] buffer = new byte[__bufferSize];
var readAsync = Observable.FromAsyncPattern<byte[], intintint> (
    fileStream.BeginReadfileStream.EndRead);
 
Observable.Repeat (0).SelectMany (_ =>
    readAsync (buffer0__bufferSize)).TakeWhile (bytesRead =>
        bytesRead > 0).Buffer (__count).ForEach (results =>
        Console.WriteLine ("{0} min:{1} max:{2} count:{3}",
        stopwatch.Elapsed,
        results.Min (),
        results.Max (),
        results.Count));


A infinite sequence of 0 is created, the value 0 is ignored by the SelectMany operator by invoking the observable factory function, constrain the sequence to only observe when bytesRead > 0, buffer the results into a sequence of IList<int> and we block with the ForEach operator to display the progress to the console.

Thanks to David Fowler for recomending the idea to compare the differences between the different aync options in .NET!

Monday
Jan102011

IEnumerable & IObservable Split ()

Another IObservable<> extension method I needed was Split – just like string.Split (), but generically for any array and an IObservable<T[]>. I started first with a generic IEnumberable<T> Split<T> ():

/// <summary>
/// Returns one or more arrays that contains the subarrays in this
/// instance that are delimited by elements of a specified array.
/// </summary>
/// <typeparam name="T">type of the array</typeparam>
/// <param name="value">value to split</param>
/// <param name="separator">one or more separators</param>
/// <remarks>not efficient with separator lengths > 1</remarks>
/// <returns></returns>
public static IEnumerable<T[]> Split<T> (this T[] value, params T[] separator) {
    int separatorLength = separator.Length;
    int startingIndex = 0;
    int index = -1;
    int length;

    do {
        /* loop through each item in the separator
            * array verifing it exists in the source array */
        for (int i = 0; i < separatorLength; i++) {
            index = Array.IndexOf (value, separator[i], startingIndex);
            if (-1 == index)
                break;
        }
        if (-1 < index) {
            // delimiter matched successfully
            length = index - startingIndex;
            T[] output = new T[length];
            Array.Copy (value, startingIndex, output, 0, length);
            yield return output;
            startingIndex = index + 1;
        }
    } while (-1 < index);

    if (0 == startingIndex)
        // no match, send entire value
        yield return value;
    else {
        // no more matches, send items left
        length = value.Length - startingIndex;
        T[] output = new T[length];
        Array.Copy (value, startingIndex, output, 0, length);
    }
}

Now that I can split an array, I need to an IObservable<T[]> Split() as well:

/// <summary>
/// Splits the arrays from the source observable by the separator
/// </summary>
/// <typeparam name="TSource">type of the source array</typeparam>
/// <param name="source">source array</param>
/// <param name="separator">the separator(s)</param>
/// <returns>An observable containing the same number of items (if the
/// separator(s) are not found) or additional items by spliting the incoming
/// arrays</returns>
public static IObservable<TSource[]> Split<TSource> (
    this IObservable<TSource[]> source, params TSource[][] separator) {

    if (source == null)
        throw new ArgumentNullException ("source", "source is null.");
    if (separator.Length < 1)
        throw new ArgumentOutOfRangeException ("separator");

    IObservable<TSource[]> value = source;
    /* foreach of the delimiter arrays passed in
     * chain the call to Split () */
    separator.Run (item => value = value.Split (item));
    return value;
}

/// <summary>
/// Splits the arrays from the source observable by the delimiter
/// </summary>
/// <typeparam name="TSource">type of the source array</typeparam>
/// <param name="source">source array</param>
/// <param name="separator">the delimiter</param>
/// <returns>An observable containing the same number of items (if the
/// delimiter is not found) or additional items by spliting the incoming
/// arrays</returns>
public static IObservable<TSource[]> Split<TSource> (
    this IObservable<TSource[]> source, params TSource[] separator) {

    if (source == null)
        throw new ArgumentNullException ("source");
    if (separator.Length < 1)
        throw new ArgumentOutOfRangeException ("delimiter");

    return Observable.CreateWithDisposable<TSource[]> (observer =>
        source.Subscribe<TSource[]> (value =>
                value.Split (separator).Run (observer.OnNext),
                observer.OnError, observer.OnCompleted)
    );
}

Of course these extension methods can be further optimized and handle more use cases - especially by using IList<T> instead of array.

Monday
Jan032011

ReverseAsyncRead ()

Rx includes the AsyncRead () method that reads a stream from the beginning returning an IObservable<byte[]>. This method does the same, but in reverse. The public method overloads accept a stream and optionally a bufferSize and startingPosition.

public static IObservable<byte[]> ReverseAsyncRead (this Stream stream) {
    return ReverseAsyncRead (stream, 2 << 15);
}

public static IObservable<byte[]> ReverseAsyncRead (
    this Stream stream, 
    int bufferSize) {

    ValidateParameters (stream, bufferSize);

    return Observable.Iterate<byte[]> (observer =>
        ReverseAsyncRead (stream, bufferSize, observer, stream.Length));
}

public static IObservable<byte[]> ReverseAsyncRead (
    this Stream stream, 
    int bufferSize, 
    long startingPosition) {

    ValidateParameters (stream, bufferSize);
    if (0 > startingPosition)
        throw new ArgumentOutOfRangeException ("startingPosition");

    return Observable.Iterate<byte[]> (observer => 
        ReverseAsyncRead (stream, bufferSize, observer, startingPosition));
}

Using IEnumerable<IObservable<object>>, yield, Observable.FromAsyncPattern (), and Observable.Iterate (), we’re able to read a stream in blocks (of bufferSize) from the end to the front. The public method calls Iterate () which passes an observer instance to the following private method:

static void ValidateParameters (Stream stream, int bufferSize) {
    if (null == stream)
        throw new ArgumentNullException ("stream");
    if (!stream.CanSeek)
        throw new ArgumentOutOfRangeException ("stream""stream.CanSeek is false");
    if (1 > bufferSize)
        throw new ArgumentOutOfRangeException ("bufferSize");
}

static IEnumerable<IObservable<object>> ReverseAsyncRead (
    Stream stream, 
    int bufferSize,
    IObserver<byte[]> observer, 
    long startingPosition) {


    Func<byte[], intintIObservable<int>> asyncRead =
        Observable.FromAsyncPattern<byte[], intintint> (
            stream.BeginRead, 
            stream.EndRead);

    byte[] buffer = new byte[bufferSize];
    long position = startingPosition;
    int read = bufferSize;

    do {
        // decrement position by the amount read
        position -= read;
        if (position < 0) {
            bufferSize = (int)position * -1;
            position = 0;
        }
        // set stream's position
        stream.Position = position;

        // call stream's Begin/End Read
        {
            ListObservable<int> asyncReadResult;
            try {
                asyncReadResult = asyncRead (buffer, 0, bufferSize).Start ();
            }
            catch (Exception exception) {
                observer.OnError (exception); yield break;
            }

            yield return asyncReadResult;
            read = asyncReadResult.Value;
        }

        // send value read to observer
        try {
            if (bufferSize == read)
                observer.OnNext (buffer);
            else
                observer.OnNext (buffer.Take (read).ToArray ());
        }
        catch (Exception exception) {
            observer.OnError (exception);
            yield break;
        }

        // continue until we can't read anymore
    } while (read > 0 && position > 0);

    observer.OnCompleted ();
}

Using yield and Stream’s async method calls, we’re able to observe all byte[] blocks in the stream reading backwards.

Wednesday
Dec292010

Optimizing tail with Rx

Chris was concerned that the initial Rx implementation was inefficiently reading an entire 5 GB text file throwing everything away except for the last 10 lines. To optimize I/O, I've modified the code to read the file in reverse. Here's a new version that tails a 5 GB text file in less than 150 ms:

const int bufferSize = 2 << 15;
using (var stream = new FileStream
(
                        args[0],
                       
FileMode
.Open,
                       
FileAccess.Read,
                       
FileShare.Read,
                        bufferSize,
                       
FileOptions.Asynchronous | FileOptions.RandomAccess)) {
   
// seek to (EOF - bufferSize) and read stream backwards, returning IObservable<byte[]>
    stream.ReverseAsyncRead (bufferSize)
        // reverse bytes inside byte[]
        .Select(bytes => bytes.Reverse().ToArray())
       
// split byte[] on \r and \n
        .Split (new byte[] { 13 }, new byte[] { 10 })
       
// take 10 of the byte[] results
        .Take (10)
       
// reverse order of byte[]s, and the bytes inside the byte[]s
        .ToEnumerable ().Reverse ().Select (bytes => bytes.Reverse ().ToArray ()).ToObservable ()
       
// using ASCII encoding, convert bytes to string
        .Decode (Encoding
.ASCII)
       
// output the IObservable<string> to the standard output
        .Run (Console.WriteLine);
}

Starting from the end of the file, reading chunks backwards, reversing the byte[] chunks, delimiting by \r and \n (creating smaller byte[] results), take the first 10, reversing them back, converting to string (via ASCII encoding), and then sending to standard output. In this version we are reading from disk only the information required to tail the file - no unnecessary disk IOs. Unfortunately some of these extension methods didn't exist, so I had to create three to build this:

  • ReverseAsyncRead (this Stream): just like AsyncRead (), but in reverse
  • Split<T> (this T[]): just like string.Split (), but generic, for any array, and for observable input arrays
  • Decode (this byte[]): using Encoding and a Decoder, converts a IObservable<byte[]> to IObservable<string>