Sharded Queue – Handling up to 60,000 Messages per Second

July 24, 2013

Windows Azure Storage Queues were designed with a performance target of 2,000 messages per second. I personally don’t see the queue’s performance target as a limitation. I see the queue as a partition that allows me to scale across logical and physical boundaries.

By scaling across many physical machines, the limitations imposed by any single piece of hardware become much less of a constraint.

This experiment has the goal of demonstrating how we can stitch together queues from multiple Windows Azure Storage Accounts in order to augment the maximum throughput of our Cloud Services.


The Sharded Queue

At some point, your application might need to scale beyond the performance target of a single queue. To get there, you will need to use many queues at once. With a round-robin algorithm, messages can be distributed over multiple queues, which can live in the same storage account or in several. Each Windows Azure Storage Queue is said to be able to process up to 2,000 messages per second, and each account is limited to 20,000 transactions per second, so once you reach 10 queues in one account you will need to spread further queues over additional accounts to maintain performance. Three accounts with 10 queues each give the nominal 60,000 messages per second in the title.
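The arithmetic behind these numbers can be sketched in a few lines. This is an illustration only and not part of the prototype: `ShardPlanner`, `QueuesPerAccount`, and `AccountsNeeded` are hypothetical names, the constants are the targets quoted above rather than guarantees, and the sketch treats each message as a single transaction, which is a simplification.

```csharp
using System;

// Hypothetical helper, for illustration only. It turns the performance
// targets quoted in the text into a shard layout for a desired throughput.
public static class ShardPlanner
{
    public const int QueueTarget = 2000;    // messages per second, per queue
    public const int AccountTarget = 20000; // transactions per second, per account

    // Number of queues one account can host before the account target,
    // rather than the queue target, becomes the limit.
    public static int QueuesPerAccount()
    {
        return AccountTarget / QueueTarget;
    }

    // Number of storage accounts needed for the requested throughput,
    // assuming one transaction per message (a simplification).
    public static int AccountsNeeded(int messagesPerSecond)
    {
        if (messagesPerSecond <= 0)
            throw new ArgumentOutOfRangeException("messagesPerSecond");

        // Ceiling division: a partial account still requires a whole one.
        return (messagesPerSecond + AccountTarget - 1) / AccountTarget;
    }
}
```

Under these assumptions, `ShardPlanner.QueuesPerAccount()` evaluates to 10 and `ShardPlanner.AccountsNeeded(60000)` to 3, which is the layout the console application in this post uses.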

Before the first operation on the sharded queue, a random starting position is calculated. This keeps message operations evenly distributed, because several producers and consumers will not all begin on the same queue.
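One way to picture the combination of a random start and round-robin distribution is a small cursor that walks every account and queue pair. This is a minimal sketch under my own assumptions: `ShardCursor` is a hypothetical class that does not appear in the prototype, and it uses a single counter over all pairs instead of separate account and queue indexes.

```csharp
using System;

// Hypothetical sketch: a thread-safe round-robin cursor over
// (account, queue) pairs that starts at a random position.
public sealed class ShardCursor
{
    private readonly int accountCount;
    private readonly int queueCount;
    private readonly object gate = new object();
    private int position;

    public ShardCursor(int accountCount, int queueCount, Random random)
    {
        if (accountCount <= 0) throw new ArgumentOutOfRangeException("accountCount");
        if (queueCount <= 0) throw new ArgumentOutOfRangeException("queueCount");
        if (random == null) throw new ArgumentNullException("random");

        this.accountCount = accountCount;
        this.queueCount = queueCount;

        // Random start, so independent cursors rarely begin on the same shard.
        position = random.Next(accountCount * queueCount);
    }

    // Returns the next (account index, queue index) pair. Consecutive calls
    // move to a different account first, then to the next queue.
    public Tuple<int, int> Next()
    {
        int current;
        lock (gate)
        {
            position = (position + 1) % (accountCount * queueCount);
            current = position;
        }

        return Tuple.Create(current % accountCount, current / accountCount);
    }
}
```

Because the counter runs over `accountCount * queueCount` positions, a full cycle of calls to `Next()` visits every pair exactly once, whatever the two counts are.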


Using the Code

In order to test the ShardedQueue, this console application uses 3 storage accounts and each account has 10 queues. It creates Tasks to fill and empty the ShardedQueue asynchronously. 

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;

class Program
{
    static void Main(string[] args)
    {
        // Placeholder connection strings, one per storage account.
        var connectionStrings = new List<string>
        {
            "DefaultEndpointsProtocol=https;AccountName=brisebois01;AccountKey=abcdefg",
            "DefaultEndpointsProtocol=https;AccountName=brisebois02;AccountKey=abcdefg",
            "DefaultEndpointsProtocol=https;AccountName=brisebois03;AccountKey=abcdefg",
        };

        var accounts = connectionStrings.Select(CloudStorageAccount.Parse)
                                        .ToList();

        // 10 queues per account, named "tasks0" through "tasks9".
        var shardedQueue = new ShardedQueue("tasks", 10, accounts);
        shardedQueue.CreateIfNotExists();

        var writeSource = new CancellationTokenSource();
        var writeToken = writeSource.Token;

        var readSource = new CancellationTokenSource();
        var readToken = readSource.Token;

        // Fill and empty the sharded queue concurrently.
        Task.Run(() => QueueMessages(shardedQueue, writeToken), writeToken);
        Task.Run(() => ProcessQueueMessages(shardedQueue, readToken), readToken);

        Console.WriteLine("Press enter to stop adding messages");
        Console.ReadLine();

        writeSource.Cancel();
        Console.WriteLine("Press enter to stop processing messages");
        Console.ReadLine();

        // Stop the reader as well; without this call it keeps running.
        readSource.Cancel();
        Console.WriteLine("Press enter to exit");
        Console.ReadLine();
    }

    // Adds messages as fast as possible until cancellation is requested.
    public static void QueueMessages(ShardedQueue queue,
                                     CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            queue.Add(new CloudQueueMessage("queue message"));
        }
    }

    // Reads batches of up to 32 messages and deletes each one, until
    // cancellation is requested.
    public static void ProcessQueueMessages(ShardedQueue queue,
                                            CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            var messages = queue.Get(32);
            foreach (var message in messages)
            {
                queue.Delete(message);
            }
        }
    }
}

 

The Code

The following code is a prototype I used to explore the ideas expressed in this post and should only be used as a starting point for your solution. The ShardedQueueMessage encapsulates a CloudQueueMessage along with its origin queue, which is what allows us to update and delete the message.

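using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;

public class ShardedQueue
{
    private readonly string queueName;
    private readonly int queueCount;
    private readonly List<CloudStorageAccount> accounts;

    // queues[account][queue]: one list of queues per storage account.
    private List<List<CloudQueue>> queues;

    private readonly int accountCount;

    // Position of the most recent write.
    private int lastWriteAccount;
    private int lastWriteQueue;

    // Position of the most recent read.
    private int lastReadAccount;
    private int lastReadQueue;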

    public ShardedQueue(string queueName,
                        int queueCount,
                        IEnumerable<CloudStorageAccount> storageAccounts)
    {
        this.queueName = queueName;
        this.queueCount = queueCount;

        accounts = storageAccounts.ToList();

        accountCount = accounts.Count;

        // Start reads and writes at random positions so that several instances
        // do not all begin on the same queue. A single Random is shared because
        // two instances created with the same seed return the same sequence.
        var random = new Random();
        lastWriteAccount = random.Next(accountCount);
        lastWriteQueue = random.Next(queueCount);
        lastReadAccount = random.Next(accountCount);
        lastReadQueue = random.Next(queueCount);

        LoadQueues();
    }

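    private void LoadQueues()
    {
        var clients = accounts.Select(a => a.CreateCloudQueueClient())
                              .ToList();

        // Reference the shards by their exact names (queueName + index) rather
        // than by prefix, so unrelated queues that share the prefix are ignored
        // and each account always exposes exactly queueCount queues.
        queues = clients.Select(c => Enumerable.Range(0, queueCount)
                                               .Select(i => c.GetQueueReference(queueName + i))
                                               .ToList())
                        .ToList();
    }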

    public void CreateIfNotExists()
    {
        var clients = accounts.Select(a => a.CreateCloudQueueClient())
                                .ToList();

        foreach (var cloudQueueClient in clients)
            CreateMissingQueues(cloudQueueClient);

        LoadQueues();
    }

    private void CreateMissingQueues(CloudQueueClient cloudQueueClient)
    {
        // List the existing shards once. Without ToList, each Contains call
        // below would enumerate the queue listing again.
        var shards = cloudQueueClient.ListQueues()
            .Where(q => q.Name.StartsWith(queueName))
            .Select(q => q.Name)
            .ToList();

        var queueNames = Enumerable.Range(0, queueCount)
            .Select(i => queueName + i)
            .Where(n => !shards.Contains(n))
            .ToList();

        foreach (var name in queueNames)
        {
            var q = cloudQueueClient.GetQueueReference(name);
            q.CreateIfNotExists();
        }
    }

    public void Delete(ShardedQueueMessage message)
    {
        message.OriginQueue.DeleteMessage(message.Message);
    }

    public IEnumerable<ShardedQueueMessage> Get(int messageCount)
    {
        return Get(messageCount, null);
    }

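    public IEnumerable<ShardedQueueMessage> Get(int messageCount,
                                                TimeSpan? visibilityTimeout)
    {
        int account;
        int index;

        lock (readLockObject)
        {
            // Move to the next account on every call and to the next queue
            // each time the accounts wrap around, so every account/queue pair
            // is visited whatever the two counts are.
            lastReadAccount = (lastReadAccount + 1) % accountCount;
            if (lastReadAccount == 0)
                lastReadQueue = (lastReadQueue + 1) % queueCount;

            // Copy the position while holding the lock; another thread may
            // change the fields as soon as the lock is released.
            account = lastReadAccount;
            index = lastReadQueue;
        }

        var queue = queues[account][index];

        var messages = queue.GetMessages(messageCount, visibilityTimeout)
            .Select(m => new ShardedQueueMessage
            {
                OriginQueue = queue,
                Message = m
            })
            .ToList();

        if (messages.Count > 0)
        {
            var trace = string.Format("Read {2} message(s) from Account {0} Queue {1}",
                account,
                index,
                messages.Count);

            Trace.WriteLine(trace);
        }

        return messages;
    }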

    readonly object readLockObject = new object();
    readonly object writeLockObject = new object();

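    public void Add(CloudQueueMessage message)
    {
        int account;
        int index;

        lock (writeLockObject)
        {
            // Move to the next account on every call and to the next queue
            // each time the accounts wrap around, so every account/queue pair
            // is visited whatever the two counts are.
            lastWriteAccount = (lastWriteAccount + 1) % accountCount;
            if (lastWriteAccount == 0)
                lastWriteQueue = (lastWriteQueue + 1) % queueCount;

            // Copy the position while holding the lock; another thread may
            // change the fields as soon as the lock is released.
            account = lastWriteAccount;
            index = lastWriteQueue;
        }

        var queue = queues[account][index];
        queue.AddMessage(message);

        var trace = string.Format("Added message to Account {0} Queue {1}",
                                  account,
                                  index);
        Trace.WriteLine(trace);
    }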

    public void Add(IEnumerable<CloudQueueMessage> messages)
    {
        // Reuse the thread-safe round-robin Add for each message. Creating a
        // time-seeded Random per message can hand several parallel workers
        // the same seed, which sends them all to the same queue.
        messages.AsParallel()
                .ForAll(m => Add(m));
    }
}

public class ShardedQueueMessage
{
    internal CloudQueue OriginQueue { get; set; }
    public CloudQueueMessage Message { get; set; }
}
