Entries in LINQ (7)

Monday, Jan 10, 2011

IEnumerable & IObservable Split ()

Another IObservable<> extension method I needed was Split: just like string.Split (), but generic, for any array and for an IObservable<T[]>. I started with a generic IEnumerable<T[]> Split<T> ():

/// <summary>
/// Returns one or more arrays that contain the subarrays in this
/// instance that are delimited by elements of a specified array.
/// </summary>
/// <typeparam name="T">type of the array elements</typeparam>
/// <param name="value">value to split</param>
/// <param name="separator">one or more separators; the value is split
/// wherever any of them occurs</param>
/// <remarks>not efficient with separator lengths > 1: the value is
/// scanned once per separator for every subarray returned</remarks>
/// <returns>the subarrays between separators, or <paramref name="value"/>
/// itself if no separator is found</returns>
public static IEnumerable<T[]> Split<T> (this T[] value, params T[] separator) {
    if (value == null)
        throw new ArgumentNullException ("value");
    if (separator == null)
        throw new ArgumentNullException ("separator");

    int startingIndex = 0;
    int index;

    do {
        /* find the nearest occurrence of any separator
         * element at or after startingIndex */
        index = -1;
        for (int i = 0; i < separator.Length; i++) {
            int found = Array.IndexOf (value, separator[i], startingIndex);
            if (-1 < found && (-1 == index || found < index))
                index = found;
        }
        if (-1 < index) {
            // delimiter matched: send the items before it
            int length = index - startingIndex;
            T[] output = new T[length];
            Array.Copy (value, startingIndex, output, 0, length);
            yield return output;
            startingIndex = index + 1;
        }
    } while (-1 < index);

    if (0 == startingIndex) {
        // no match, send entire value
        yield return value;
    } else {
        // no more matches, send the items left
        int length = value.Length - startingIndex;
        T[] output = new T[length];
        Array.Copy (value, startingIndex, output, 0, length);
        yield return output;
    }
}

Now that I can split an array, I also need an IObservable<T[]> Split ():

/// <summary>
/// Splits the arrays from the source observable by each separator array in turn
/// </summary>
/// <typeparam name="TSource">type of the source array elements</typeparam>
/// <param name="source">source observable of arrays</param>
/// <param name="separator">the separator(s); each array is applied by a
/// chained call to Split ()</param>
/// <returns>An observable containing the same number of items (if the
/// separator(s) are not found) or additional items by splitting the incoming
/// arrays</returns>
public static IObservable<TSource[]> Split<TSource> (
    this IObservable<TSource[]> source, params TSource[][] separator) {

    if (source == null)
        throw new ArgumentNullException ("source");
    if (separator == null)
        throw new ArgumentNullException ("separator");
    if (separator.Length < 1)
        throw new ArgumentOutOfRangeException ("separator");

    IObservable<TSource[]> value = source;
    /* for each of the separator arrays passed in,
     * chain a call to Split () */
    foreach (TSource[] item in separator)
        value = value.Split (item);
    return value;
}

/// <summary>
/// Splits the arrays from the source observable by the separator
/// </summary>
/// <typeparam name="TSource">type of the source array elements</typeparam>
/// <param name="source">source observable of arrays</param>
/// <param name="separator">the separator element(s); each incoming array is
/// split wherever any of them occurs</param>
/// <returns>An observable containing the same number of items (if the
/// separator is not found) or additional items by splitting the incoming
/// arrays</returns>
public static IObservable<TSource[]> Split<TSource> (
    this IObservable<TSource[]> source, params TSource[] separator) {

    if (source == null)
        throw new ArgumentNullException ("source");
    if (separator == null)
        throw new ArgumentNullException ("separator");
    if (separator.Length < 1)
        throw new ArgumentOutOfRangeException ("separator");

    // Observable.Create (Func<IObserver<T>, IDisposable>) replaces the
    // pre-1.0 Rx Observable.CreateWithDisposable
    return Observable.Create<TSource[]> (observer =>
        source.Subscribe (
            value => {
                // forward each subarray produced by the array Split ()
                foreach (TSource[] item in value.Split (separator))
                    observer.OnNext (item);
            },
            observer.OnError,
            observer.OnCompleted));
}

Of course, these extension methods could be optimized further and handle more use cases, especially by accepting IList<T> instead of arrays.
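Following the IList<T> suggestion, here is a minimal sketch (the SplitList name is hypothetical, not the author's code) that accepts any IList<T>, arrays included, and makes a single pass over the input, testing each element against a HashSet<T> of separators. That also removes the per-separator rescans that make the array version slow with more than one separator. It assumes T's default equality is what should match separators (the same equality the generic Array.IndexOf uses), and unlike the array version it always returns copies, even when no separator is found.

```csharp
using System;
using System.Collections.Generic;

public static class ListSplitSketch {
    // Hypothetical IList<T> variant of Split: one pass over the input, with
    // separator lookups done through a HashSet<T> instead of Array.IndexOf.
    public static IEnumerable<T[]> SplitList<T> (this IList<T> value, params T[] separator) {
        // validate eagerly; the iterator below runs only when enumerated
        if (value == null)
            throw new ArgumentNullException ("value");
        if (separator == null)
            throw new ArgumentNullException ("separator");
        return SplitIterator (value, new HashSet<T> (separator));
    }

    private static IEnumerable<T[]> SplitIterator<T> (IList<T> value, HashSet<T> separators) {
        int start = 0;
        for (int i = 0; i < value.Count; i++) {
            if (separators.Contains (value[i])) {
                // emit the items between the previous separator and this one
                yield return Slice (value, start, i - start);
                start = i + 1;
            }
        }
        // emit whatever follows the last separator (the whole list if none matched)
        yield return Slice (value, start, value.Count - start);
    }

    // copies length items starting at start into a new array
    private static T[] Slice<T> (IList<T> value, int start, int length) {
        T[] output = new T[length];
        for (int i = 0; i < length; i++)
            output[i] = value[start + i];
        return output;
    }
}
```

As with string.Split (), a trailing separator produces a final empty array, so splitting { 1, 0 } on 0 yields { 1 } and { }.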

Wednesday, Dec 29, 2010

Optimizing tail with Rx

Chris was concerned that the initial Rx implementation inefficiently read an entire 5 GB text file, throwing away everything except the last 10 lines. To optimize I/O, I've modified the code to read the file in reverse. Here's a new version that tails a 5 GB text file in less than 150 ms:

const int bufferSize = 2 << 15;
using (var stream = new FileStream (
        args[0],
        FileMode.Open,
        FileAccess.Read,
        FileShare.Read,
        bufferSize,
        FileOptions.Asynchronous | FileOptions.RandomAccess)) {

    // seek to (EOF - bufferSize) and read stream backwards, returning IObservable<byte[]>
    stream.ReverseAsyncRead (bufferSize)
        // reverse bytes inside byte[]
        .Select (bytes => bytes.Reverse ().ToArray ())
        // split byte[] on \r and \n
        .Split (new byte[] { 13 }, new byte[] { 10 })
        // take 10 of the byte[] results
        .Take (10)
        // reverse order of byte[]s, and the bytes inside the byte[]s
        .ToEnumerable ().Reverse ().Select (bytes => bytes.Reverse ().ToArray ()).ToObservable ()
        // using ASCII encoding, convert bytes to string
        .Decode (Encoding.ASCII)
        // output the IObservable<string> to the standard output
        .Run (Console.WriteLine);
}

The pipeline starts at the end of the file, reads chunks backwards, reverses the bytes in each chunk, splits on \r and \n (creating smaller byte[] results), takes the first 10, reverses them back, converts them to strings (via ASCII encoding), and finally writes them to standard output. This version reads from disk only the chunks required to tail the file, with no unnecessary disk I/O. Unfortunately some of these extension methods didn't exist, so I had to create three of them:

  • ReverseAsyncRead (this Stream): just like AsyncRead (), but in reverse
  • Split<T> (this T[]): just like string.Split (), but generic, for any array, and for observable input arrays
  • Decode (this IObservable<byte[]>, Encoding): using the Encoding and a Decoder, converts an IObservable<byte[]> to an IObservable<string>
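ReverseAsyncRead and Decode themselves aren't shown in this post. As a rough, synchronous illustration of the reverse-reading idea (not the author's Rx implementation), the sketch below uses hypothetical ReadBackward and Tail helpers over a plain seekable Stream. Tail counts '\n' bytes while reading chunks backward, stops as soon as it has enough, and then decodes the collected suffix in one go, so a line that straddles two chunks comes back whole. It assumes an encoding in which byte 10 only ever encodes '\n' (true for ASCII and UTF-8) and strips a trailing '\r' from each line.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

public static class ReverseReadSketch {
    // Hypothetical synchronous stand-in for ReverseAsyncRead: yields chunks of
    // at most bufferSize bytes, starting at EOF and moving toward the start.
    // Bytes inside each chunk keep their forward (file) order.
    public static IEnumerable<byte[]> ReadBackward (Stream stream, int bufferSize) {
        if (bufferSize <= 0)
            throw new ArgumentOutOfRangeException ("bufferSize");
        long position = stream.Length;
        while (position > 0) {
            int count = (int) Math.Min (bufferSize, position);
            position -= count;
            stream.Seek (position, SeekOrigin.Begin);
            byte[] chunk = new byte[count];
            int read = 0;
            // Stream.Read may return fewer bytes than requested, so loop
            while (read < count) {
                int n = stream.Read (chunk, read, count - read);
                if (n == 0)
                    throw new EndOfStreamException ();
                read += n;
            }
            yield return chunk;
        }
    }

    // Returns (up to) the last lineCount lines, reading only the chunks needed.
    public static string[] Tail (Stream stream, int lineCount, int bufferSize, Encoding encoding) {
        if (lineCount < 0)
            throw new ArgumentOutOfRangeException ("lineCount");
        var chunks = new List<byte[]> ();
        int newlines = 0;
        bool first = true;
        foreach (byte[] chunk in ReadBackward (stream, bufferSize)) {
            chunks.Add (chunk);
            foreach (byte b in chunk)
                if (b == (byte) '\n')
                    newlines++;
            // the '\n' that ends the file's last line doesn't start another line
            if (first && chunk[chunk.Length - 1] == (byte) '\n')
                newlines--;
            first = false;
            // lineCount newlines means the suffix covers the last lineCount lines
            if (newlines >= lineCount)
                break;
        }

        // the chunks were read back to front; stitch them together in file order
        chunks.Reverse ();
        var suffix = new MemoryStream ();
        foreach (byte[] chunk in chunks)
            suffix.Write (chunk, 0, chunk.Length);

        string text = encoding.GetString (suffix.ToArray ());
        if (text.Length == 0)
            return new string[0];
        if (text[text.Length - 1] == '\n')
            text = text.Substring (0, text.Length - 1);

        // the first piece may be a partial line; only the last lineCount are kept
        string[] lines = text.Split ('\n');
        int take = Math.Min (lineCount, lines.Length);
        string[] result = new string[take];
        for (int i = 0; i < take; i++)
            result[i] = lines[lines.Length - take + i].TrimEnd ('\r');
        return result;
    }
}
```

For example, Tail (stream, 10, 2 << 15, Encoding.ASCII) on a large log reads a single 64 KB chunk whenever the last ten lines fit inside it.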

Wednesday, Dec 10, 2008

Sequence membership

Given two sorted sequences A and B, this method returns A \ B, the intersection of A and B, and B \ A, using a single iteration over both sequences. The return type is an IEnumerable<KeyValuePair<T, int>>, where the key is the element and the int value means:

  • only in A = -1

  • in both = 0

  • only in B = 1


/// <summary>
/// Returns a <see cref="KeyValuePair{TKey, TValue}"/> for every element in sortedSetA and
/// sortedSetB, where the key is the element and the value specifies membership: objects that
/// belong to A but not to B (-1), objects that are in both A and B (0), and objects that belong
/// to B but not to A (1). Elements are compared with <see cref="Comparer{T}.Default"/>.
/// </summary>
/// <remarks>This method is an O(n + m) operation, where n is the count of
/// <paramref name="sortedSetA"/> and m is the count of <paramref name="sortedSetB"/>: each step
/// consumes at least one element, so at most n + m steps are taken. When every element of the
/// smaller set is also in the larger set, each match consumes an element from both sets, and the
/// number of steps drops to the count of the larger set.
/// </remarks>
/// <typeparam name="T">Type of element in each set</typeparam>
/// <param name="sortedSetA">A sorted set where each element is of type <typeparamref name="T"/></param>
/// <param name="sortedSetB">A sorted set where each element is of type <typeparamref name="T"/></param>
/// <returns>Returns A \ B (-1), the intersection of A and B (0), and B \ A (1)</returns>
public static IEnumerable<KeyValuePair<T, int>> Membership<T> (IEnumerable<T> sortedSetA, IEnumerable<T> sortedSetB) {
    return Membership<T> (sortedSetA, sortedSetB, Comparer<T>.Default);
}
/// <summary>
/// Returns a <see cref="KeyValuePair{TKey, TValue}"/> for every element in sortedSetA and
/// sortedSetB, where the key is the element and the value specifies membership: objects that
/// belong to A but not to B (-1), objects that are in both A and B (0), and objects that belong
/// to B but not to A (1).
/// </summary>
/// <remarks>This method is an O(n + m) operation, where n is the count of
/// <paramref name="sortedSetA"/> and m is the count of <paramref name="sortedSetB"/>: each step
/// consumes at least one element, so at most n + m steps are taken. When every element of the
/// smaller set is also in the larger set, each match consumes an element from both sets, and the
/// number of steps drops to the count of the larger set.
/// </remarks>
/// <typeparam name="T">Type of element in each set</typeparam>
/// <param name="sortedSetA">A sorted set where each element is of type <typeparamref name="T"/></param>
/// <param name="sortedSetB">A sorted set where each element is of type <typeparamref name="T"/></param>
/// <param name="comparer"><see cref="IComparer{T}"/> used to sort the sets</param>
/// <returns>Returns A \ B (-1), the intersection of A and B (0), and B \ A (1)</returns>
public static IEnumerable<KeyValuePair<T, int>> Membership<T> (IEnumerable<T> sortedSetA, IEnumerable<T> sortedSetB, IComparer<T> comparer) {
    // validate eagerly; inside an iterator these checks would be deferred until enumeration
    if (sortedSetA == null)
        throw new ArgumentNullException ("sortedSetA");
    if (sortedSetB == null)
        throw new ArgumentNullException ("sortedSetB");
    if (comparer == null)
        throw new ArgumentNullException ("comparer");

    return MembershipIterator (sortedSetA, sortedSetB, comparer);
}

private static IEnumerable<KeyValuePair<T, int>> MembershipIterator<T> (IEnumerable<T> sortedSetA, IEnumerable<T> sortedSetB, IComparer<T> comparer) {
    const int onlyA = -1;
    const int both = 0;
    const int onlyB = 1;

    using (IEnumerator<T> enumeratorA = sortedSetA.GetEnumerator ())
    using (IEnumerator<T> enumeratorB = sortedSetB.GetEnumerator ()) {
        bool nextA = enumeratorA.MoveNext ();
        bool nextB = enumeratorB.MoveNext ();

        // read the first element of each collection (if any) so the
        // leftover loops below never return default (T) by mistake
        T a = nextA ? enumeratorA.Current : default (T);
        T b = nextB ? enumeratorB.Current : default (T);

        // loop while there are values for both collections
        while (nextA && nextB) {
            // compare a to b: is a < b, a == b, or a > b ?
            int val = comparer.Compare (a, b);
            if (val == 0) {
                // a == b: both collections have the value; advance both
                yield return new KeyValuePair<T, int> (a, both);
                if (nextA = enumeratorA.MoveNext ())
                    a = enumeratorA.Current;
                if (nextB = enumeratorB.MoveNext ())
                    b = enumeratorB.Current;
            }
            else if (val < 0) {
                // a < b: a can't appear later in sorted B; advance A
                yield return new KeyValuePair<T, int> (a, onlyA);
                if (nextA = enumeratorA.MoveNext ())
                    a = enumeratorA.Current;
            }
            else {
                // a > b: b can't appear later in sorted A; advance B
                yield return new KeyValuePair<T, int> (b, onlyB);
                if (nextB = enumeratorB.MoveNext ())
                    b = enumeratorB.Current;
            }
        }

        // at most one collection has values left, and they belong only to it
        if (nextA) {
            yield return new KeyValuePair<T, int> (a, onlyA);
            while (enumeratorA.MoveNext ())
                yield return new KeyValuePair<T, int> (enumeratorA.Current, onlyA);
        }
        else if (nextB) {
            yield return new KeyValuePair<T, int> (b, onlyB);
            while (enumeratorB.MoveNext ())
                yield return new KeyValuePair<T, int> (enumeratorB.Current, onlyB);
        }
    }
}
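When testing Membership, a slower but obviously correct reference is handy. The sketch below (a hypothetical MembershipOracle.Labels helper, not part of the original post) computes the same -1/0/1 labels with HashSet<T> lookups instead of a merge pass, then sorts the result with the comparer. It assumes the comparer agrees with T's default equality and that neither input contains duplicates. For A = { 1, 2, 4 } and B = { 2, 3, 4 }, both approaches should produce (1, -1), (2, 0), (3, 1), (4, 0).

```csharp
using System;
using System.Collections.Generic;

public static class MembershipOracle {
    // Hypothetical cross-check: labels each element -1 (only in A), 0 (in both)
    // or 1 (only in B) using hash-set lookups rather than a single merge pass.
    public static List<KeyValuePair<T, int>> Labels<T> (IEnumerable<T> a, IEnumerable<T> b, IComparer<T> comparer) {
        var setA = new HashSet<T> (a);
        var setB = new HashSet<T> (b);
        var result = new List<KeyValuePair<T, int>> ();

        // every element of A is either shared (0) or only in A (-1)
        foreach (T item in setA)
            result.Add (new KeyValuePair<T, int> (item, setB.Contains (item) ? 0 : -1));

        // elements of B not already seen in A are only in B (1)
        foreach (T item in setB)
            if (!setA.Contains (item))
                result.Add (new KeyValuePair<T, int> (item, 1));

        // order like Membership's output: ascending by the comparer
        result.Sort ((x, y) => comparer.Compare (x.Key, y.Key));
        return result;
    }
}
```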

Wednesday, Sep 10, 2008

LINQ to SQL produces incorrect TSQL when using UNION or CONCAT

When a LINQ to SQL query contains a Union or Concat with a second query, and the second query references a column twice, a SqlException will occur.

var a = from address in dc.Addresses
        select new {
            ID = address.AddressID,
            Address1 = address.AddressLine1,
            Address2 = address.AddressLine2,
        };
var b = from address in dc.Addresses
        select new {
            ID = address.AddressID,
            Address1 = address.AddressLine1,
            Address2 = address.AddressLine1, // notice AddressLine1 repeated
        };
var q = a.Take (10).Union (b.Take (10));
q.ToArray ();


SqlException: All the queries in a query expression containing a UNION operator must have the same number of expressions in their select lists.

SELECT [t2].[AddressID] AS [ID], [t2].[AddressLine1] AS [Address1], [t2].[AddressLine2] AS [Address2]
FROM (
    SELECT TOP (10) [t0].[AddressID], [t0].[AddressLine1], [t0].[AddressLine2]
    FROM [Person].[Address] AS [t0]
    UNION
    SELECT TOP (10) [t1].[AddressID], [t1].[AddressLine1]
    FROM [Person].[Address] AS [t1]
    ) AS [t2]


Notice the third SELECT statement (the second half of the UNION) selects only two columns instead of the required three.

Please rate and validate this bug at the MSDN Microsoft Product Feedback Center so Microsoft responds with a solution or workaround.

Tuesday, Sep 9, 2008

Extending LINQ to SQL

Last year, Scott Guthrie stated “You can actually override the raw SQL that LINQ to SQL uses if you want absolute control over the SQL executed”, but I can’t find any documentation describing the extensibility mechanism.

I would like to modify the following LINQ to SQL query:
using (NorthwindContext northwind = new NorthwindContext ()) {
    var q = from row in northwind.Customers
            let orderCount = row.Orders.Count ()
            select new {
                row.ContactName,
                orderCount
            };
}

Which results in the following TSQL:
SELECT [t0].[ContactName], (
SELECT COUNT(*)
FROM [dbo].[Orders] AS [t1]
WHERE [t1].[CustomerID] = [t0].[CustomerID]
) AS [orderCount]
FROM [dbo].[Customers] AS [t0]

To:
using (NorthwindContext northwind = new NorthwindContext ()) {
    var q = from row in northwind.Customers.With (
                TableHint.NoLock, TableHint.Index (0))
            let orderCount = row.Orders.With (
                TableHint.HoldLock).Count ()
            select new {
                row.ContactName,
                orderCount
            };
}

Which would result in the following TSQL:
SELECT [t0].[ContactName], (
SELECT COUNT(*)
FROM [dbo].[Orders] AS [t1] WITH (HOLDLOCK)
WHERE [t1].[CustomerID] = [t0].[CustomerID]
) AS [orderCount]
FROM [dbo].[Customers] AS [t0] WITH (NOLOCK, INDEX(0))

Using:
public static Table<TEntity> With<TEntity> (
    this Table<TEntity> table,
    params TableHint[] args) where TEntity : class {

    //TODO: implement
    return table;
}
public static EntitySet<TEntity> With<TEntity> (
    this EntitySet<TEntity> entitySet,
    params TableHint[] args) where TEntity : class {

    //TODO: implement
    return entitySet;
}

And

public class TableHint {
    //TODO: implement
    public static TableHint NoLock;
    public static TableHint HoldLock;
    public static TableHint Index (int id) {
        return null;
    }
    public static TableHint Index (string name) {
        return null;
    }
}


I'd like to implement these using some type of LINQ to SQL extensibility, other than this one. Any ideas?
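This doesn't answer the extensibility question; it only sketches one way the TableHint stub could carry its own T-SQL text, so that whatever hook ends up rewriting the query has something to emit. It assumes hints are immutable values, and ToWithClause is a hypothetical helper, not a LINQ to SQL API. The strings it produces match the WITH clauses in the desired TSQL above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical: TableHint as an immutable value that knows its T-SQL text.
public sealed class TableHint {
    public static readonly TableHint NoLock = new TableHint ("NOLOCK");
    public static readonly TableHint HoldLock = new TableHint ("HOLDLOCK");

    private readonly string sql;

    private TableHint (string sql) {
        this.sql = sql;
    }

    public static TableHint Index (int id) {
        return new TableHint ("INDEX(" + id + ")");
    }

    public static TableHint Index (string name) {
        if (name == null)
            throw new ArgumentNullException ("name");
        // bracket-quote the index name, doubling any ']' inside it
        return new TableHint ("INDEX([" + name.Replace ("]", "]]") + "])");
    }

    public override string ToString () {
        return sql;
    }

    // Renders the WITH (...) clause that would follow a table reference,
    // or an empty string when no hints are given.
    public static string ToWithClause (params TableHint[] hints) {
        if (hints == null || hints.Length == 0)
            return string.Empty;
        var parts = new List<string> ();
        foreach (TableHint hint in hints)
            parts.Add (hint.sql);
        return "WITH (" + string.Join (", ", parts) + ")";
    }
}
```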

Please comment on this question over at StackOverflow.com