Windows Azure Queue Storage Service Polling Task

February 19, 2013 — 5 Comments

2013-02-19_18h12_24

The screenshot above is of my test console, showing that the PollingTask is effectively applying the back off strategy. If the QueueWorker finds new messages to process the back off delay will be reset to zero.

The code from this Post is part of the Brisebois.WindowsAzure NuGet Package

To install Brisebois.WindowsAzure, run the following command in the Package Manager Console

PM> Install-Package Brisebois.WindowsAzure

Get more details about the Nuget Package.

The code below uses a QueueWorker which implements PollingTask<TWorkItem> to dequeue 32 messages from a test queue, it then processes the messages in parallel. If the action is successful, the message is deleted from the test queue. When a message is read from the queue, it will not be visible for the next minute. If for some reason an exception is thrown during the action, the message will become visible and will be dequeued a second time. Once dequeued for a second time, the message dequeue count will be check. If the dequeue count is greater or equal to 1, the message will be removed from the test queue and be placed on the poison-test queue.

If it does not find messages in the queue it uses an exponential back off strategy to reduce the number of transactions made on Windows Azure Queue Storage Service. Without the back off strategy, the service would continuously make requests to check if new messages needed to be processed.

Consequences of making too many requests to Queue Storage Service Include

  • The service might throttle your requests
  • The cost of operation would be significantly higher than it should be

 

class Program
{
    static void Main(string[] args)
    {
        var w = new MessageQueueWorker("StorageConnectionString",
                                       "test",
                                       "poison-test",
                                        1,
                                        1);
        w.Start();
        Console.WriteLine("Started");
        Console.ReadLine();
    }
}

Implementing the MessageQueueWorker mostly consists of implementing a Report method. This method is used to trace your code through diagnostics or logging frameworks. The OnExecution method is where the real magic happens. This is where you can implement message specific logic. The QueueWorker handles all the boiler plate interactions with the source and poison queues.

public class MessageQueueWorker: QueueWorker
{
    public MessageQueueWorker(string accountConnectionStringName, 
                              string queueName, 
                              string poisonQueueName, 
                              int maxAttempts = 10,
                              int visibilityTimeOutInMinutes = 10)
        : base(accountConnectionStringName, 
                queueName, 
                poisonQueueName, 
                maxAttempts,
                visibilityTimeOutInMinutes)
    {
    }

    protected override void Report(string message)
    {
        Console.WriteLine(message);
    }
        
    protected override void OnExecuting(CloudQueueMessage workItem)
    {
        //Do some work 
        var message = workItem.AsString;
        Trace.WriteLine(message);
            
        //Used for testing the poison queue
        if(message=="fail")
            throw new Exception(message);

        Thread.Sleep(TimeSpan.FromSeconds(10));
    }
}

The QueueWorker implements the PollingTask in order to read messages from a Queue Storage Service. Working with Queues in Windows Azure is quite common. In many circumstances I try to make specialized Queues. This means that I store messages that can be acted upon by a single consumer. Doing so requires me to create multiple QueueWorker instances in the same Worker Role. Having many specialized QueueWorker implementations running side by side also has it benefits. Each QueueWorker can be tuned differently. Some can process 32 messages in parallel while others can process 4, this gives you a lot of free way when it comes to consuming a maximum amount of the available resources. Furthermore, if the demand for your services grow, you can easily break apart the collection of QueueWorker instances and redistribute and reorganize them over more Worker Roles.

The QueueWorker handles all the queue specific work. This is where messages are dequeued, deleted and placed in the poison queue. Messages are dequeued with a visibility timeout of 10 minutes by default and messages are considered as poison after 10 dequeue attemps. Both of these parameters can be altered through the constructor. 

An Exponential RetryPolicy is used to make the QueueWorker tolerant transient faults.

public abstract class QueueWorker : PollingTask<CloudQueueMessage>
{
    private readonly CloudQueueClient client;
    private readonly CloudQueue poisonQueue;
    private readonly CloudQueue queue;
    private readonly TimeSpan visibilityTimeout;
    private readonly int maxAttempts;

    protected QueueWorker(string connectionString, 
                        string queueName, 
                        string poisonQueueName,
                        int maxAttempts = 10,
                        int visibilityTimeoutInMinutes = 10)
    {
        this.maxAttempts = maxAttempts;

        var cs = CloudConfigurationManager.GetSetting(connectionString);
        var account = CloudStorageAccount.Parse(cs);

        client = account.CreateCloudQueueClient();

        client.RetryPolicy = new ExponentialRetry(new TimeSpan(0, 0, 0, 2), 10);
           
        queue = client.GetQueueReference(queueName);
        queue.CreateIfNotExists();

        poisonQueue = client.GetQueueReference(poisonQueueName);
        poisonQueue.CreateIfNotExists();

        visibilityTimeout = TimeSpan.FromMinutes(visibilityTimeoutInMinutes);
    }
       
    protected override void Execute(CloudQueueMessage workItem)
    {
        if (workItem.DequeueCount > maxAttempts)
        {
            PlaceMessageOnPoisonQueue(workItem);
            return;
        }

        OnExecuting(workItem);
    }

    protected abstract void OnExecuting(CloudQueueMessage workItem);

    private void PlaceMessageOnPoisonQueue(CloudQueueMessage workItem)
    {
        var message = new CloudQueueMessage(workItem.AsString);
        poisonQueue.AddMessage(message);
        Completed(workItem);
    }

    protected override void Completed(CloudQueueMessage workItem)
    {
        try
        {
            queue.DeleteMessage(workItem);
        }
        catch (Exception ex)
        {
            Report(ex.ToString());
        }
    }

    protected override ICollection<CloudQueueMessage> GetWork()
    {
        return queue.GetMessages(32, visibilityTimeout)
                    .ToList();
    }
}

Before executing any of this code be sure to include the PollingTask<TWorkItem> class in your project. Further more be sure to configure the storage connection string in your App.Config or Cloud Configurations.

  <appSettings>
    <add key="StorageConnectionString" value="..." />
  </appSettings>

5 responses to Windows Azure Queue Storage Service Polling Task

  1. 

    Alexandre,

    I installed the nuget-package. It adds a lot of dependencies to my project, yet no code or libraries of your own. How can I find the code? Is it installed on disk somewhere?

    Like

  2. 

    It’s a very good idea, but Thread.Sleep() will block an entire thread, they say.

    Like

  3. 

    Would it be best creating multiple QueueWorker instances within the same Worker Role, or running a single QueueWorker on a ThreadedRole?

    Like

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.