Friday, March 25, 2011

Segmenting the Queue across Multiple Instances in Windows Azure

A common usage pattern in cloud computing is to process similar tasks on the same instance, while still leveraging the scalability of cloud computing by running multiple instances across all the tasks. Grouping like tasks within an instance is done for a couple of reasons:

  1. Avoid data contention across instances. For example, if you are aggregating data, you might want to perform all the aggregation in memory before writing the data to storage. If you can segment the data that needs to be aggregated together onto a single instance, no other instance will beat you to the write.
  2. Optimize your local memory cache. If you are caching expensive-to-retrieve data in the instance, sending each task to the instance whose cache holds the data that task needs can increase your cache hit ratio. In other words, each instance caches different data, specific to the work that instance is doing.

This article will talk about how to successfully accomplish task segmentation in Windows Azure.

The solution that I am proposing works like this:

  1. Add all your tasks onto a Windows Azure Queue
  2. Create multiple instances of a worker role that knows how to process the tasks
  3. Each worker role:
    1. Retrieves up to 32 messages from the queue, hiding them from other instances for 60 seconds.
    2. Segments out the messages that it will process and processes them.
    3. Deletes only the messages that it processed.

The 60-second visibility timeout is an arbitrary number; adjust it based on your workload.

Adding Tasks to the Queue

I add tasks to the queue to guarantee that they will be completed. The worker role:

  1. Gets the task from the queue in the form of a queue message.
  2. Processes it.
  3. Then deletes the task from the queue.

If the worker role is terminated for any reason, the task will reappear on the queue when its visibility timeout expires. Because the queue is acting as the transaction manager, worker roles can terminate, reboot, throw exceptions, and run out of memory without losing data. The trick is that the worker role has to be able to successfully reprocess a task that was previously processed, or only partially completed. For more about this concept, see Steve Marx’s blog post entitled Deleting Windows Azure Queue Messages: Handling Exceptions.
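To make "reprocess successfully" concrete, here is a minimal sketch, not taken from the original post, that contrasts an idempotent handler with a non-idempotent one. The in-memory dictionary stands in for whatever storage your worker writes to, and `IdempotencyDemo`, `ApplyIdempotent`, and `ApplyNonIdempotent` are hypothetical names.

```csharp
using System.Collections.Generic;

// Hypothetical illustration: the dictionary stands in for durable storage.
public static class IdempotencyDemo
{
    // Idempotent: the result is written (overwritten) under the task's key,
    // so running the same task twice leaves storage in the same state.
    public static void ApplyIdempotent(IDictionary<string, int> store, string taskName, int result)
    {
        store[taskName] = result;
    }

    // Not idempotent: if the worker dies after processing but before deleting
    // the message, the rerun adds the result a second time.
    public static void ApplyNonIdempotent(IDictionary<string, int> store, string taskName, int result)
    {
        store.TryGetValue(taskName, out int current);
        store[taskName] = current + result;
    }
}
```

Replaying the same task twice leaves `ApplyIdempotent` at 5 for a result of 5, while `ApplyNonIdempotent` double-counts to 10, which is the kind of error a redelivered queue message can expose.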

Serialization of an Object onto the Queue

One thing I do to make life easy on myself is to create C# classes that support serialization, and then write them to the queue as bytes. This is a handy way to store the data I need for processing. An example of such a class might look like this:

[Serializable]
public class Task
{
    public string Name { get; set; }
}

Here are the methods I use to serialize and deserialize the object. Notice I am gzipping it to reduce its size: a Windows Azure queue message can hold at most eight kilobytes, so if your classes are bigger than the example, gzipping can help you fit a little more data in the queue.

using System.IO;
using System.IO.Compression;
using System.Runtime.Serialization.Formatters.Binary;

public T DeserializeCompressed<T>(byte[] input) where T : class
{
    // Wrap the bytes in a stream and gunzip while deserializing
    using (var ms = new MemoryStream(input))
    using (var cs = new GZipStream(ms, CompressionMode.Decompress))
    {
        var binaryFormatter = new BinaryFormatter();
        return binaryFormatter.Deserialize(cs) as T;
    }
}

public static byte[] SerializeCompressed<T>(T input) where T : class
{
    using (var ms = new MemoryStream())
    {
        var binaryFormatter = new BinaryFormatter();

        // Serialize through a GZipStream so the output matches DeserializeCompressed.
        // leaveOpen: true keeps the MemoryStream usable after the GZipStream is
        // disposed, and disposing it first flushes all of the compressed data.
        using (var cs = new GZipStream(ms, CompressionMode.Compress, true))
        {
            binaryFormatter.Serialize(cs, input);
        }

        return ms.ToArray();
    }
}
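A quick way to see whether a payload will fit is to compress it and compare the result against the limit. This minimal sketch swaps in raw bytes for `BinaryFormatter` (which newer versions of .NET mark obsolete), so it shows only the GZip and size-check part; `PayloadCheck` and `MaxMessageBytes` are illustrative names, with the limit set to the eight kilobytes mentioned above.

```csharp
using System.IO;
using System.IO.Compression;

public static class PayloadCheck
{
    // Illustrative limit: the eight-kilobyte queue message size cited in the post.
    public const int MaxMessageBytes = 8 * 1024;

    // GZip-compress a byte array.
    public static byte[] Compress(byte[] raw)
    {
        using (var output = new MemoryStream())
        {
            // leaveOpen: true so disposing the GZipStream flushes the gzip data
            // without closing the MemoryStream we still need to read.
            using (var gzip = new GZipStream(output, CompressionMode.Compress, true))
            {
                gzip.Write(raw, 0, raw.Length);
            }
            return output.ToArray();
        }
    }

    // Reverse of Compress.
    public static byte[] Decompress(byte[] compressed)
    {
        using (var input = new MemoryStream(compressed))
        using (var gzip = new GZipStream(input, CompressionMode.Decompress))
        using (var output = new MemoryStream())
        {
            gzip.CopyTo(output);
            return output.ToArray();
        }
    }

    // True if the compressed payload fits under the illustrative limit.
    public static bool Fits(byte[] compressed) => compressed.Length <= MaxMessageBytes;
}
```

Highly repetitive data compresses well; data that is already compressed or random may not shrink at all, so check the compressed size rather than assuming it fits.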

Segmenting the Messages

The technique for segmenting the messages is to hash a value that I want to use for segmentation (in the example below, the task name). Then I determine the number of running instances for this worker role and take the hash modulo that count. If the result equals the index of the current instance, the task should be processed on this instance; otherwise it belongs to another instance.

The code for the math above looks like this:

/// <summary>
/// Hashes the Object And Determines If This Instance Should Be Processing It
/// </summary>
/// <param name="obj">Object To Hash</param>
/// <returns>True If This Instance Should Process It, Otherwise False</returns>
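public static bool ShouldProcess(object obj)
{
    // GetHashCode() can be negative, and in C# a negative number modulo a
    // positive count is negative, so clear the sign bit first
    int hash = obj.GetHashCode() & 0x7FFFFFFF;

    return ((hash % WorkerRole.InstanceCount) == WorkerRole.InstanceId);
}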
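The modulo step is easier to see on a list of names. This sketch, with the hypothetical helpers `Segmenter.Bucket` and `Segmenter.Assign`, assigns each name to exactly one of `instanceCount` buckets. It clears the sign bit first because `GetHashCode()` can return a negative number, and a negative value modulo a positive count is negative in C#, so it would never match an instance index.

```csharp
using System.Collections.Generic;

public static class Segmenter
{
    // Map a hash to a bucket in [0, instanceCount).
    // Masking with 0x7FFFFFFF clears the sign bit, so the modulo is never negative.
    public static int Bucket(int hash, int instanceCount)
    {
        return (hash & 0x7FFFFFFF) % instanceCount;
    }

    // Group task names by the instance index that would process them.
    public static Dictionary<int, List<string>> Assign(IEnumerable<string> names, int instanceCount)
    {
        var result = new Dictionary<int, List<string>>();
        for (int i = 0; i < instanceCount; i++)
        {
            result[i] = new List<string>();
        }
        foreach (string name in names)
        {
            result[Bucket(name.GetHashCode(), instanceCount)].Add(name);
        }
        return result;
    }
}
```

Every name lands in exactly one bucket, so with the same hash on every instance, each task name is claimed by exactly one instance.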

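The GetHashCode() method is supported by all objects in the CLR, and you can override it if you want to create your own hash for your object. Keep in mind that this scheme only works if every instance computes the same hash for the same value. String.GetHashCode() does not guarantee that: its result can differ between 32-bit and 64-bit processes and between .NET versions, and on .NET Core it is randomized per process. Make sure all instances run the same platform and framework, or use a hash you compute yourself.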
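If you want a hash that every instance is guaranteed to compute identically, one option is to hash the string's bytes yourself. This is a minimal sketch using 32-bit FNV-1a over the UTF-8 bytes, a well-known simple non-cryptographic hash. `StableHash.Fnv1a` and `StableHash.ShouldProcess` are hypothetical helpers, and the instance index and count are passed in rather than read from `RoleEnvironment` so the sketch runs anywhere.

```csharp
using System.Text;

public static class StableHash
{
    // 32-bit FNV-1a: start from the offset basis, then for each byte XOR it in
    // and multiply by the FNV prime, letting the multiply wrap modulo 2^32.
    public static uint Fnv1a(string value)
    {
        const uint offsetBasis = 2166136261;
        const uint prime = 16777619;
        uint hash = offsetBasis;
        foreach (byte b in Encoding.UTF8.GetBytes(value))
        {
            hash ^= b;
            hash = unchecked(hash * prime);
        }
        return hash;
    }

    // Same decision as ShouldProcess, but with a hash that depends only on the
    // bytes of the key, not on the process, platform, or framework version.
    // Unsigned arithmetic keeps the remainder non-negative.
    public static bool ShouldProcess(string key, int instanceId, int instanceCount)
    {
        return (int)(Fnv1a(key) % (uint)instanceCount) == instanceId;
    }
}
```

For a given key such as `"report-42"`, `StableHash.ShouldProcess(key, i, 4)` is true for exactly one `i` from 0 to 3, and every instance agrees on which one.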

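Getting the instance identifier as an integer for the running instance is tricky in Windows Azure, because instance IDs are strings. The approach below sorts the IDs of all instances in the role and uses the position of the current instance's ID in that sorted list: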

using System;
using System.Linq;
using Microsoft.WindowsAzure.ServiceRuntime;

/// <summary>
/// Get the Current Windows Azure Instance Id As A Zero-Based Index
/// </summary>
/// <remarks>
/// Recomputed on every call instead of being cached: when instances are added
/// or removed, the sorted positions can shift, and a stale cached index could
/// collide with another instance's index or leave an index unassigned.
/// </remarks>
public static int InstanceId
{
    get
    {
        // WWB: Array of Instance Ids (as strings) Sorted By Name
        // Ordinal comparison so every instance sorts the IDs identically,
        // whatever its culture settings
        string[] array = RoleEnvironment.CurrentRoleInstance.Role.Instances.Select(_ => _.Id).ToArray();
        Array.Sort(array, StringComparer.Ordinal);

        // WWB: Find the One That Matches The Current Id From the List
        int index = Array.IndexOf(array, RoleEnvironment.CurrentRoleInstance.Id);
        if (index < 0)
        {
            throw new InvalidOperationException("The current instance was not found in the role's instance list.");
        }

        return index;
    }
}
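The sort-and-search step is the part worth testing on its own. This sketch pulls it into a pure function, `InstanceIndex.Of`, a hypothetical helper that takes the instance IDs as plain strings instead of reading `RoleEnvironment`; the IDs in the usage below are made up. An ordinal sort puts `Worker_IN_10` before `Worker_IN_2`, which is fine: the index does not need to match the number in the ID, it only needs every instance to derive the same order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class InstanceIndex
{
    // Returns the zero-based position of currentId among all ids after an
    // ordinal (culture-independent) sort, so every instance computes the same order.
    public static int Of(IEnumerable<string> allIds, string currentId)
    {
        string[] sorted = allIds.ToArray();
        Array.Sort(sorted, StringComparer.Ordinal);

        int index = Array.IndexOf(sorted, currentId);
        if (index < 0)
        {
            throw new ArgumentException("currentId is not in the list of instance ids.", nameof(currentId));
        }
        return index;
    }
}
```

With the hypothetical IDs `Worker_IN_2`, `Worker_IN_0`, `Worker_IN_10`, and `Worker_IN_1`, the sorted order is `Worker_IN_0`, `Worker_IN_1`, `Worker_IN_10`, `Worker_IN_2`, so `Worker_IN_2` gets index 3.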

I also need the instance count, which is the divisor in the modulo. That code looks like this:

/// <summary>
/// Count of Windows Azure Instances In This Role.
/// </summary>
public static int InstanceCount
{
    get
    {
        return RoleEnvironment.CurrentRoleInstance.Role.Instances.Count;
    }
}

Putting It All Together

Now that we have seen the pieces of the puzzle, let’s put it all together to process the tasks on the queue. Here is what the code looks like:

// WWB: Get Up To 32 Messages (The Most GetMessages Returns In One Call), Hidden For 60 Seconds
IEnumerable<CloudQueueMessage> messages = cloudQueue.GetMessages(32, TimeSpan.FromSeconds(60));

// WWB: Iterate Across All the Messages
foreach (CloudQueueMessage message in messages)
{
    // WWB: Get A Task Object From the Cloud Message
    Task task = this.DeserializeCompressed<Task>(message.AsBytes);

    // WWB: Segment the Tasks Based On Name
    if (WorkerRole.ShouldProcess(task.Name))
    {
        this.Process(task);

        cloudQueue.DeleteMessage(message);
    }
}
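To see how the undeleted messages eventually reach the right instance, here is a small single-process simulation. It is a sketch under simplifying assumptions, not the Storage Client API: `SimQueue`, `SimMessage`, and `SegmentedWorkers` are hypothetical, one "round" stands in for one visibility-timeout period, and the instances take turns polling in a rotating order rather than racing each other.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for a queue message.
public sealed class SimMessage
{
    public string Name;
    public int HiddenUntilRound;   // the message is invisible before this round
}

// Hypothetical in-memory stand-in for a queue; time is counted in rounds,
// where one round is one visibility-timeout period.
public sealed class SimQueue
{
    private readonly List<SimMessage> messages = new List<SimMessage>();

    public int Count => messages.Count;

    public void Add(string name) => messages.Add(new SimMessage { Name = name });

    // Like GetMessages(max, timeout): hand out up to max visible messages
    // and hide them until the next round.
    public List<SimMessage> GetMessages(int max, int round)
    {
        List<SimMessage> taken = messages.Where(m => m.HiddenUntilRound <= round).Take(max).ToList();
        foreach (SimMessage m in taken) m.HiddenUntilRound = round + 1;
        return taken;
    }

    public void Delete(SimMessage message) => messages.Remove(message);
}

public static class SegmentedWorkers
{
    // Runs the loop above for every instance each round (rotating which
    // instance polls first) and records which instance processed each name.
    public static Dictionary<string, int> Run(IEnumerable<string> names, int instanceCount, int maxRounds)
    {
        var queue = new SimQueue();
        foreach (string name in names) queue.Add(name);

        var processedBy = new Dictionary<string, int>();
        for (int round = 0; round < maxRounds && queue.Count > 0; round++)
        {
            for (int k = 0; k < instanceCount; k++)
            {
                int instance = (round + k) % instanceCount;
                foreach (SimMessage m in queue.GetMessages(32, round))
                {
                    // Same test as ShouldProcess, with the sign bit masked off.
                    if ((m.Name.GetHashCode() & 0x7FFFFFFF) % instanceCount == instance)
                    {
                        processedBy[m.Name] = instance;
                        queue.Delete(m);
                    }
                    // Otherwise do nothing: the message reappears next round.
                }
            }
        }
        return processedBy;
    }
}
```

With 20 names and 3 instances, the first instance to poll in each round takes every visible message, keeps its own, and leaves the rest hidden until the next round, so after three rounds every name has been processed by the instance its hash selects.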

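You will have to adjust the visibility timeout from 60 seconds to whatever works for you: it should be at least the maximum time it takes to process a full batch of 32 messages, the most GetMessages returns in one call. If it is shorter, a message can become visible again before it is deleted and be processed a second time.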
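As a rough rule, the timeout has to cover the slowest batch. This is a hypothetical helper, not part of any SDK: `BatchTimeout.For` multiplies a worst-case per-message time you have measured by the batch size and pads it by a safety factor; the 1.5 default is an arbitrary illustration, not a recommended value.

```csharp
using System;

public static class BatchTimeout
{
    // Visibility timeout long enough to process a full batch sequentially,
    // padded by a safety factor (both inputs are assumptions you supply).
    public static TimeSpan For(TimeSpan worstCasePerMessage, int batchSize = 32, double safetyFactor = 1.5)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        if (safetyFactor < 1.0) throw new ArgumentOutOfRangeException(nameof(safetyFactor));
        return TimeSpan.FromTicks((long)(worstCasePerMessage.Ticks * batchSize * safetyFactor));
    }
}
```

For example, if the slowest message you have measured takes one second, `BatchTimeout.For(TimeSpan.FromSeconds(1))` gives 48 seconds (1 × 32 × 1.5).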

Notice that the code only deletes messages that have been processed. If an unhandled exception is thrown, the message will not get deleted, and Windows Azure will recycle the worker role. Once the worker role restarts, it will pick up the message again after its visibility timeout expires and try to process it.

So what happens to the messages that are not deleted, those whose names are not for this instance? They become visible to the other instances when their visibility timeout expires (60 seconds in the code example above), which allows the other instances to pick them up and process them. No extra code is needed to make this happen; it is part of the design of Windows Azure Queues.

Why Here, Why Now?

Why does the example split the tasks after they are read from the queue? Why not before they are inserted? The code could create a bunch of queues, and then segment the tasks when they are written to the queue – with each queue having similar tasks. The reason I do it this way is that I can easily increase the number of instances in Windows Azure using the Windows Azure Management Portal, scaling down and up depending on the amount of work I need to accomplish. It is much harder to increase or decrease the number of queues.
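Scaling does have a side effect worth seeing: because the instance is chosen by hash modulo the instance count, changing the count reassigns many task names to different instances, so data cached for those names is no longer on the instance that now handles them. This sketch, with the hypothetical helper `Rebalance.Moved`, lists which names change instance when the count changes.

```csharp
using System.Collections.Generic;

public static class Rebalance
{
    // Non-negative bucket: hash modulo instance count, with the sign bit cleared first.
    private static int Bucket(string name, int instanceCount)
    {
        return (name.GetHashCode() & 0x7FFFFFFF) % instanceCount;
    }

    // Names whose instance index differs between the two instance counts.
    public static List<string> Moved(IEnumerable<string> names, int oldCount, int newCount)
    {
        var moved = new List<string>();
        foreach (string name in names)
        {
            if (Bucket(name, oldCount) != Bucket(name, newCount))
            {
                moved.Add(name);
            }
        }
        return moved;
    }
}
```

Going from 3 to 4 instances, a name keeps its instance only when its hash modulo 12 is 0, 1, or 2, so with a uniformly distributed hash roughly three quarters of the names move.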

Summary

The code demonstrated above localizes processing to a particular Windows Azure instance based on a property of the task that needs to be accomplished, which makes it an elegant solution to segmentation in Windows Azure.
