Microsoft recently released the AppFabric SDK 2.0 CTP, which includes some great new features.
You can grab all the bits and pieces from here and/or read the release notes.
Some of the highlights include:
- Publish/Subscribe, which is called Topics
- Message Queues
- Visual Studio Tools
- AppFabric Application Manager
- Support for running WCF & WF
The part that interested me the most is the Queues feature and that is what I’m going to be exploring in this post.
Overview of Queues
Message queues are not a new concept. They allow for more reliable and scalable communication between distributed systems than pure request/response. Solutions like MSMQ and NServiceBus already exist to solve this problem for locally connected systems. The Queues API provides similar features, but the messages are transported across the internet and persisted in the cloud.
There is currently a Message Buffer available in Azure but this has serious limitations:
- Messages are only persisted for a maximum of 10 minutes
- Maximum of 50 messages
- Requires the client to be connected
- Charged per connection not per message
- Complex API
- Max message size of 8 KB
The Queues API addresses a number of these issues by adding:
- Long term persistence (No clarification on the SLA yet)
- Simpler API
- Max message size of 256 KB
- Sessions
- Larger size of queue (100 MB currently but set to increase)
To start testing out the Queues feature you first need to log in to the AppFabricLabs portal and create a test Service Namespace.
Then you will need to add the Microsoft.ServiceBus and Microsoft.ServiceBus.Messaging DLLs to your project. They can be found in C:\Program Files\Windows Azure AppFabric SDK\V2.0\Assemblies\NET4.0
Wrapping It Up
Before starting on the implementation I’m going to define a simple interface around the ServiceBus bits and also add an IMessage interface which all Messages will need to implement.
IServiceBus
    public interface IServiceBus
    {
        void Send(string queueName, IMessage message);
        T Receive<T>(string queueName) where T : IMessage;
    }
IMessage
Although not required by the API it’s good practice to have a unique identifier for each message.
When architecting message-based systems you need to allow for idempotence, meaning that an operation can be executed multiple times without changing the result. Most message queue systems guarantee that messages will be delivered “at least once”, so the same message may arrive more than once and your code has to cope with that. Having a unique identifier on the message makes this straightforward to implement.
public interface IMessage { Guid Id { get; set; } }
Note: All messages must be Serializable
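To make the idempotence point concrete, here is a minimal sketch of a handler that skips messages it has already seen, keyed on the message Id. IdempotentHandler and SampleMessage are hypothetical names of mine and are not part of the AppFabric SDK, and the in-memory set is an assumption for illustration only; a real system would most likely need to persist the processed ids.

```csharp
using System;
using System.Collections.Generic;

public interface IMessage
{
    Guid Id { get; set; }
}

// Hypothetical message type used only for this sketch.
[Serializable]
public class SampleMessage : IMessage
{
    public Guid Id { get; set; }
    public string Text { get; set; }
}

// Hypothetical helper, not part of the AppFabric SDK: remembers which
// message ids have been handled so that a redelivered message is skipped.
public class IdempotentHandler<T> where T : IMessage
{
    // In-memory only; the ids are lost when the process restarts.
    private readonly HashSet<Guid> processedIds = new HashSet<Guid>();
    private readonly Action<T> handler;

    public IdempotentHandler(Action<T> handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        this.handler = handler;
    }

    // Returns true if the handler ran, false if the message was a duplicate.
    public bool Handle(T message)
    {
        if (processedIds.Contains(message.Id))
        {
            return false;
        }

        // Record the id only after the handler succeeds, so that a message
        // whose handler threw can still be retried on redelivery.
        handler(message);
        processedIds.Add(message.Id);
        return true;
    }
}
```

Passing the same message to Handle twice runs the wrapped action once; the second call returns false.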
Authentication
In order to authenticate against the service bus you need three values:
- Issuer Name
- Issuer Key
- Service Namespace
The service namespace is just the name you created in the portal; in this example it’s “appfabricdemo1”.
The issuer name & key can be found under the Default Key section in the Portal.
There is a little bit of ceremony in setting up the correct objects but I’ve put this into a single method called InitClient which is called from the Send and Receive methods.
    private readonly string issuerKey;
    private readonly string issuerName;
    private readonly string serviceNamespace;
    private TransportClientCredentialBase clientCredentials;
    private MessagingFactory messagingFactory;
    private ServiceBusNamespaceClient namespaceClient;
    private IList<Queue> queueList;

    public AzureServiceBus(string issuerName, string issuerKey, string serviceNamespace)
    {
        this.issuerName = issuerName;
        this.issuerKey = issuerKey;
        this.serviceNamespace = serviceNamespace;
    }

    private void InitClient()
    {
        // Shared secret credentials built from the issuer name and key.
        clientCredentials = TransportClientCredentialBase
            .CreateSharedSecretCredential(issuerName, issuerKey);

        // The https address goes to the namespace client, the sb address to the messaging factory.
        var uri = ServiceBusEnvironment
            .CreateServiceUri("https", serviceNamespace, string.Empty);
        var runtimeUri = ServiceBusEnvironment
            .CreateServiceUri("sb", serviceNamespace, string.Empty);

        namespaceClient = new ServiceBusNamespaceClient(uri, clientCredentials);
        messagingFactory = MessagingFactory.Create(runtimeUri, clientCredentials);
    }
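The key classes here are:
- TransportClientCredentialBase: holds the shared secret credentials built from the issuer name and key
- ServiceBusNamespaceClient: used to create and list queues in the namespace
- MessagingFactory: creates the QueueClient used to send and receive messages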
Creating the Queue
Before you can start sending and receiving messages you first have to create a queue. The API doesn’t currently have a method to check whether a queue already exists, and calling CreateQueue throws an exception if it does, so the check has to be hand rolled.
This is fairly easily achieved with two simple methods:
    private Queue GetOrCreateQueue(string path)
    {
        path = path.ToLower();
        var queues = GetQueues();

        var queueExists = queues.Any(q => q.Path == path);
        if (queueExists)
        {
            return queues.FirstOrDefault(q => q.Path == path);
        }

        var queue = namespaceClient.CreateQueue(path);
        queues.Add(queue);
        return queue;
    }

    private IList<Queue> GetQueues()
    {
        // Reuse the cached list if the queues have already been fetched.
        if (queueList != null)
        {
            return queueList;
        }

        queueList = new List<Queue>();
        var queues = namespaceClient.GetQueues();
        foreach (var queue in queues)
        {
            queueList.Add(queue);
        }

        return queueList;
    }
As the GetQueues method needs to make a remote call, I keep an in-memory collection of the Queue objects so that the remote call is only made the first time.
Note: Queue names are converted to lowercase when created.
Sending a Message
In order to send a message you need to create a QueueClient and a MessageSender, and then convert the message to a BrokeredMessage.
    public void Send(string queueName, IMessage message)
    {
        InitClient();
        var queue = GetOrCreateQueue(queueName);
        var queueClient = messagingFactory.CreateQueueClient(queue);

        using (var messageSender = queueClient.CreateSender())
        {
            var brokeredMessage = ConvertToBrokeredMessage(message);
            messageSender.Send(brokeredMessage);
        }

        queueClient.Close();
        messagingFactory.Close();
    }

    private static BrokeredMessage ConvertToBrokeredMessage(IMessage message)
    {
        var brokeredMessage = BrokeredMessage.CreateMessage(message);
        // Carry our own identifier across on the brokered message.
        brokeredMessage.MessageId = message.Id.ToString();
        return brokeredMessage;
    }
The BrokeredMessage has a bunch of useful properties, most notable of which are:
- ContentType
- CorrelationId
- DeliveryCount
- Properties – which is a Dictionary<string, object>
- MessageId
Receiving a Message
Receiving a message is much the same as sending a message in that it requires a QueueClient & MessageReceiver.
There are two different modes when receiving a message: ReceiveAndDelete, which deletes the message immediately after reading, and PeekLock, which only locks the message and leaves you to manage the delete.
    public T Receive<T>(string queueName) where T : IMessage
    {
        InitClient();
        var queue = GetOrCreateQueue(queueName);
        var queueClient = messagingFactory.CreateQueueClient(queue);

        BrokeredMessage brokeredMessage;
        try
        {
            // ReceiveAndDelete removes the message from the queue as soon as it is read.
            using (var messageReceiver = queueClient.CreateReceiver(ReceiveMode.ReceiveAndDelete))
            {
                brokeredMessage = messageReceiver.Receive();
            }
        }
        finally
        {
            // Close the clients whether or not a message was received.
            queueClient.Close();
            messagingFactory.Close();
        }

        if (brokeredMessage == null)
        {
            return default(T);
        }

        return brokeredMessage.GetBody<T>();
    }
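The difference between the two receive modes is easiest to see in a toy model. The sketch below is an in-memory simulation only and does not call the AppFabric SDK; SketchQueue, SketchReceiveMode, TryReceive, Complete and Abandon are names I have made up for illustration. The behaviour it models is my assumption of how such modes normally work: in PeekLock mode a message stays on the queue until the receiver confirms it, so a receiver that fails can hand it back.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical names; this is a simulation, not the AppFabric API.
public enum SketchReceiveMode { ReceiveAndDelete, PeekLock }

public class SketchQueue
{
    private readonly LinkedList<string> available = new LinkedList<string>();
    private readonly Dictionary<Guid, string> locked = new Dictionary<Guid, string>();

    // Messages still owned by the queue, whether visible or locked.
    public int Count
    {
        get { return available.Count + locked.Count; }
    }

    public void Send(string body)
    {
        available.AddLast(body);
    }

    // Returns false when no message is visible.
    public bool TryReceive(SketchReceiveMode mode, out string body, out Guid lockToken)
    {
        body = null;
        lockToken = Guid.Empty;
        if (available.Count == 0)
        {
            return false;
        }

        body = available.First.Value;
        available.RemoveFirst();

        if (mode == SketchReceiveMode.PeekLock)
        {
            // Keep hold of the message until the receiver completes or abandons it.
            lockToken = Guid.NewGuid();
            locked[lockToken] = body;
        }

        // In ReceiveAndDelete mode the message is already gone at this point.
        return true;
    }

    // The receiver finished its work: delete the locked message for good.
    public void Complete(Guid lockToken)
    {
        locked.Remove(lockToken);
    }

    // The receiver gave up: make the message visible again at the front.
    public void Abandon(Guid lockToken)
    {
        string body;
        if (locked.TryGetValue(lockToken, out body))
        {
            locked.Remove(lockToken);
            available.AddFirst(body);
        }
    }
}
```

With ReceiveAndDelete, a receiver that crashes after reading has lost the message. With PeekLock, abandoning the lock puts the message back for another attempt, which is one reason the same message can be delivered more than once.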
The Demo App
To pull this together I created an example application made up of two console apps, one being the client and one being the server. The server sends a message to the client on one queue, and when the client receives the message it sends a reply on another queue.
Server
    internal class Program
    {
        private static void Main(string[] args)
        {
            Console.WriteLine("Welcome to the Queue Demo Server");
            Console.WriteLine("Press Enter to start:");
            Console.ReadLine();

            var serverQueue = "serverqueue";
            var clientQueue = "clientqueue";
            var serviceBus = new AzureServiceBus(
                ConfigurationManager.AppSettings["IssuerName"],
                ConfigurationManager.AppSettings["IssuerKey"],
                ConfigurationManager.AppSettings["ServiceNamespace"]);

            var message = new TestMessage("I've travelled a long way just to get here.");
            serviceBus.Send(serverQueue, message);
            Console.WriteLine("Sent Message:" + message.Id + " at " + DateTime.Now);

            // Keep reading the client queue until the reply arrives.
            while (true)
            {
                message = serviceBus.Receive<TestMessage>(clientQueue);
                if (message == null || message.Id == Guid.Empty)
                {
                    Console.WriteLine("No messages found yet I'll keep trying");
                }
                else
                {
                    Console.WriteLine("Read Message: " + message.Id + " at " + DateTime.Now);
                    Console.WriteLine(message.Message);
                    break;
                }
            }

            Console.ReadLine();
        }
    }
Client
    internal class Program
    {
        private static void Main(string[] args)
        {
            Console.WriteLine("Welcome to the Queue Demo Client");
            Console.WriteLine("Press Enter to start:");
            Console.ReadLine();

            // Mixed case still matches the server's queues because GetOrCreateQueue lowercases the name.
            var serverQueue = "serverQueue";
            var clientQueue = "clientQueue";
            var serviceBus = new AzureServiceBus(
                ConfigurationManager.AppSettings["IssuerName"],
                ConfigurationManager.AppSettings["IssuerKey"],
                ConfigurationManager.AppSettings["ServiceNamespace"]);

            // Keep reading the server queue until the first message arrives.
            while (true)
            {
                var message = serviceBus.Receive<TestMessage>(serverQueue);
                if (message == null || message.Id == Guid.Empty)
                {
                    Console.WriteLine("No messages found yet I'll keep trying");
                }
                else
                {
                    Console.WriteLine("Read Message: " + message.Id + " at " + DateTime.Now);
                    Console.WriteLine(message.Message);
                    break;
                }
            }

            var responseMessage = new TestMessage("And so have I");
            serviceBus.Send(clientQueue, responseMessage);
            Console.WriteLine("Sent Message:" + responseMessage.Id + " at " + DateTime.Now);
            Console.ReadLine();
        }
    }
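Both demo apps retry in a loop with no pause between attempts. One possible refinement, sketched below, is a small helper that waits between attempts and doubles the wait each time up to a cap. Polling and PollUntilReceived are hypothetical names of mine and are not part of the SDK; whether the SDK's own Receive call already waits before returning is something I have not verified, so treat the delays as an assumption.

```csharp
using System;
using System.Threading;

// Hypothetical helper for illustration; not part of the AppFabric SDK.
public static class Polling
{
    // Calls receive() until it returns a non-null value or maxAttempts is
    // reached, sleeping between attempts and doubling the delay up to maxDelay.
    // Returns null if nothing was received.
    public static T PollUntilReceived<T>(
        Func<T> receive,
        int maxAttempts,
        TimeSpan initialDelay,
        TimeSpan maxDelay) where T : class
    {
        // Never start with a delay larger than the cap.
        var delay = initialDelay < maxDelay ? initialDelay : maxDelay;

        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            var message = receive();
            if (message != null)
            {
                return message;
            }

            if (attempt == maxAttempts)
            {
                break; // No point sleeping after the final attempt.
            }

            Thread.Sleep(delay);

            var doubled = TimeSpan.FromTicks(delay.Ticks * 2);
            delay = doubled < maxDelay ? doubled : maxDelay;
        }

        return null;
    }
}
```

In the server above, the loop could then become something like Polling.PollUntilReceived(() => serviceBus.Receive<TestMessage>(clientQueue), 10, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(30)), with a null result meaning no reply arrived within the attempts allowed.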
Fiddler
If you’re curious you can see what’s going on under the covers using Fiddler.
API Change Requests
Having played around with the API, I think it’s pretty good and much better than the MessageBuffer API, which is really awkward.
I’d like to see on the ServiceBusNamespaceClient a method that tells you if a Queue exists or not, something like:
bool QueueExists(string path)
Currently MessagingFactory, ServiceBusNamespaceClient and QueueClient do not implement interfaces, which makes them much harder to unit test. Creating an abstraction around these classes would be a big win IMO.
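In the meantime, the IServiceBus wrapper defined earlier gives code that depends on it a seam for testing. As a rough sketch, an in-memory stand-in could look like the following. InMemoryServiceBus and SampleMessage are hypothetical names of mine, and the stand-in only approximates the real service: there is no persistence, no locking and no serialization.

```csharp
using System;
using System.Collections.Generic;

public interface IMessage
{
    Guid Id { get; set; }
}

public interface IServiceBus
{
    void Send(string queueName, IMessage message);
    T Receive<T>(string queueName) where T : IMessage;
}

// Hypothetical message type used only for this sketch.
public class SampleMessage : IMessage
{
    public Guid Id { get; set; }
    public string Text { get; set; }
}

// Hypothetical test double: keeps each queue in memory instead of in the cloud.
public class InMemoryServiceBus : IServiceBus
{
    private readonly Dictionary<string, Queue<IMessage>> queues =
        new Dictionary<string, Queue<IMessage>>();

    public void Send(string queueName, IMessage message)
    {
        GetOrCreateQueue(queueName).Enqueue(message);
    }

    // Returns default(T) when the queue is empty, like the wrapper above.
    // Throws InvalidCastException if the next message is not a T.
    public T Receive<T>(string queueName) where T : IMessage
    {
        var queue = GetOrCreateQueue(queueName);
        if (queue.Count == 0)
        {
            return default(T);
        }

        return (T)queue.Dequeue();
    }

    private Queue<IMessage> GetOrCreateQueue(string queueName)
    {
        // Lowercase the name so lookups behave like the real wrapper.
        var path = queueName.ToLowerInvariant();

        Queue<IMessage> queue;
        if (!queues.TryGetValue(path, out queue))
        {
            queue = new Queue<IMessage>();
            queues[path] = queue;
        }

        return queue;
    }
}
```

A test can then hand an InMemoryServiceBus to any class that takes an IServiceBus and assert on what was sent or received, without touching the network.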
Conclusion
All in all, the Queue API is a much needed and welcome addition to the Azure offerings, and it greatly simplifies messaging between occasionally connected clients and the server. It also appears to be very fast.
In my next post I’m going to be looking at the Publish/Subscribe (Topics) features.