Skip to main content

Windows Azure AppFabric Service Bus Queues API

 

Recently Microsoft released a new App Fabric SDK 2.0 CTP including some great new features,

You can grab all the bits and pieces from here and/or read the release notes.

Some of the highlights include: 

  • Publish/Subscribe which are called Topics
  • Message Queues
  • Visual Studio Tools
  • AppFabric Application Manager
  • Support for running WCF & WF

The part that interested me the most is the Queues feature and that is what I’m going to be exploring in this post.

Overview of Queues

Message Queues are not a new concept allow for more reliable and scalable communication between distributed systems than pure request/response. Solutions like MSMQ, NServiceBus already exist to solve this problem for locally connected systems.  What the Queues API provides is that it provides similar features but the messages are being transported across the internet and persisted in the cloud.  

There is currently a Message Buffer available in Azure but this has serious limitations:

  • Messages only persisted for a maximum 10 minutes
  • Maximum of 50 messages
  • Requires the client to be connected
  • Charged per connection not per message
  • Complex API
  • Max message size of 8 KB

The Queues API addresses a number of these issues by adding.

  • Long term persistence (No clarification on the SLA yet)
  • Simpler API
  • Max message size of 256 KB
  • Sessions
  • Larger size of queue (100 MB currently but set to increase)

To start testing out the Queues feature you need to first Login to the AppFabricLabs portal and create a test Service Namespace.

Then you will need to add Microsoft.ServiceBus and Microsoft.ServiceBus.Messaging dlls to your project. They can be found in C:\Program Files\Windows Azure AppFabric SDK\V2.0\Assemblies\NET4.0

Wrapping It Up

Before starting on the implementation I’m going to define a simple interface around the ServiceBus bits and also add an IMessage interface which all Messages will need to implement.

IServiceBus

    public interface IServiceBus
    {
        void Send(string queueName, IMessage message);
        
        T Receive<T>(string queueName) where T : IMessage; 
    }

 

IMessage

Although not required by the API it’s good practice to have a unique identifier for each message.
When architecting message based systems you need to allow for idempotence meaning that an operation should be able to be executed multiple times without changing the result. Most message queue systems guarantee that the messages will be delivered “at least once” so you need to allow for this in your code and having an unique identifier on the message makes implementing this a trivial exercise.

    public interface IMessage
    {
        Guid Id { get; set; }
    }
Note: All messages must be Serializable

 

Authentication

In order to authenticate against the service bus you need three values:

  • Issuer Name
  • Issuer Key
  • Service Namespace

The service namespace is just the name which you created in the portal, in this example it’s “appfabricdemo1”.

The issuer name & key can be found under the Default Key section in the Portal.

default_key

 

There is a little bit of ceremony in setting up the correct objects but I’ve put this into a single method called InitClient which is called from the Send and Receive methods.

       private readonly string issuerKey;
       private readonly string issuerName;
       private readonly string serviceNamespace;
       private TransportClientCredentialBase clientCredentials;
       private MessagingFactory messagingFactory;
       private ServiceBusNamespaceClient namespaceClient;
       private IList<Queue> queueList; 

       public AzureServiceBus(string issuerName, string issuerKey, string serviceNamespace)
       {
           this.issuerName = issuerName;
           this.issuerKey = issuerKey;
           this.serviceNamespace = serviceNamespace;
       }
       private void InitClient()
       {
           clientCredentials = TransportClientCredentialBase
               .CreateSharedSecretCredential(issuerName, issuerKey);

           var uri = ServiceBusEnvironment
               .CreateServiceUri("https", serviceNamespace, string.Empty);
           
           var runtimeUri = ServiceBusEnvironment
               .CreateServiceUri("sb", serviceNamespace, string.Empty);

           namespaceClient = new ServiceBusNamespaceClient(uri, clientCredentials);

           messagingFactory = MessagingFactory.Create(runtimeUri, clientCredentials);
       }

The key classes here are:

 

Creating the Queue

Before we can start sending and receiving messages you first have to create a queue. The API doesn’t currently have a method to check if a queue exists already so it has to be hand rolled as calling CreateQueue throws an exception if it already exists.

