Polling Tasks Are Great Building Blocks For Windows Azure Roles

February 19, 2013 — 3 Comments

Polling Tasks are building blocks for my Worker Roles. They empower me to easily organize work loads and limit costs by backing off from services when there is no work to be done. The back off delay is calculated from the number of attempts where there was no work to be done.

Polling Tasks can be used to execute work from Windows Azure Queue Storage Service or from Windows Azure Blob Storage Service. It can also be used to pull work for any external system like from email severs, FTP servers or network drives.

The code from this Post is part of the Brisebois.WindowsAzure NuGet Package

To install Brisebois.WindowsAzure, run the following command in the Package Manager Console

PM> Install-Package Brisebois.WindowsAzure

Get more details about the Nuget Package.

 

Examples

  • The QueueWorker implements the PollingTask in order to read messages from a Queue Storage Service. Working with Queues in Windows Azure is quite common. In many circumstances I try to make specialized Queues. This means that I store messages that can be acted upon by a single consumer. Doing so requires me to create multiple QueueWorker instances in the same Worker Role. Having many specialized QueueWorker implementations running side by side also has it benefits. Each QueueWorker can be tuned differently. Some can process 32 messages in parallel while others can process 4, this gives you a lot of free way when it comes to consuming a maximum amount of the available resources. Furthermore, if the demand for your services grow, you can easily break apart the collection of QueueWorker instances and redistribute and reorganize them over more Worker Roles.
  • The BlobContainerWorker implements the PollingTask in order to read blobs from Blob Storage Service. Working with blobs in Windows Azure is very efficient. In most circumstances I try to keep private blobs compressed and publicly accessible blobs uncompressed. Keeping the private blobs compressed will ensure faster network communication and less idle time for your Worker Role CPUs.
  • The BlobSnapshotCleaner implements the PollingTask in order to deleted expired blob Snapshots from Blob Storage Service. Woking with blob Snapshots is a great way to version your data. The BlobSnapshotCleaner allows you to specify the maximum age for a Snapshot and it will take care of the rest.

public abstract class PollingTask
{
    private Task internalTask;
    private readonly CancellationTokenSource source;
    private int attempts;

    protected PollingTask()
    {
        source = new CancellationTokenSource();
    }

    public void Start()
    {
        if (internalTask != null)
            throw new Exception("Task is already running");

        internalTask = Task.Run(() =>
        {
            while (!source.IsCancellationRequested)
            {
                TryExecuteWorkItems();

                Report("Heart Beat");
            }
        }, source.Token);

    }

    private void TryExecuteWorkItems()
    {
        try
        {
            var files = GetWork();

            if (files.Any())
            {
                ResetAttempts();
                files.AsParallel()
                .ForAll(ExecuteWork);
            }
            else
                BackOff();
        }
        catch (Exception ex)
        {
            Report(ex.ToString());
            if (Debugger.IsAttached)
                Trace.TraceError(ex.ToString());
        }
    }

    private void ExecuteWork(TWorkItem workItem)
    {
        Report(string.Format("Started work on workItem"));
        var w = new Stopwatch();
        w.Start();
        Execute(workItem);
        w.Stop();
        Report(string.Format("Completed work on workItem in {0}",
        w.Elapsed.TotalMinutes));
        Completed(workItem);
    }

    protected void BackOff()
    {
        attempts++;

        var seconds = GetTimeoutAsTimeSpan();
        Report(string.Format("Sleep for {0}", seconds));
        Thread.Sleep(seconds);
    }

    private TimeSpan GetTimeoutAsTimeSpan()
    {
        var timeout = DelayCalculator.ExponentialDelay(attempts);

        var seconds = TimeSpan.FromSeconds(timeout);
        return seconds;
    }

    protected abstract void Execute(TWorkItem workItem);
    protected abstract void Completed(TWorkItem workItem);
    protected abstract ICollection GetWork();
    protected abstract void Report(string message);

    public void Cancel()
    {
        source.Cancel();
        internalTask = null;
    }

    public void ResetAttempts()
    {
        attempts = 0;
    }
}

3 responses to Polling Tasks Are Great Building Blocks For Windows Azure Roles

  1. 

    Thank you for this. I’ve been struggling with how to implement an Azure Queue polling service and this really helped me. I have one question and one suggestion.

    My question is how could I include the ability to pause/resume the PollingTask?

    My suggestion is to change the PollingTask slightly so that it immediately cancels when Cancel is signaled instead of waiting for the next heartbeat before cancelling. I achieved this by making the following changes:

    change the BackOff() method to return a bool instead of void and use the cancel token’s WaitHandle to wait:
    protected bool BackOff()
    {
    attempts++;

    var seconds = GetTimeoutAsTimeSpan();
    Report(string.Format("Sleep for {0}", seconds));
    return (!source.Token.WaitHandle.WaitOne(seconds);
    

    }

    Then, change the TryExecuteWorkItems() method to also return a bool:
    private bool TryExecuteWorkItems()
    {
    try
    {
    var files = GetWork();

        if (files.Any())
        {
            ResetAttempts();
            files.AsParallel()
                    .ForAll(ExecuteWork);
        }
        else
            return BackOff();
    }
    catch (Exception ex)
    {
        Report(ex.ToString());
        if (Debugger.IsAttached)
            Trace.TraceError(ex.ToString());
    }
    
    return true;
    

    }

    Finally, modify the internal task lambda in the Start() method so that it exits the while loop if TryExecuteWorkItems returns false:
    while (!source.IsCancellationRequested)
    {
    if (!TryExecuteWorkItems())
    break;

                Report("Heart Beat");
            }
    

    This seems to work well for me. Any thoughts?

    Liked by 1 person

Leave a reply to Alexandre Brisebois Cancel reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.