Producer/Consumer con buffer
El modelo producer/consumer es un ejemplo de relación mediante semáforos ó monitores entre dos hilos que intercambian elementos de forma segura y controlada, de forma que cuando el "productor" introduce un elemento avisa a al "consumidor" de que ya puede leer. De esta forma, ambas partes están en idle mientras no hay nada que hacer y se ponen en marcha automáticamente cuando hay "faena" que hacer.
Podemos ver un ejemplo en C# mediante el uso de la clase Monitor en la web de Jon Skeet.
Este ejemplo, es una variación que utiliza un buffer de elementos y añade la posibilidad de indicar que un elemento es el último, de forma que se anule la condición de espera para el/los hilos de lectura cuando no haya elementos, cuando le reader obtenga un null... es que no hay ni habrá más elementos en el buffer:
public class ProducerConsumerBuffered<T>
where T: class
{ private Queue<T> buffer = new Queue<T>();
private readonly Int32 max;
private Boolean last = false;
public ProducerConsumerBuffered(Int32 Max)
{ this.max = Max;
}
/// <summary>
/// Introduce un elemento.
/// </summary>
/// <param name="item">Elemento</param>
/// <param name="last">Indica si es el último elemento</param>
public void Put(T item, Boolean last)
{ lock (buffer)
{ if(!this.last)
this.last = last;
buffer.Enqueue(item);
Monitor.Pulse(buffer);
// Si se ha alcanzado el máximo
// bloqueo hasta que alguien lea
while (buffer.Count == max)
Monitor.Wait(buffer);
}
}
/// <summary>
/// Obtiene un elemento.
/// </summary>
/// <returns>Elemento</returns>
public T Pop()
{ T r = null;
lock (buffer)
{ // Si no hay elementos y no ha
// aparecido el elemento final espero
while(buffer.Count == 0 && !last)
Monitor.Wait(buffer);
if (buffer.Count>0)
r = buffer.Dequeue();
Monitor.Pulse(buffer);
}
return r;
}
}
Para probarlo, podemos usar un simple programa de prueba :
static ProducerConsumerBuffered<Byte[]> bf =
new ProducerConsumerBuffered<Byte[]>(20);
static void Main(string[] args)
{ new Thread(new ThreadStart(write)).Start();
new Thread(new ThreadStart(read)).Start();
Console.WriteLine("ok "); Console.ReadKey(true);
}
static void write()
{ Random r = new Random(5);
for(Int32 i = 0; i<100;i++)
{ bf.Put(Encoding.ASCII.GetBytes(i.ToString()),i==99);
}
Console.WriteLine("Writer Exited");}
static void read()
{ Random r = new Random(5);
Byte[] value = new Byte[0];
while (value!=null)
{ value = bf.Pop();
if(value!=null)
Console.WriteLine(Encoding.ASCII.GetString(value));
Thread.Sleep(r.Next(1, 50));
}
Console.WriteLine("Reader Exited");}
Vemos que el writer termina y el reader aun sigue con los 20 elementos del buffer.
Se aceptan sugerencias :D