Applying LINQ to new data types

[This article was originally published in MSDN C# Developer Center, February 2008]

Given the enormous expressive power that LINQ (Language Integrated Query) puts in the hands of developers, it is not surprising that we have started to search for ways to “LINQ-enable” more and more data types and technologies. As Dinesh Kulkarni humorously put it in this post, soon we will have even “LINQ to Coffee Machine” :-). .NET Framework 3.5 and C# 3.0 offer us several ways to achieve that goal.

In many occasions, all that is needed in order to be able to apply integrated queries to objects of a certain type (of our own, or belonging to the .NET Framework class library) is to define for the type one or more entry points which could serve as the generators to which all the basic object query machinery incorporated into .NET Framework (LINQ to Objects) can be applied. This has been, for instance, the approach used in LINQ to XML, where a set of iterators (implemented as extension methods) have been defined that produce subsets of the nodes that compose an XML document as IEnumerable<T> sequences; once you call any one of these methods, you obtain an XML node generator to which the predefined implementations of the standard query operators Where(), OrderBy(), etc. can be applied. LINQ to XML also introduces several specialized query operators (“extensions”), but that’s another story.

As an example of simple “LINQ activation” of classes, here we present the implementation of an extension that will allow us to use integrated query syntax over data received through a named pipe. Pipes are a well-known inter-process communication mechanism that has been in use since very long in the DOS/Windows world; but only with .NET Framework 3.5 we will be able to use them in our managed code applications without having to recur to platform invocation. Pipes can be anonymous, conceived to be used to communicate a parent process and a child process residing on the same machine; or named, which offer much more power and can be used over a network. Specifically, named pipes offer message-oriented communications.

The following program implements a server which sends UTF8-encoded string messages to its listener through a pipe named CS3:

using System;
using System.Text;
using System.IO;
using System.IO.Pipes;

namespace Demos
{
    class Server
    {
        static void Main(string[] args)
        {
            const string PipeName = “CS3”;

            using (NamedPipeServerStream pipeStream =
                new NamedPipeServerStream(PipeName, PipeDirection.InOut,
                    1, PipeTransmissionMode.Message, PipeOptions.None))
            {
                pipeStream.WaitForConnection();

                // sending messages
                UTF8Encoding encoding = new UTF8Encoding();
                for (int i = 1; i <= 1000; i++)
                {
                    string msg = i.ToString();
                    byte[] bytes = encoding.GetBytes(msg);
                    pipeStream.Write(bytes, 0, bytes.Length);
                }
            }
        }
    }
}

The client, on the other end, composes received messages: 

namespace Demos
{
    class Client
    {
        static void Main(string[] args)
        {
            const string Server = “.”;
            const string PipeName = “CS3”;

            using (NamedPipeClientStream pipeStream =
                    new NamedPipeClientStream(Server, PipeName, PipeDirection.InOut))
            {
                pipeStream.Connect();
                pipeStream.ReadMode = PipeTransmissionMode.Message;

                Decoder decoder = Encoding.UTF8.GetDecoder();

                const int BufferSize = 256;
                byte[] bytes = new byte[BufferSize];
                char[] chars = new char[BufferSize];
                int numBytes = 0;
                StringBuilder msg = new StringBuilder();
                do
                {
                    msg.Length = 0;
                    do
                    {
                        numBytes = pipeStream.Read(bytes, 0, BufferSize);
                        if (numBytes > 0)
                        {
                            int numChars = decoder.GetCharCount(bytes, 0, numBytes);
                            decoder.GetChars(bytes, 0, numBytes, chars, 0, false);
                            msg.Append(chars, 0, numChars);
                        }
                    } while (numBytes > 0 && !pipeStream.IsMessageComplete);
                    decoder.Reset();
                    if (numBytes > 0)
                    {
                        // we’ve got a message – process it
                        Console.WriteLine(msg.ToString());
                    }
                } while (numBytes != 0);
            }
            Console.ReadLine();
        }
    }
}

What if the client process needed to filter, sort, group, etc. the received strings? Traditional coding approaches to implement these tasks could be applied, of course, but a more far-sighted approach would be to define a “LINQ entry point” in the form of an extension method for the NamedPipeClientStream class, so that it can be used as source for language integrated queries: 

namespace PlainConcepts.Linq
{
    public static partial class Extensions
    {
        public static IEnumerable<string> GetMessages(
            this NamedPipeClientStream pipeStream)
        {
            pipeStream.Connect();
            pipeStream.ReadMode = PipeTransmissionMode.Message;
 
            Decoder decoder = Encoding.UTF8.GetDecoder();
 
            const int BufferSize = 256;
            byte[] bytes = new byte[BufferSize];
            char[] chars = new char[BufferSize];
            int numBytes = 0;
            StringBuilder msg = new StringBuilder();
            do
            {
                msg.Length = 0;
                do
                {
                    numBytes = pipeStream.Read(bytes, 0, BufferSize);
                    if (numBytes > 0)
                    {
                        int numChars = decoder.GetCharCount(bytes, 0, numBytes);
                        decoder.GetChars(bytes, 0, numBytes, chars, 0, false);
                        msg.Append(chars, 0, numChars);
                    }
                } while (numBytes > 0 && !pipeStream.IsMessageComplete);
                decoder.Reset();
                if (numBytes > 0)
                {
                    // we’ve got a message – yield it!
                    yield return msg.ToString();
                }
            } while (numBytes != 0);
        }
    }
}

Note that thanks to the use of an iterator, queries built around this extension method will take advantage of lazy evaluation, so that messages will be retrieved on demand, as they are needed by the client application.

With such an extension method at hand, we will be able to process messages received from a named pipe stream this way:

namespace Demos
{
    using PlainConcepts.Linq;
 
    class Client
    {
        static void Main(string[] args)
        {
            const string Server = “.”;
            const string PipeName = “CS3”;

            using (NamedPipeClientStream pipeStream =
                    new NamedPipeClientStream(Server, PipeName, PipeDirection.InOut))
            {
                var input = from s in pipeStream.GetMessages()
                            where string.Compare(s, “3”) < 0
                            orderby s
                            select s;
                foreach (var s in input)
                    Console.WriteLine(s);
            }
            Console.ReadLine();
        }
    }
}

Processing messages this way can save us a lot of coding, making at the same time our code much more elegant and readable.

Download source code (VS 2010)

Octavio Hernandez

Desarrollador y consultor en tecnologías .NET. Microsoft C# MVP entre 2004 y 2010.

Deja un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *