Producer/Consumer con buffer

El modelo producer/consumer es un ejemplo de relación mediante semáforos ó monitores entre dos hilos que intercambian elementos de forma segura y controlada, de forma que cuando el “productor” introduce un elemento avisa a al “consumidor” de que ya puede leer. De esta forma, ambas partes están en idle mientras no hay nada que hacer y se ponen en marcha automáticamente cuando hay “faena” que hacer.

Podemos ver un ejemplo en C# mediante el uso de la clase Monitor en la web de Jon Skeet.

Este ejemplo, es una variación que utiliza un buffer de elementos y añade la posibilidad de indicar que un elemento es el último, de forma que se anule la condición de espera para el/los hilos de lectura cuando no haya elementos, cuando le reader obtenga un null… es que no hay ni habrá más elementos en el buffer:

 

public class ProducerConsumerBuffered<T> 
    where T: class
{
    private Queue<T> buffer = new Queue<T>();
    private readonly Int32 max;
    private Boolean last = false;
 
    public ProducerConsumerBuffered(Int32 Max)
    {
        this.max = Max;
    }
 
    /// <summary>
    /// Introduce un elemento.
    /// </summary>
    /// <param name="item">Elemento</param>
    /// <param name="last">Indica si es el último elemento</param>
    public void Put(T item, Boolean last)
    {
        lock (buffer)
        {
            if(!this.last)
                this.last = last;
 
            buffer.Enqueue(item);
 
            Monitor.Pulse(buffer);
 
            // Si se ha alcanzado el máximo
            // bloqueo hasta que alguien lea
            while (buffer.Count == max)
                Monitor.Wait(buffer);
        }
    }
 
    /// <summary>
    /// Obtiene un elemento.
    /// </summary>
    /// <returns>Elemento</returns>
    public T Pop()
    {
        T r = null;
        lock (buffer)
        {
            // Si no hay elementos y no ha
            // aparecido el elemento final espero
            while(buffer.Count == 0 && !last)
                Monitor.Wait(buffer);
 
            if (buffer.Count>0) 
                r = buffer.Dequeue();
 
            Monitor.Pulse(buffer);
        }
        return r;
    }
}

Para probarlo, podemos usar un simple programa de prueba :

static ProducerConsumerBuffered<Byte[]> bf = 
    new ProducerConsumerBuffered<Byte[]>(20);
static void Main(string[] args)
{
    new Thread(new ThreadStart(write)).Start();
    new Thread(new ThreadStart(read)).Start();
 
    Console.WriteLine("ok ");
    Console.ReadKey(true);
}
 
static void write()
{
    Random r = new Random(5);
    for(Int32 i = 0; i<100;i++)
    {
        bf.Put(Encoding.ASCII.GetBytes(i.ToString()),i==99);
    }
    Console.WriteLine("Writer Exited");
}
 
static void read()
{
    Random r = new Random(5);
    Byte[] value = new Byte[0];
    while (value!=null)
    {
        value = bf.Pop();
        if(value!=null) 
            Console.WriteLine(Encoding.ASCII.GetString(value));
        Thread.Sleep(r.Next(1, 50));
    }
    Console.WriteLine("Reader Exited");
}

 Vemos que el writer termina y el reader aun sigue con los 20 elementos del buffer.

Se aceptan sugerencias 😀

Producer/Consumer con buffer| vtortola.NET

7 comentarios en “Producer/Consumer con buffer”

  1. Bonito ejemplo para los que amamos la programación concurrente 🙂

    Sólo una puntualización, un monitor no es exactamente un semáforo, jeje. La solución al problema mediante el uso de semáforos implicaría hacer uso de la estrategia de “espera ocupada”, lo cual restaría eficiencia al algoritmo.

    En cuanto a sugerencias, dices que las aceptas :-P, ¿qué tal una v2.0 en la que el consumer empleara LINQ para leer del buffer?

    Un saludo!!

  2. Producer/Consumer es un modelo implementado con semáforos … generalmente, no hablaba específicamente de .NET… si no de semáforos como mecanismo genérico de excluxión mutua 😀 Cada plataforma tiene sus métodos de realizar estas operaciones y afortunadamente en .NET tenemos una forma “administrada” de hacerlo con Monitor.

    LINQ? Humm… ¿cual seria la ventaja de leer el buffer con LINQ? A fin de cuentas, es un almacen intermedio que no dispone de todos los datos ni puede soliciarlos a la fuente. Cuando realices una consulta sobre el buffer, obtendrás un resultado único por ese instante de tiempo, ya que instantes antes ó después puedes recibir más ó menos elementos con la misma consulta. No veo esto claro y es que todavía estoy un poco verde con el tema de LINQ pero estaré encantado de hacer la v2.0 si me contais el propósito 😀

    Un saludo.

  3. Disculpas de antemano por el sermón y posible flame, pero creo necesario aclarar conceptos para no confundir a los lectores (no a los del problema, consumers, sino a los del post, jeje)…

    En este caso tenemos la suerte de poder implementarlo usando monitores (la forma más sencilla y eficiente, gracias a la encapsulación del funcionamiento que la clase Monitor nos proporciona)… Pero esto es así porque las clases de .Net molan mucho, jejeje, en general, Producer/Consumer es un esquema que representa un problema típico en la Programación Concurrente; un problema en el que diferentes tipos de procesos “pelean” por el acceso a un recurso, y en el que como variable adicional que determina el comportamiento a seguir contamos con que la variable de estado del recurso no es bi-estado (libre/ocupado) sino que es tri-estado (libre/ocupado/vacío).

    Como la gran mayoría de problemas en el campo de la programación concurrente (salvo ciertas excepciones), el problema en sí se presta para la resolución mediante diferentes estrategias, a citar las principales: semáforos, regiones críticas condicionales, monitores o paso de mensajes… Cada una de ellas con sus ventajas e inconvenientes tanto a nivel de complejidad computacional como a nivel de complejidad de implementación. 🙂

    En este caso el uso de la clase Monitor es, como su propio nombre indica, una solución implementada con monitores; no con semáforos, jejeje.

    Las diferencias entre un monitor y un semáforo están claras:

    – En el caso de los semáforos, el estado del recurso se controla mediante el uso de variables enteras globales asociadas a cada recurso y métodos P() y V() encargados de incrementar/decrementar dicha variable y, condicionalmente respecto a dicho valor, acceder al recurso o mantenerse en una espera ocupada. Es un método bastante engorroso (por el uso de variables globales y métodos P() y V() (llamados así en la “literatura” sobre el tema, en .Net son respectivamente WaitOne() y Release()) dispersos por porciones de código nuestro, son costosas de depurar y testear en caso de existir fallos) y a la vez, dicha espera ocupada provoca ineficiencia.

    – Por contra, el uso de monitores se basa en un planteamiento diferente: El control de la exclusión mutua en un monitor se basa en la existencia de una cola asociada al monitor. La gestión de esta cola sigue el siguiente algoritmo:
    1.- Cuando un proceso activo ejecuta uno de los procedimientos del monitor, y otro proceso activo intenta ejecutar otro proceso del mismo monitor (o el mismo proceso, obviamente); el código de acceso al monitor (generado por el compilador) bloquea el proceso que realiza la llamada y lo inserta en la cola del monitor, siguiendo una política FIFO. De esta forma se impide que dos procesos activos estén al mismo tiempo dentro del monitor.

    2.- Cuando un proceso activo abandona el monitor (finaliza la ejecución del procedimiento del monitor), el propio monitor selecciona el elemento que se encuentra al frente de la cola asociada y lo desbloquea, con lo que se iniciará la ejecución del procedimiento del monitor cuya llamada provocó el bloqueo.

    Resumiendo, las diferencias entre un monitor y un semáforo son:

    Control de acceso:
    S: Uso de variables globales independientes del recurso que se intenta controlar. Espera activa (bucle de consulta a dicha variable)
    M: Uso de colas asociadas a cada recurso para controlar el acceso. Espera bloqueante (el proceso se bloquea y es desbloqueado por el monitor cuando éste finaliza su estado ocupado y el proceso está al frente de la cola)

    Responsabilidad del bloqueo:
    S: Recae en cada proceso invocador (mediante el bucle de espera ocupada u otros mecanismos más eficientes).
    M: Recae en el monitor asociado al recurso (cola asociada a cada monitor y operaciones Wait() y Pulse()).

    Por lo demás, y pasando a hablar de esta solución concreta, digamos que de las 5-6 variantes del problema en cuestión se adopta la política de: prioridad a los escritores (producers).

    Respecto al uso de LINQ, las posibilidades que nos ofrece son TAN REVOLUCIONARIAS, como que el mero uso de LINQ (personalizado desde cero por nosotros, o mediante el uso de proyectos ya existentes como PLINQ) nos permitiría gestionar el propio acceso a los recursos (buffer) desde LINQ, sin necesidad del uso de la clase Monitor, ni Semaphore…

    Desde el proceso de este comentario invoco al método Miguel.Lock(), encolando de esta forma la tarea de escribir el código usando LINQ (aunque por el momento no accede al recurso monitorizado, aka yo) 😛

    Un saludo!

  4. Humm.. humm.. aunque como decía me referia con “semaforo” a algo genérico, conceptualmente si que tenia la idea de que un semáforo y un monitor eran más ó menos lo mismo, a si que me ha venido de perlas tu explicación, gracias 🙂

    Asi que voy a editar lo de “relación mediante semáforos” por “relación mediante semáforos ó monitores”, supongo que asi queda más claro.

    Prometo que cuando me termine de leer el libro de Octavio intentaré algo en esa línea porque ahora me suena un poco a chino xD

    Un saludo.

  5. No hay de qué, crack. El libro de Octavio te ayudará bastante, concretamente el apartado de LINQ to Pipes (págs. 195-198) habla del acceso a dichos canales, y con ciertos refinamientos también basados en LINQ se podría controlar la prioridad de acceso.

    Dicha prioridad podemos manejarla en LINQ de forma nativa, al fin y al cabo, como hemos dicho un Monitor implementa internamente una cola (FIFO)… Y una cola FIFO es IEnumerable, verdad? 🙂

    La clave estará en manejar la concurrencia en el acceso a nuestras estructuras “monitoras”. En este punto habrá que tener algo de cuidado, podríamos caer en el antipatrón de “trasladar el problema de la concurrencia” del buffer a nuestra cola (o cualquier otra estructura de control que decidamos implementar).

    Happy Coding! 🙂

Deja un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *