Windows Azure Blob Storage Polling Task

February 20, 2013

[Screenshot: test console showing the PollingTask applying the back off strategy]

The code below uses a BlobContainerWorker, which implements PollingTask<TWorkItem>, to read blobs from a container and compress those that are not yet compressed. When it finds no uncompressed blobs, it uses an exponential back off strategy to reduce the number of transactions made against the Windows Azure Blob Storage Service. Without the back off strategy, the worker would continuously make requests to check whether any files need to be compressed.

Consequences of making too many requests to the Blob Storage Service include:

  • The service might throttle your requests
  • The cost of operation would be significantly higher than it should be

The screenshot above is of my test console, showing that the PollingTask is effectively applying the back off strategy. If the BlobContainerWorker finds new files to convert, the back off delay is reset to zero.
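To make the back off behaviour concrete, here is a minimal sketch of how such a delay could be computed. It is not the PollingTask from the NuGet package; the BackOffDelay class, its members, and the one second and sixty second values used in the usage note are assumptions made for illustration only.

```csharp
using System;

// Hypothetical helper for illustration; not part of Brisebois.WindowsAzure.
public sealed class BackOffDelay
{
    private readonly TimeSpan initial;
    private readonly TimeSpan maximum;
    private int emptyPolls;

    public BackOffDelay(TimeSpan initial, TimeSpan maximum)
    {
        this.initial = initial;
        this.maximum = maximum;
    }

    // Delay to wait before the next poll.
    public TimeSpan Current
    {
        get
        {
            // Work was found on the last poll (or nothing was polled yet): no delay.
            if (emptyPolls == 0)
                return TimeSpan.Zero;

            // initial * 2^(emptyPolls - 1), capped at maximum.
            var factor = Math.Pow(2, emptyPolls - 1);
            var ticks = Math.Min(initial.Ticks * factor, maximum.Ticks);
            return TimeSpan.FromTicks((long)ticks);
        }
    }

    // Call after each poll with the number of work items that were found.
    public void ReportWork(int itemsFound)
    {
        if (itemsFound > 0)
            emptyPolls = 0;   // work found: reset the delay to zero
        else
            emptyPolls++;     // nothing found: double the delay on the next poll
    }
}
```

With an initial delay of one second and a maximum of sixty seconds, consecutive empty polls would wait 1, 2, 4, 8 seconds and so on until the cap is reached, and the first poll that finds work brings the delay back to zero.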

The code from this post is part of the Brisebois.WindowsAzure NuGet Package.

To install Brisebois.WindowsAzure, run the following command in the Package Manager Console:

PM> Install-Package Brisebois.WindowsAzure

Get more details about the NuGet Package.

This console application is used as a host for the BlobCompressor. You can find the source for the BlobCompressor below. Since the BlobCompressor is a PollingTask, it is instantiated and used pretty much like the MessageQueueWorker implemented in an earlier post. The BlobCompressor needs the name of the configuration setting that holds the Storage Account Connection String and the name of the blob container that it will work with.

using System;

class Program
{
    static void Main(string[] args)
    {
        // First argument: name of the configuration setting that holds
        // the storage connection string.
        var b = new BlobCompressor("StorageConnectionString",
                                   "my-blob-container");
        b.Start();

        Console.WriteLine("Started");
        Console.ReadLine();
    }
}

The BlobCompressor is responsible for tasks related to compression and reporting. It derives from BlobContainerWorker, which we will get to later in the post. Before compressing a blob, it first inspects the Metadata property of the Cloud Block Blob to see whether the blob is already compressed. It then downloads the Cloud Block Blob as a Stream, compresses it, and uploads it back to the blob container using the same URI. This specific implementation of the BlobContainerWorker is configured not to delete blobs once it has processed them.

using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using Microsoft.WindowsAzure.Storage.Blob;

public class BlobCompressor : BlobContainerWorker
{
    private const string CompressedFlag = "Compressed";

    public BlobCompressor(string connectionString, string containerName)
        : base(connectionString, containerName, false)
    {
    }

    protected override void Report(string message)
    {
        Console.WriteLine(message);
    }

    protected override void OnExecuting(CloudBlockBlob workItem)
    {
        // Skip blobs that are already flagged as compressed.
        if (workItem.Metadata.ContainsKey(CompressedFlag))
            return;

        using (var blobStream = new MemoryStream())
        {
            // Download the whole blob into memory.
            workItem.DownloadToStream(blobStream);

            using (var compressedStream = new MemoryStream())
            {
                CompressStream(compressedStream, blobStream);

                SetCompressedFlag(workItem);

                // Overwrite the blob at the same URI with its compressed content.
                workItem.UploadFromStream(compressedStream);
            }
        }
    }

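    protected override ICollection<IListBlobItem> GetWork()
    {
        // OfType skips anything that is not a block blob (for example page blobs)
        // instead of throwing an InvalidCastException.
        return base.GetWork()
                    .OfType<CloudBlockBlob>()
                    .Where(b => !b.Metadata.ContainsKey(CompressedFlag))
                    .Cast<IListBlobItem>()
                    .ToList();
    }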

    private static void SetCompressedFlag(CloudBlockBlob workItem)
    {
        workItem.Metadata.Add(new KeyValuePair<string, string>(CompressedFlag, 
                                                               "true"));
        workItem.SetMetadata();
    }

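    protected void CompressStream(MemoryStream compressedStream,
                                    MemoryStream blobStream)
    {
        blobStream.Position = 0;
        using (var compressionStream = MakeCompressionStream(compressedStream))
        {
            blobStream.CopyTo(compressionStream);
        }

        // Rewind only after the GZipStream has been disposed: disposing it
        // flushes the remaining compressed data and the GZip footer.
        compressedStream.Position = 0;
    }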

    private static GZipStream MakeCompressionStream(Stream compressedStream)
    {
        return new GZipStream(compressedStream, CompressionLevel.Optimal, true);
    }
}
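Anything that later reads a blob flagged as Compressed has to reverse this step. The sketch below shows a GZip round trip using only System.IO.Compression, with no storage calls. The GZipRoundTrip class and its method names are hypothetical and are not part of the package; a reader would first download the blob into a buffer and check its metadata before calling something like Decompress.

```csharp
using System;
using System.IO;
using System.IO.Compression;

// Hypothetical helper for illustration; not part of Brisebois.WindowsAzure.
public static class GZipRoundTrip
{
    public static byte[] Compress(byte[] data)
    {
        using (var output = new MemoryStream())
        {
            // leaveOpen: true keeps 'output' usable after the GZipStream is disposed.
            using (var gzip = new GZipStream(output, CompressionLevel.Optimal, true))
            {
                gzip.Write(data, 0, data.Length);
            }

            // Read the result only after the GZipStream has been disposed,
            // so the final block and the footer have been written.
            return output.ToArray();
        }
    }

    public static byte[] Decompress(byte[] compressed)
    {
        using (var input = new MemoryStream(compressed))
        using (var gzip = new GZipStream(input, CompressionMode.Decompress))
        using (var output = new MemoryStream())
        {
            gzip.CopyTo(output);
            return output.ToArray();
        }
    }
}
```

Calling Decompress on the output of Compress should give back the original bytes.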

The BlobContainerWorker implements the PollingTask in order to read blobs from the Blob Storage Service. Working with blobs in Windows Azure is very efficient. In most circumstances, I try to keep private blobs compressed and publicly accessible blobs uncompressed. Keeping private blobs compressed means less data to transfer over the network and less idle time for your Worker Role CPUs.

The BlobContainerWorker is responsible for getting the list of blobs that need to be processed. If a batch size is configured, at most that many blobs are returned per poll. Depending on its configuration, it can also delete blobs once they have been processed.

When the BlobContainerWorker is instantiated, it is configured with the desired container. If the container does not exist, it will be created. Implement OnExecuting to define how blobs are processed.

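using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Microsoft.WindowsAzure.Storage.RetryPolicies;

public abstract class BlobContainerWorker : PollingTask<IListBlobItem>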
{
    private readonly bool deleteBlobOnCompleted;
    protected readonly CloudBlobClient client;
    protected readonly CloudBlobContainer container;
    private readonly int? batchSize;

    protected BlobContainerWorker(string connectionString,
                                    string containerName,
                                    bool deleteBlobOnCompleted = true,
                                    int? batchSize = null)
    {
        this.deleteBlobOnCompleted = deleteBlobOnCompleted;
        this.batchSize = batchSize;

        // connectionString is the name of the configuration setting,
        // not the connection string itself.
        var cs = CloudConfigurationManager.GetSetting(connectionString);
        var account = CloudStorageAccount.Parse(cs);

        client = account.CreateCloudBlobClient();

        // Retry failed storage requests up to 10 times, backing off
        // exponentially from a 2 second delta.
        var deltaBackoff = new TimeSpan(0, 0, 0, 2);
        client.RetryPolicy = new ExponentialRetry(deltaBackoff, 10);

        // Create the container if it does not exist yet.
        container = client.GetContainerReference(containerName);
        container.CreateIfNotExists();
    }

    protected override void Completed(IListBlobItem workItem)
    {
        if (deleteBlobOnCompleted)
        {
            var blob = workItem as CloudBlockBlob;
            if (blob != null)
                blob.DeleteIfExists(DeleteSnapshotsOption.IncludeSnapshots);
        }
    }

    protected abstract void OnExecuting(CloudBlockBlob workItem);

    protected override void Execute(IListBlobItem workItem)
    {
        var blob = workItem as CloudBlockBlob;
        if (blob == null)
            return;

        OnExecuting(blob);
    }

    protected override ICollection<IListBlobItem> GetWork()
    {
        // Flat listing of the container that also fetches each blob's metadata.
        var blobs = container.ListBlobs("", true, BlobListingDetails.Metadata);

        if (batchSize.HasValue)
            blobs = blobs.Take(batchSize.Value);

        return blobs.ToList();
    }
}

Before executing any of this code, be sure to include the PollingTask<TWorkItem> class in your project. Furthermore, be sure to configure the storage connection string in your App.Config or Cloud Configurations.

<appSettings>
    <add key="StorageConnectionString" value="..." />
</appSettings>

8 responses to Windows Azure Blob Storage Polling Task

  1. And if I have more than one instance, will each instance pick up the same blobs to be processed?

    • You can use blob leasing to control which instance is running or which instance has access.

      I use this code to import data into Windows Azure SQL Database, so essentially I only have a single instance. This is by design so that I don’t overload my SQL Database.

  2. It would be great if it worked with more than one instance, because then we would have the Microsoft SLA.
