IEnumerable & IObservable Split ()

Another IObservable<> extension method I needed was Split – just like string.Split (), but generic, so that it works on any array and on an IObservable<T[]>. I started with a generic IEnumerable<T[]> Split<T> ():

/// <summary>
/// Returns the subarrays of this instance that are delimited
/// by elements of a specified array.
/// </summary>
/// <typeparam name="T">type of the array</typeparam>
/// <param name="value">value to split</param>
/// <param name="separator">one or more separators</param>
/// <remarks>not efficient with separator lengths > 1</remarks>
/// <returns>the subarrays found between separators, in order</returns>
public static IEnumerable<T[]> Split<T> (this T[] value, params T[] separator) {
    int separatorLength = separator.Length;
    int startingIndex = 0;
    int index = -1;
    int length;

    do {
        /* find the nearest occurrence of any element of the
         * separator array at or after startingIndex */
        index = -1;
        for (int i = 0; i < separatorLength; i++) {
            int match = Array.IndexOf (value, separator[i], startingIndex);
            if (-1 < match && (-1 == index || match < index))
                index = match;
        }
        if (-1 < index) {
            // delimiter matched successfully
            length = index - startingIndex;
            T[] output = new T[length];
            Array.Copy (value, startingIndex, output, 0, length);
            yield return output;
            startingIndex = index + 1;
        }
    } while (-1 < index);

    if (0 == startingIndex)
        // no match, send entire value
        yield return value;
    else {
        // no more matches, send items left
        length = value.Length - startingIndex;
        T[] output = new T[length];
        Array.Copy (value, startingIndex, output, 0, length);
        yield return output;
    }
}

Now that I can split an array, I need an IObservable<T[]> Split () as well:

/// <summary>
/// Splits the arrays from the source observable by the separator
/// </summary>
/// <typeparam name="TSource">type of the source array</typeparam>
/// <param name="source">source observable</param>
/// <param name="separator">the separator(s)</param>
/// <returns>An observable containing the same number of items (if the
/// separator(s) are not found) or additional items produced by splitting
/// the incoming arrays</returns>
public static IObservable<TSource[]> Split<TSource> (
    this IObservable<TSource[]> source, params TSource[][] separator) {

    if (source == null)
        throw new ArgumentNullException ("source", "source is null.");
    if (separator.Length < 1)
        throw new ArgumentOutOfRangeException ("separator");

    IObservable<TSource[]> value = source;
    /* for each of the delimiter arrays passed in,
     * chain another call to Split () */
    separator.Run (item => value = value.Split (item));
    return value;
}

/// <summary>
/// Splits the arrays from the source observable by the delimiter
/// </summary>
/// <typeparam name="TSource">type of the source array</typeparam>
/// <param name="source">source observable</param>
/// <param name="separator">the delimiter</param>
/// <returns>An observable containing the same number of items (if the
/// delimiter is not found) or additional items produced by splitting
/// the incoming arrays</returns>
public static IObservable<TSource[]> Split<TSource> (
    this IObservable<TSource[]> source, params TSource[] separator) {

    if (source == null)
        throw new ArgumentNullException ("source");
    if (separator.Length < 1)
        throw new ArgumentOutOfRangeException ("separator");

    return Observable.CreateWithDisposable<TSource[]> (observer =>
        source.Subscribe<TSource[]> (value =>
                value.Split (separator).Run (observer.OnNext),
                observer.OnError, observer.OnCompleted)
    );
}

Of course, these extension methods can be further optimized and extended to handle more use cases – especially by using IList<T> instead of arrays.
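As one illustration of the IList<T> direction, here is a minimal sketch of what such a variant might look like. The names SplitSketch and SplitList are hypothetical (chosen so they do not clash with the Split methods above), and the sketch assumes that the separator elements are alternatives, as with string.Split (params char[]). It walks the list once and looks separators up in a HashSet<T> rather than calling Array.IndexOf once per separator:

```csharp
using System;
using System.Collections.Generic;

public static class SplitSketch {
    // Hypothetical IList<T> variant: splits on ANY of the separator
    // elements, so it works for T[], List<T> and other list types.
    public static IEnumerable<T[]> SplitList<T> (this IList<T> value, params T[] separator) {
        // validate eagerly; the iterator below only runs when enumerated
        if (value == null)
            throw new ArgumentNullException ("value");
        if (separator == null)
            throw new ArgumentNullException ("separator");
        return SplitListIterator (value, separator);
    }

    private static IEnumerable<T[]> SplitListIterator<T> (IList<T> value, T[] separator) {
        // a set gives constant-time lookups instead of one scan per separator
        var separators = new HashSet<T> (separator);
        int startingIndex = 0;

        for (int index = 0; index < value.Count; index++) {
            if (!separators.Contains (value[index]))
                continue;
            // separator found: emit the items collected since the last one
            yield return Copy (value, startingIndex, index - startingIndex);
            startingIndex = index + 1;
        }
        // items after the last separator (the whole input if none matched)
        yield return Copy (value, startingIndex, value.Count - startingIndex);
    }

    private static T[] Copy<T> (IList<T> value, int start, int length) {
        T[] output = new T[length];
        for (int i = 0; i < length; i++)
            output[i] = value[start + i];
        return output;
    }
}
```

With this sketch, new List<int> { 1, 2, 0, 3, 4, 0, 5 }.SplitList (0) yields { 1, 2 }, { 3, 4 } and { 5 }. Unlike the array version, it always returns copies, even when no separator is found.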