Skip to main content

Windows Azure AppFabric Service Bus Queues API

 

Recently Microsoft released a new App Fabric SDK 2.0 CTP including some great new features,

You can grab all the bits and pieces from here and/or read the release notes.

Some of the highlights include: 

  • Publish/Subscribe which are called Topics
  • Message Queues
  • Visual Studio Tools
  • AppFabric Application Manager
  • Support for running WCF & WF

The part that interested me the most is the Queues feature and that is what I’m going to be exploring in this post.

Overview of Queues

Message Queues are not a new concept allow for more reliable and scalable communication between distributed systems than pure request/response. Solutions like MSMQ, NServiceBus already exist to solve this problem for locally connected systems.  What the Queues API provides is that it provides similar features but the messages are being transported across the internet and persisted in the cloud.  

There is currently a Message Buffer available in Azure but this has serious limitations:

  • Messages only persisted for a maximum 10 minutes
  • Maximum of 50 messages
  • Requires the client to be connected
  • Charged per connection not per message
  • Complex API
  • Max message size of 8 KB

The Queues API addresses a number of these issues by adding.

  • Long term persistence (No clarification on the SLA yet)
  • Simpler API
  • Max message size of 256 KB
  • Sessions
  • Larger size of queue (100 MB currently but set to increase)

To start testing out the Queues feature you need to first Login to the AppFabricLabs portal and create a test Service Namespace.

Then you will need to add Microsoft.ServiceBus and Microsoft.ServiceBus.Messaging dlls to your project. They can be found in C:\Program Files\Windows Azure AppFabric SDK\V2.0\Assemblies\NET4.0

Wrapping It Up

Before starting on the implementation I’m going to define a simple interface around the ServiceBus bits and also add an IMessage interface which all Messages will need to implement.

IServiceBus

    public interface IServiceBus
    {
        void Send(string queueName, IMessage message);
        
        T Receive<T>(string queueName) where T : IMessage; 
    }

 

IMessage

Although not required by the API it’s good practice to have a unique identifier for each message.
When architecting message based systems you need to allow for idempotence meaning that an operation should be able to be executed multiple times without changing the result. Most message queue systems guarantee that the messages will be delivered “at least once” so you need to allow for this in your code and having an unique identifier on the message makes implementing this a trivial exercise.

    public interface IMessage
    {
        Guid Id { get; set; }
    }
Note: All messages must be Serializable

 

Authentication

In order to authenticate against the service bus you need three values:

  • Issuer Name
  • Issuer Key
  • Service Namespace

The service namespace is just the name which you created in the portal, in this example it’s “appfabricdemo1”.

The issuer name & key can be found under the Default Key section in the Portal.

default_key

 

There is a little bit of ceremony in setting up the correct objects but I’ve put this into a single method called InitClient which is called from the Send and Receive methods.

       private readonly string issuerKey;
       private readonly string issuerName;
       private readonly string serviceNamespace;
       private TransportClientCredentialBase clientCredentials;
       private MessagingFactory messagingFactory;
       private ServiceBusNamespaceClient namespaceClient;
       private IList<Queue> queueList; 

       public AzureServiceBus(string issuerName, string issuerKey, string serviceNamespace)
       {
           this.issuerName = issuerName;
           this.issuerKey = issuerKey;
           this.serviceNamespace = serviceNamespace;
       }
       private void InitClient()
       {
           clientCredentials = TransportClientCredentialBase
               .CreateSharedSecretCredential(issuerName, issuerKey);

           var uri = ServiceBusEnvironment
               .CreateServiceUri("https", serviceNamespace, string.Empty);
           
           var runtimeUri = ServiceBusEnvironment
               .CreateServiceUri("sb", serviceNamespace, string.Empty);

           namespaceClient = new ServiceBusNamespaceClient(uri, clientCredentials);

           messagingFactory = MessagingFactory.Create(runtimeUri, clientCredentials);
       }

The key classes here are:

 

Creating the Queue

Before we can start sending and receiving messages you first have to create a queue. The API doesn’t currently have a method to check if a queue exists already so it has to be hand rolled as calling CreateQueue throws an exception if it already exists.

This is fairly easily achieved with two simple methods

       private Queue GetOrCreateQueue(string path)
       {
           path = path.ToLower();

           var queues = GetQueues();

           var queueExists = queues.Any(q => q.Path == path);

           if (queueExists)
           {
               return queues.FirstOrDefault(q => q.Path == path);
           }

           var queue = namespaceClient.CreateQueue(path);

           queues.Add(queue);

           return queue;
       }

       private IList<Queue> GetQueues()
       {
           if (queueList != null)
           {
               return queueList;
           }

           queueList = new List<Queue>();
           var queues = namespaceClient.GetQueues();

           foreach (var queue in queues)
           {
               queueList.Add(queue);
           }

           return queueList;
       }

