Robotics 2

Continuamos con Robotics, como habíamos comentado en el post anterior, con CCR podemos ejecutar tareas de manera concurrente, esta tareas normalmente van a ser el procesamiento de los valores que el robot nos devuelve a través de la clase Port<T>.

Vimos unos ejemplos de cómo trabajar con CCR, ahora continuamos con algún ejemplo más complejo.

void RunFromIterator()
{
    Dispatcher d = new Dispatcher(0, "Test Pool");

    DispatcherQueue taskQ = new DispatcherQueue("Test Queue", d);

    Console.WriteLine("Before Iterator submitted - thread {0}", Thread.CurrentThread.ManagedThreadId);

    Arbiter.Activate(taskQ,
        
        Arbiter.FromIteratorHandler(IteratorExample),
        Arbiter.FromIteratorHandler(IteratorExample),
        Arbiter.FromIteratorHandler(IteratorExample),
        Arbiter.FromIteratorHandler(IteratorExample)
        );

    Console.WriteLine("After Iterator submitted - thread {0}", Thread.CurrentThread.ManagedThreadId);

}

private static IEnumerator<ITask> IteratorExample()
{
    Port<int> p1 = new Port<int>();
    Port<int> p2 = new Port<int>();

    p1.Post(0);

    bool done = false;

    while (!done)
    {
        yield return Arbiter.Receive(false, p1,
            delegate(int i)
            {
                Console.WriteLine("P1 Thread {0}: {1}", Thread.CurrentThread.ManagedThreadId, i);
                p2.Post(i + 1);
            }
        );

        yield return Arbiter.Receive(false, p2,
            delegate(int i)
            {
                Console.WriteLine("P2 Thread {0}: {1}", Thread.CurrentThread.ManagedThreadId, i);
                if (i >= 10000000000)
                    done = true;
                else
                    p1.Post(i + 1);
            }
        );
    }

    yield break;
}

Si nos fijamos en este ejemplo nuestro Arbiter activa cuatro tareas que las genera a través de un método helper que se llama, Activator.FromIteratorHandler que básicamente es un delegado a un método del tipo:

IEnumerable<ITask> GetMethod();

Si nos fijamos en este método lo que devuelve es un IEnumerable<ITask>, ITask era la interface que teníamos que implementar para hacer una tarea que se ejecutase en el Dispatcher de CCR, así que de alguna manera estamos devolviendo una colección de tareas para que el Dispatcher las ejecute. Si nos fijamos en el código que tenemos en el cuerpo de la función, hacemos cosas como esta.

yield return Arbiter.Receive

Al utilizar la palabra reservada yield, estamos indicando que vamos a devolver un ITask, que lo provee Arbier.Receive, a ese enumerador infinito que tenemos. Y digo enumerado infinito porque en principio no sabemos cuando vamos a terminar de enumerar esta colección. Yield se utiliza cuando estamos haciendo la implementación de un IEnumerable y de base tenemos un array o una lista y devolvemos elemento a elemento. En este caso es algo parecido, solo que no tenemos una colección de base sino que tenemos la flexibilidad de decidir cuando ese enumerado termina. ¿Y cuando termina?, pues cuando se devuelve un yield break;.

¿Por qué se usa yield y IEnumerable<ITask>?, pues tiene que ver de nuevo con la concurrencia. Teniendo en mente lo que anteriormente dije sobre como CCR está implementado, el uso de yield hace que de alguna manera cuando el DispatcherQueue recorra esa lista por nostros, nostros en nuestro código seamos capaces de devolver esa tarea para que se ejecute en el Dispatcher y aunque salimos del cuerpo del método, el uso de yield haga que de alguna manera se acuerde de cuales eran los valores que teníamos antes en nuestro método y que todo eso sea gestionado por el Dispatcher.

Es desde luego una solución muy elegante a un problema de enumeración de tareas que no sean bloqueadas por el llamado y el que llama.

Uno después de ver este codigo se da un poco cuenta de la importancia que tiene ese procesamiento de colas dentro de CCR, y como Robotics hace uso de esto. Extendiendo un poco el tema de las colas, CCR introduce el conceto de PortSet. PortSet, que como su nombre indica, es un grupo de puertos, perite asociar varios tipos de datos para hacer puertos. Podemos hacer un PortSet de 20 tipos en .NET Framework y 8 tipos de CF. Por supuesto esta clase se usa con genericidad.

PortSet<string,int,int,double> portSet;

Esto permite aumentar las opciones de las que podemos hacer con una cola. Por ejemplo con Arbiter ahora podemos hacer lo siguiente:

void Choice()
{
    PortSet<bool, int> ps = new PortSet<bool, int>();

    
    Activate(
        Arbiter.Choice<bool, int>(ps,
            delegate(bool b)
            { Console.WriteLine("Choice: " + b.ToString()); },
            delegate(int n)
            { Console.WriteLine("Choice: " + n.ToString()); }
        )
    );

   
    ps.Post(1000000);
    ps.Post(true);
    ps.PostUnknownType(true);
    ps.PostUnknownType("asdad");
}