This is fairly easily achieved with two simple methods

       private Queue GetOrCreateQueue(string path)
       {
           path = path.ToLower();

           var queues = GetQueues();

           var queueExists = queues.Any(q => q.Path == path);

           if (queueExists)
           {
               return queues.FirstOrDefault(q => q.Path == path);
           }

           var queue = namespaceClient.CreateQueue(path);

           queues.Add(queue);

           return queue;
       }

       private IList<Queue> GetQueues()
       {
           if (queueList != null)
           {
               return queueList;
           }

           queueList = new List<Queue>();
           var queues = namespaceClient.GetQueues();

           foreach (var queue in queues)
           {
               queueList.Add(queue);
           }

           return queueList;
       }

As the GetQueues method needs to make a remote call I keep an in-memory collection of the Queue objects so that it only calls the first time.

Note: Queue names are converted to lowercase when created.

Sending a Message

In order to send a message you need to create a QueueClient, MessageSender and then convert the message to a BrokeredMessage.

        public void Send(string queueName, IMessage message)
        {
            InitClient();

            var queue = GetOrCreateQueue(queueName);

            var queueClient = messagingFactory.CreateQueueClient(queue);

            using (var messageSender = queueClient.CreateSender())
            {
                var brokeredMessage = ConvertToBrokeredMessage(message);

                messageSender.Send(brokeredMessage);
            }

            queueClient.Close();
            messagingFactory.Close();
        }

        private static BrokeredMessage ConvertToBrokeredMessage(IMessage message)
        {
            var brokeredMessage = BrokeredMessage.CreateMessage(message);

            brokeredMessage.MessageId = message.Id.ToString();

            return brokeredMessage;
        }

 

The BrokeredMessage has a bunch of useful properties, most notable of which are:

  • ContentType
  • CorrelationId
  • DeliveryCount
  • Properties – which is a Dictionary<string, object>
  • MessageId

 

Receiving a Message

Receiving a message is much the same as sending a message in that it requires a QueueClient & MessageReceiver.

There are two different modes when receiving a message.
ReceiveAndDelete which deletes the message immediately after reading or PeekLock which only locks the message and leaves you to manage the delete.

       public T Receive<T>(string queueName) where T : IMessage
       {
           InitClient();

           var queue = GetOrCreateQueue(queueName);

           var queueClient = messagingFactory.CreateQueueClient(queue);

           queueClient.CreateReceiver();

           BrokeredMessage brokeredMessage;

           using (var messageReceiver = queueClient.CreateReceiver(ReceiveMode.ReceiveAndDelete))
           {
               brokeredMessage = messageReceiver.Receive();
           }

           if (brokeredMessage == null)
           {
               return default(T);
           }

           queueClient.Close();
           messagingFactory.Close();

           var message = brokeredMessage.GetBody<T>();

           return message;
       }

 

The Demo App

I wanted to pull this together and so created an example application which is two console apps one being the client and one being the server. The server sends a message to the client on one queue and when the client receives the message it sends a reply message on another queue.

Server

    internal class Program
    {
        private static void Main(string[] args)
        {
            Console.WriteLine("Welcome to the Queue Demo Server");
            Console.WriteLine("Press any key to start:");
            Console.ReadLine();


            var serverQueue = "serverqueue";
            var clientQueue = "clientqueue"; 

            var serviceBus = new AzureServiceBus(ConfigurationManager.AppSettings["IssuerName"], 
                ConfigurationManager.AppSettings["IssuerKey"], 
                ConfigurationManager.AppSettings["ServiceNamespace"]);

            var message = new TestMessage("I've travelled along way just to get here.");

            
            serviceBus.Send(serverQueue, message);

            Console.WriteLine("Sent Message:" + message.Id + " at " + DateTime.Now);


            while (true)
            {
                message = serviceBus.Receive<TestMessage>(clientQueue);

                if (message == null || message.Id == Guid.Empty)
                {
                    Console.WriteLine("No messages found yet I'll keep trying");
                }
                else
                {
                    Console.WriteLine("Read Message: " + message.Id + " at " + DateTime.Now);
                    Console.WriteLine(message.Message);
                    break;
                }
            }


            Console.ReadLine();
        }
    }
server_console

Client

   internal class Program
   {
       private static void Main(string[] args)
       {
           Console.WriteLine("Welcome to the Queue Demo Client");
           Console.WriteLine("Press any key to start:");
           Console.ReadLine();
           
           var serverQueue = "serverQueue";
           var clientQueue = "clientQueue"; 

           var serviceBus = new AzureServiceBus(ConfigurationManager.AppSettings["IssuerName"], 
               ConfigurationManager.AppSettings["IssuerKey"], 
               ConfigurationManager.AppSettings["ServiceNamespace"]);


           while (true)
           {
               var message = serviceBus.Receive<TestMessage>(serverQueue);

               if (message == null || message.Id == Guid.Empty)
               {
                   Console.WriteLine("No messages found yet I'll keep trying"); 
               }
               else
               {
                   Console.WriteLine("Read Message: " + message.Id + " at " + DateTime.Now);
                   Console.WriteLine(message.Message);
                   break;
               }
           }


           var responseMessage = new TestMessage("And so have I");

           serviceBus.Send(clientQueue, responseMessage);

           Console.WriteLine("Sent Message:" + responseMessage.Id + " at " + DateTime.Now);



           Console.ReadLine();
       }
   }

client_console

 

Fiddler

If you’re curious you can see what’s going on under the covers using Fiddler.

fiddler

 

API Change Requests

Having played around with the API it’s pretty good and much better than the MessageBuffer API which is really awkward.

I’d like to see on the ServiceBusNamespaceClient a method that tells you if a Queue exists or not, something like:

bool QueueExists(string path)

Currently MessagingFactory, ServiceBusNamespaceClient, QueueClient do not implement interfaces which makes it much harder to Unit Test. Creating an abstracting around these classes would be a big win IMO.

 

Conclusion

All in all the Queue API is a much needed and welcomed addition to the Azure offerings and greatly simplifies messaging communication between not always connected clients and the server. It also appears to be very fast as well.

Grab the code.

In my next post I’m going be looking at the Publish/Subscribe (Topics) features.

Popular posts from this blog

ASP.NET MVC Release Candidate - Upgrade issues - Spec#

First of all, great news that the ASP.NET MVC Release Candidate has finally been released.  Full credit to the team for the hard work on this.  You can get the download here  However this is the first time I have had upgrade issues.  Phil Haack has noted some of the issues here   If like me you have lot's of CTP's and Add-Ins then you might experience some pain in Uninstalling MVC Beta on Vista SP1  This is the list of Add-Ins / CTP's I had to uninstall to get it to work  Spec# PEX Resharper 4.1  Sourcelinks ANTS Profiler 4   Can't say I'm too impressed as it wasted over an hour of my time.  As it turned out Spec# turned out to be the offending culprit, it's forgiveable to have issues with a third party product but a Microsoft one? Guess no-one on the ASP.NET team has Spec# installed. 

Freeing Disk Space on C:\ Windows Server 2008

  I just spent the last little while trying to clear space on our servers in order to install .NET 4.5 . Decided to post so my future self can find the information when I next have to do this. I performed all the usual tasks: Deleting any files/folders from C:\windows\temp and C:\Users\%UserName%\AppData\Local\Temp Delete all EventViewer logs Save to another Disk if you want to keep them Remove any unused programs, e.g. Firefox Remove anything in C:\inetpub\logs Remove any file/folders C:\Windows\System32\LogFiles Remove any file/folders from C:\Users\%UserName%\Downloads Remove any file/folders able to be removed from C:\Users\%UserName%\Desktop Remove any file/folders able to be removed from C:\Users\%UserName%\My Documents Stop Windows Update service and remove all files/folders from C:\Windows\SoftwareDistribution Deleting an Event Logs Run COMPCLN.exe Move the Virtual Memory file to another disk However this wasn’t enough & I found the most space...

CPF Contribution Rates for new Singapore Permanent Residents (SPR’s)

Recently my wife and I applied and got approved for Singapore Permanent Residency. After completing the formalities the most significant immediate change is the contribution to CPF which is Singapore’s mandatory social security savings scheme requiring contributions from employers and employees. CPF contributions start from the date you obtain SPR status, which is the date of the entry permit.   Being a relentless budgeter I needed to know exactly how much I and my employer would have to contribute so that I could adjust my budget accordingly as the employee contributions get deducted from the monthly salary. After doing some research I discovered that there is a “graduated” approach to CPF contributions for new SPR’s where the contributions gradually increase in the first and second year and then upon reaching the third year are at the full amount. Note: There is an option for employers to contribute the full amount for year 1 and year 2 and the employee can use the gra...