As the GetQueues method needs to make a remote call I keep an in-memory collection of the Queue objects so that it only calls the first time.

Note: Queue names are converted to lowercase when created.

Sending a Message

In order to send a message you need to create a QueueClient, MessageSender and then convert the message to a BrokeredMessage.

        public void Send(string queueName, IMessage message)
        {
            InitClient();

            var queue = GetOrCreateQueue(queueName);

            var queueClient = messagingFactory.CreateQueueClient(queue);

            using (var messageSender = queueClient.CreateSender())
            {
                var brokeredMessage = ConvertToBrokeredMessage(message);

                messageSender.Send(brokeredMessage);
            }

            queueClient.Close();
            messagingFactory.Close();
        }

        private static BrokeredMessage ConvertToBrokeredMessage(IMessage message)
        {
            var brokeredMessage = BrokeredMessage.CreateMessage(message);

            brokeredMessage.MessageId = message.Id.ToString();

            return brokeredMessage;
        }

 

The BrokeredMessage has a bunch of useful properties, most notable of which are:

  • ContentType
  • CorrelationId
  • DeliveryCount
  • Properties – which is a Dictionary<string, object>
  • MessageId

 

Receiving a Message

Receiving a message is much the same as sending a message in that it requires a QueueClient & MessageReceiver.

There are two different modes when receiving a message.
ReceiveAndDelete which deletes the message immediately after reading or PeekLock which only locks the message and leaves you to manage the delete.

       public T Receive<T>(string queueName) where T : IMessage
       {
           InitClient();

           var queue = GetOrCreateQueue(queueName);

           var queueClient = messagingFactory.CreateQueueClient(queue);

           queueClient.CreateReceiver();

           BrokeredMessage brokeredMessage;

           using (var messageReceiver = queueClient.CreateReceiver(ReceiveMode.ReceiveAndDelete))
           {
               brokeredMessage = messageReceiver.Receive();
           }

           if (brokeredMessage == null)
           {
               return default(T);
           }

           queueClient.Close();
           messagingFactory.Close();

           var message = brokeredMessage.GetBody<T>();

           return message;
       }

 

The Demo App

I wanted to pull this together and so created an example application which is two console apps one being the client and one being the server. The server sends a message to the client on one queue and when the client receives the message it sends a reply message on another queue.

Server

    internal class Program
    {
        private static void Main(string[] args)
        {
            Console.WriteLine("Welcome to the Queue Demo Server");
            Console.WriteLine("Press any key to start:");
            Console.ReadLine();


            var serverQueue = "serverqueue";
            var clientQueue = "clientqueue"; 

            var serviceBus = new AzureServiceBus(ConfigurationManager.AppSettings["IssuerName"], 
                ConfigurationManager.AppSettings["IssuerKey"], 
                ConfigurationManager.AppSettings["ServiceNamespace"]);

            var message = new TestMessage("I've travelled along way just to get here.");

            
            serviceBus.Send(serverQueue, message);

            Console.WriteLine("Sent Message:" + message.Id + " at " + DateTime.Now);


            while (true)
            {
                message = serviceBus.Receive<TestMessage>(clientQueue);

                if (message == null || message.Id == Guid.Empty)
                {
                    Console.WriteLine("No messages found yet I'll keep trying");
                }
                else
                {
                    Console.WriteLine("Read Message: " + message.Id + " at " + DateTime.Now);
                    Console.WriteLine(message.Message);
                    break;
                }
            }


            Console.ReadLine();
        }
    }
server_console

Client

   internal class Program
   {
       private static void Main(string[] args)
       {
           Console.WriteLine("Welcome to the Queue Demo Client");
           Console.WriteLine("Press any key to start:");
           Console.ReadLine();
           
           var serverQueue = "serverQueue";
           var clientQueue = "clientQueue"; 

           var serviceBus = new AzureServiceBus(ConfigurationManager.AppSettings["IssuerName"], 
               ConfigurationManager.AppSettings["IssuerKey"], 
               ConfigurationManager.AppSettings["ServiceNamespace"]);


           while (true)
           {
               var message = serviceBus.Receive<TestMessage>(serverQueue);

               if (message == null || message.Id == Guid.Empty)
               {
                   Console.WriteLine("No messages found yet I'll keep trying"); 
               }
               else
               {
                   Console.WriteLine("Read Message: " + message.Id + " at " + DateTime.Now);
                   Console.WriteLine(message.Message);
                   break;
               }
           }


           var responseMessage = new TestMessage("And so have I");

           serviceBus.Send(clientQueue, responseMessage);

           Console.WriteLine("Sent Message:" + responseMessage.Id + " at " + DateTime.Now);



           Console.ReadLine();
       }
   }

client_console

 

Fiddler

If you’re curious you can see what’s going on under the covers using Fiddler.

fiddler

 

API Change Requests

Having played around with the API it’s pretty good and much better than the MessageBuffer API which is really awkward.

I’d like to see on the ServiceBusNamespaceClient a method that tells you if a Queue exists or not, something like:

bool QueueExists(string path)

Currently MessagingFactory, ServiceBusNamespaceClient, QueueClient do not implement interfaces which makes it much harder to Unit Test. Creating an abstracting around these classes would be a big win IMO.

 

Conclusion

All in all the Queue API is a much needed and welcomed addition to the Azure offerings and greatly simplifies messaging communication between not always connected clients and the server. It also appears to be very fast as well.

Grab the code.

In my next post I’m going be looking at the Publish/Subscribe (Topics) features.

Comments

  1. James Bruiners5 March 2012 at 06:25

    Hi There, I have created an xml file that I am pushing into a que its not that big, its a 100kb XML file that I convert to a string, but it doesnt make it to the recievers, if I keep the string length lower (sorry not tested how long exactly) then its recieved fine, do you know what the correct way to do this is with a package say 500kb - 1mb message.

    ReplyDelete
  2. All the documentation states that 64KB is the maximum message size. 
    http://msdn.microsoft.com/en-us/library/dd179363.aspx
    If you're trying to pass around messages bigger than this then you would need to use a different  mechanism. Why not pass a lightweight message with a Key that can be used to callback to a Web Service from the client to get the full payload?

    ReplyDelete

Post a Comment

Popular posts from this blog

Freeing Disk Space on C:\ Windows Server 2008

I just spent the last little while trying to clear space on our servers in order to install .NET 4.5. Decided to post so my future self can find the information when I next have to do this. I performed all the usual tasks: Deleting any files/folders from C:\windows\temp and C:\Users\%UserName%\AppData\Local\TempDelete all EventViewer logs Save to another Disk if you want to keep themRemove any unused programs, e.g. FirefoxRemove anything in C:\inetpub\logsRemove any file/folders C:\Windows\System32\LogFilesRemove any file/folders from C:\Users\%UserName%\DownloadsRemove any file/folders able to be removed from C:\Users\%UserName%\DesktopRemove any file/folders able to be removed from C:\Users\%UserName%\My DocumentsStop Windows Update service and remove all files/folders from C:\Windows\SoftwareDistributionDeleting an Event Logs Run COMPCLN.exe Move the Virtual Memory file to another disk However this wasn’t enough & I found the most space was cleared by using the Disk Cleanup to…

Consuming the SSRS ReportExecutionService from a .NET Client

I’ve just finished writing a nice wrapper which internally calls the SSRS ReportExecutionService to generate reports.
Whilst it was fairly simple to implement there has been some major changes between 2005 and 2008 and the majority of online and documentation is based on the 2005 implementation. The most important change is that the Report Server and Report Manager are no longer hosted in IIS which will be a welcomed change to Sys Admins but makes the security model and hosting model vastly different. So far I’ve yet to figure out how to allow Anonymous Access, if anyone knows how to do this leave a comment and it will be most appreciated. Getting StartedTo get started you’ll want to add a service reference to http://localhost/ReportServer_SQL2008/ReportExecution2005.asmx where ReportServer_SQL2008 is the name you configure in the Reporting Services Configuration Manager. The Web Application files are located in C:\Program Files\Microsoft SQL Server\MSRS10.SQL2008\Reporting Servic…

Log Shipping in SQL Server Express 2008

When you entrust your Applications and Data to the Cloud Based Service Providers such as GoGrid and Amazon EC2 it becomes absolutely critical to employ a strict Disaster Recovery strategy. As part of of our strategy it was imperative that we have a failover for our SQL Server. The only problem was that SQL Server Express 2008 doesn’t support Log Shipping.
Well technically it does but SQL Server Express does not have SQL Server Agent. Now I’m a big fan of SQL Server, but when it comes to doing something a bit more complicated the Express editions limitations become a bit of a problem, that’s why it’s time to find creative solutions. To set up Log Shipping the first step is to Backup your database and then Restore it on another instance making sure set the Recovery state as “RESTORE WITH STANDBY”. See belowTo get this going you’ll need three stored procedures, one in the Master database and two in the slave database. Now when you are trying to do this across Networks and Hosting Pro…