Si nos fijamos en el ejemplo, tenemos una activación de un PortSet, pero ahora indicamos dos tipos de Handler<T> para cada uno de los tipos que define nuestro PortSet, esto permite discrimiar el consumo de los valores del puerto a través de tu tipo. Al final de ejemplo podemos ver como tenemos para cada tipo una sobrecarga de Post(T value) y un método llamado PostUnknownType(object) que permite que posteemos un mensaje en ese grupo de puertos. En caso de que el tipo no se pueda resolver, en este caso string, se lanzará una excepción.

Tambien podemos hacer Join con los PortSet sentencias lógicas AND, veamos.

void Join()
{
    Port<bool> p1 = new Port<bool>();
    Port<int> p2 = new Port<int>();
    Port<string> p3 = new Port<string>();

    Arbiter.Activate(Environment.TaskQueue,
        Arbiter.JoinedReceive(
            false, p1, p2,
            delegate(bool b, int i)
            {
                Console.WriteLine("Join 1: {0} {1}", b, i);
                p2.Post(i + 1);
            }
        )
    );

     Arbiter.Activate(Environment.TaskQueue,
        Arbiter.JoinedReceive(
            false, p2, p3,
            delegate(int i, string s)
            {
                Console.WriteLine("Join 2: {0} {1}", i, s);
                p2.Post(i - 1);
            }
        )
    );

    p1.Post(true);
    p3.Post("hello");
    p2.Post(99);
}

En el que hasta que no se recivan dos valores en el puerto del tipo especificado no se van a llamar al método que los procese.

Expandiendo este conecpto podemos en vez de esperar un valor único que procesar, esperar una lista de valores de tamaño fijo (un array).

Port<int> p = new Port<int>();

Arbiter.Activate(Environment.TaskQueue,
   Arbiter.MultipleItemReceive(
       true, p, 6,
       delegate(int[] array)
       {
           string s = "";
           for (int i = 0; i < array.Length; i++)
               s += array[i].ToString() + " ";
           Console.WriteLine("{0} Items: {1}", array.Length, s);
       }
   )
);

En este ejemplo hasta que no se reciben 6 elementos no se procesan, y por supuesto se procesan todos a la vez.

También podemos tener multiples procesadores, para varios tipos de datos.

PortSet<int, double> pSet = new PortSet<int, double>();

Activate(
    Arbiter.MultipleItemReceive(
        pSet, 10,
        delegate(ICollection<int> colInts, ICollection<double> colDoubles)
        {
            Console.Write("Ints: ");
            foreach (int i in colInts)
                Console.Write(i + " ");
            Console.WriteLine();
            Console.Write("Doubles: ");
            foreach (double d in colDoubles)
                Console.Write("{0:F2} ", d);
            Console.WriteLine();
        }
    )
);

En el que hasta que no se reciban 10 valores, como mínimo, en los dos puertos no se ejecutará el método que los procesará.

Y ya para terminar esta breve pero intensa introducción a CCR, hablaremos de las casualidades. Las casualidades no son más que una manera que tiene el CCR de llamar a las excecpiones que se produzcan en nuestro código. Hay que tener en cuenta que en cualquier momento se puede lanzar una excepción y que tenemos que ser capaces de procesarla y que no haga que el codigo se rompa.

private void Causality()
{
    using (Dispatcher d = new Dispatcher())
    {
        using (DispatcherQueue taskQ = new DispatcherQueue("Causality Queue", d))
        {
            Port<Exception> ep = new Port<Exception>();
            Port<int> p = new Port<int>();

            Dispatcher.AddCausality(new Causality("Test", ep));

            Console.WriteLine("Main thread: {0}", Thread.CurrentThread.ManagedThreadId);

            Arbiter.Activate(taskQ, Arbiter.Receive(false, p, CausalityExample));

            p.Post(2);
            Wait(500);

            Exception e;
            while (ep.Test(out e))
                Console.WriteLine("Exception: " + e.Message);
        }
    }
}

Básicamente las casualidades son un Port<T> de excepciones en los que se van posteando automáticamente todas las excepciones que se producen en el código para después procesarlos.

Este último ejemplo también muestra un método Test que nos permite comprobar si hay un elemento en la cola.

Espero que os haya gustado esta introducción a CCR, es los dos próximos post explicare como esto es usado desde DSS que es el corazón de Robotics Studio.

Luis.

2 comentarios en “Robotics 2”

  1. Hola.

    Sigo tu blog y te quería hacer una pregunta no relacionada con su contenido.

    ¿cómo lo haces para poner las cajas en donde introduces el código?

    Es que en mi blog de .NET no consigo poner el código en condiciones y me gusta tu sistema.

    ¿Me puedes echar una mano?

  2. Estimado Bendem, utilizo Windows Live Writer para escribir los post, y hay un plug-in para live writer que permite formatear codigo de esta manera. Que sepas que casi todos los proveedores de blogs soportan live writer. El plug-in se llama, Insert Code Snippet.

    Saludos. Luis.

Deja un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *