Bueno, con esta cerraremos la pequeña serie sobre colas en Service Bus, después de las dos entradas anterior ( I y II ). En esta ocasión tocaremos otros dos temas que pueden ser interesantes, la correlación de mensajes y el formato de los mismos, intentando poner algún ejemplo al igual que lo hemos ido haciendo anteriormente.
Session Id
La detección de duplicados no es la única de las características interesantes que tenemos dentro de las colas de Service Bus, puesto que como verá ahora, la posibilidad de otorgar un valor de correlación a cada mensaje es también una de esas cosas que solemos necesitar cuando nos enfrentamos a problemas reales. En ciertas situaciones, en las que tenemos diversos receptores para los mensajes de una cola ,es necesario que uno solo de estos clientes traten una serie de mensajes correlacionados, por ejemplo chunks de un mensaje más grande como se describe en el patrón Splitter. Para ayudarnos en esta situación el API nos proporciona una propiedad para todos los BrokeredMessage que se llama SessionId, gracias a la cual podremos establecer la correlación de una serie de mensajes. Por supuesto, como siempre, la cola tiene que soportar esta característica, pero esto es tan sencillo como se puede ver en las siguientes lineas de setup de una cola de Windows Azure Service Bus, fíjese en la propiedad RequiresSession.
1 2 3 4 5 6 7 8 9 10 |
<span class="rem">//set up queue</span> var queueDescription = <span class="kwrd">new</span> QueueDescription(<span class="str">"samples"</span>); queueDescription.RequiresSession = <span class="kwrd">true</span>; <span class="rem">//create the queue using the namespace manager</span> <span class="rem">//the connection string to service bus is </span> <span class="rem">//recovered from appsetttings "Microsoft.ServiceBusConnectionString" key</span> var namespaceManager = NamespaceManager.Create(); namespaceManager.CreateQueue(queueDescription); |
Con el fin de ver un ejemplo de esto crearemos una sencilla aplicación con el siguiente método de procesamiento de mensajes, por supuesto, simplemente con fines explicativos:
1 2 3 4 5 6 7 8 9 |
var messagingFactory = MessagingFactory.Create(); var queueClient = messagingFactory.CreateQueueClient(<span class="str">"samples"</span>); var messageSession = queueClient.AcceptMessageSession(); <span class="kwrd">while</span> (<span class="kwrd">true</span>) { var msg = messageSession.Receive(); Console.WriteLine(<span class="str">"A message {0} is received in thread {1}"</span>,msg.GetBody<<span class="kwrd">string</span>>(),Thread.CurrentThread.ManagedThreadId); } |
Ahora, en nuestra pequeña app levantaremos dos hilos utilizando este método de lectura tal que así:
1 2 3 4 5 |
Setup(); <span class="rem">//Create two receivers</span> <span class="kwrd">new</span> TaskFactory().StartNew(() => Receive()); <span class="kwrd">new</span> TaskFactory().StartNew(() => Receive()); |
Una vez creados los lectores solamente tenemos que poner una serie de mensajes en la cola, para ver nuestro propósito, pondremos a todos los mensajes el mismo identificador de sesión.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
var messagingFactory = MessagingFactory.Create(); var queue = messagingFactory.CreateQueueClient(<span class="str">"samples"</span>); <span class="rem">//create the session identifier</span> var sessionId = Guid.NewGuid(); <span class="kwrd">for</span> (<span class="kwrd">int</span> i = 0; i < 10; i++) { var relatedMsg = <span class="kwrd">new</span> BrokeredMessage(<span class="kwrd">string</span>.Format(<span class="str">"the message # {0} in session {1}"</span>,i,sessionId)); relatedMsg.SessionId = sessionId.ToString(); queue.Send(relatedMsg); } |
Al tener en todos los mensajes el mismo identificador de sesión observaremos que solamente uno de los clientes recibe todos los mensajes ( observar el ManagedThreadId ). El tip para esto está en la llamada a AcceptMessageSession de nuestro QueueClient, revise el código de recepción de mensaje, gracias al cual podemos agrupar todos los mensajes con un mismo identificador y procesarlos con una transacción simple.
Formateadores
Al igual que con MSMQ, aunque de forma diferentes, las colas de Service Bus también nos permiten establecer cuales son los formateadores que queremos poner a los mensajes, de tal forma que le contenido de los mismos pueda ser seriado de forma diferente. Con el fin de poner un ejemplo sencillo utilizaremos protobuff para serializar nuestros mensajes tratando de reducir el tamaño de los mismos, en .NET este serializador lo podemos utilizar por ejemplo, por medio del paquete de NuGet –http://nuget.org/packages/protobuf-net.
Cambiar el mecanismo de serialización es tan sencillo como indicar el mismo dentro de una de las sobrecargas que BrokeredMessage pone a nuestra disposición, en las siguientes lineas se ve como usar el nuevo serializador comentado anteriormente:
1 2 3 4 5 6 7 8 |
var messagingFactory = MessagingFactory.Create(); var queue = messagingFactory.CreateQueueClient(<span class="str">"samples"</span>); var message = <span class="kwrd">new</span> Message(Guid.NewGuid(), <span class="str">"the content"</span>); var serializer = XmlProtoSerializer.TryCreate(TypeModel.Create(), <span class="kwrd">typeof</span>(Message)); var brokeredMessage = <span class="kwrd">new</span> BrokeredMessage(message, serializer); queue.Send(brokeredMessage); |
Despues de unos pocos mensajes ya podemos ver como el mismo ha surtido efecto y el cambio de tamaño
es apreciable desde el nuevo portal de Windows Azure, como muestra la imagen de la derecha, pasando de 8.26 KB a 4.85 KB el tamaño de cada uno de los mensajes..
Por supuesto, en la recepción y posterior tratamiento del cuerpo de un mensaje en la cola también tendremos que indicar el serializador a usar como se ven el las siguientes lineas:
1 |
var body = msg.GetBody<Message>(serializer); |
Bueno, hasta aquí esta mini-serie acerca de Queues, lo siguiente Topics… espero que no sea dentro de mucho, ahora que hemos pillado un poco de ritmo….
Saludos
Unai
En las entradas anteriores ( I , II y III ) vimos una pequeña introducción a las colas de Service Bus