Getting Acquainted With #Azure Service Bus Event Hubs

The Microsoft Azure ecosystem just keeps growing. This week Microsoft unveiled a very welcome addition to the Microsoft Azure Service Bus: Event Hubs, which join the ranks of Queues, Topics and Relays to offer options adapted to your needs.

Contrasting the Available Service Bus Flavors

  • Relays – are used to bridge communications over the cloud in a secure and transparent manner.
  • Queues – are pipes that allow many publishers and many consumers to communicate over a single channel. This is great for Competing Consumers and for Queue-based Load Leveling.
  • Topics – are pipes that allow fan-out scenarios, where each subscriber gets its own copy of every inbound message. Topics also offer handy features such as filters. Use this flavor to implement Pipes and Filters.
  • Event Hubs – are a bit more complex. Event Hubs enable the collection of event streams at high throughput, from a diverse set of devices and services. In other words, they help us deal with the 3 Vs.
    • Volume (amount of data)
    • Velocity (speed of data in and out)
    • Variety (range of data types and sources).

Microsoft Azure Service Bus Event Hubs

Event Hubs provide the mechanisms needed to collect event streams at high throughput from a diverse set of devices and services. An Event Hub is composed of a Publisher Policy, Consumer Groups and Partitions.

Event Hubs support the following scenarios:

  • Collecting event streams at high throughput from devices/services for use in real-time and batch processing.
  • Connecting millions of devices from diverse platforms for sending data (with individual authentication and flow control).
  • Processing event streams per device “in order” using several backend services (publish/subscribe).

Considerations Prior to Creating an Event Hub

You must put some effort into capacity planning before you create an Event Hub. To make the right decisions, let’s go over a couple of details about Event Hubs.

Event Hubs are partitioned. The minimum number of partitions is 8 and the maximum (public) number of partitions is 32. If you need more partitions, you must open a support ticket, through which you can request up to 1,024 partitions (or more) for your Event Hub.

Each partition has a performance target of 1 MB or 1,000 operations of ingress per second, and 2 MB of egress per second. By default, each Event Hub is created with 16 partitions, which corresponds to 16 MB or 16,000 operations of ingress per second, and 32 MB of egress per second.

At first glance this seems like way too much, so let’s put it in perspective.

Imagine that our messages are 5 KB in size. At 16 MB (16,384 KB) per second, we could stream 3,276.8 events per second. If messages were 1 KB in size, each partition would hit its target of 1,000 operations per second before running out of bandwidth, which across 16 partitions represents 16,000 messages per second. To reach the performance targets on all partitions of the Event Hub, we would need as many uniquely identified publishers as we have partitions, because the Event Hub service pins each publisher to one partition.
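The arithmetic above can be reproduced with a small helper. This is a sketch under two assumptions: 1 MB is taken as 1,024 KB, and the load is spread evenly over all partitions (one publisher per partition). The class and method names are hypothetical.

```csharp
using System;

// Hypothetical helper reproducing the per-partition arithmetic above.
// Assumes 1 MB = 1,024 KB and load spread evenly over all partitions.
public static class IngressEstimate
{
    const double IngressKbPerPartition = 1024.0; // 1 MB of ingress per second
    const double MaxEventsPerPartition = 1000.0; // 1,000 ingress operations per second

    // Maximum events per second for a fixed message size and partition count.
    public static double MaxEventsPerSecond(double messageSizeKb, int partitions)
    {
        // Each partition is limited by bandwidth or by operation count,
        // whichever is reached first.
        double bySize = IngressKbPerPartition / messageSizeKb;
        double perPartition = Math.Min(bySize, MaxEventsPerPartition);
        return perPartition * partitions;
    }
}
```

With 5 KB messages and 16 partitions, MaxEventsPerSecond(5, 16) returns approximately 3,276.8. With 1 KB messages, the 1,000 operations cap wins and the result is 16,000.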

Calculating the number of partitions you need starts with estimating the potential throughput in megabytes per second, then the number of messages per second. Together, these give you the first part of the equation. The second part is the number of processing nodes required to meet your performance targets.

Since a single partition cannot be processed concurrently by multiple nodes, you must have at least as many partitions as you have backend node instances. For example, a system that must process 30 messages simultaneously, where each message requires a dedicated processing node, requires the Event Hub to have at least 30 partitions.

Number of partitions = MAX(cumulative ingress throughput required for the stream in MB/s ÷ 1 MB/s per partition, number of nodes needed by the backend processing application)
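The formula translates directly into code. This hypothetical sketch also applies the 1,000 events per second per-partition target and the 8-partition minimum mentioned earlier, and flags counts above the public maximum of 32.

```csharp
using System;

// Hypothetical sizing helper for the partition formula above.
public static class PartitionPlanner
{
    const double IngressMbPerPartition = 1.0;  // 1 MB/s ingress target per partition
    const double EventsPerPartition = 1000.0;  // 1,000 events/s target per partition
    const int MinPartitions = 8;               // smallest partition count quoted above
    const int MaxPublicPartitions = 32;        // more requires a support ticket

    // MAX(throughput-driven count, event-rate-driven count, backend node count),
    // never below the minimum partition count.
    public static int RequiredPartitions(double ingressMbPerSecond, double eventsPerSecond, int backendNodes)
    {
        int byThroughput = (int)Math.Ceiling(ingressMbPerSecond / IngressMbPerPartition);
        int byEvents = (int)Math.Ceiling(eventsPerSecond / EventsPerPartition);
        int required = Math.Max(Math.Max(byThroughput, byEvents), backendNodes);
        return Math.Max(required, MinPartitions);
    }

    public static bool NeedsSupportTicket(int partitions)
    {
        return partitions > MaxPublicPartitions;
    }
}
```

For instance, 4 MB/s and 3,000 events/s would fit in 4 partitions, but a backend that needs 30 dedicated nodes drives RequiredPartitions(4, 3000, 30) to 30.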

Calculating the number of partitions from your performance targets is important because the partition count cannot be changed after the Event Hub is created.

Changing the number of partitions once in production can cause quite a few headaches, because it means creating a new Event Hub and reconfiguring publishers to send to it. While events pile up in the new Event Hub, let the backend nodes drain the old one. Once the old Event Hub is empty, you can reconfigure your backend nodes to consume events from the new Event Hub. Switching the backend processing nodes to the new Event Hub too early would break the order of events. From a billing perspective, the number of partitions is irrelevant because there is no charge for partitions.
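To see why the drain must finish before the switch, here is a toy model. It is not the Event Hubs API: each hub is a hypothetical in-memory FIFO queue standing in for a single partition, holding events 1 to 3 written before publishers moved and events 4 and 5 written after.

```csharp
using System.Collections.Generic;

// Toy model of the migration described above. Each "hub" is an in-memory
// FIFO queue standing in for one partition; this is not the Event Hubs SDK.
public static class MigrationSimulation
{
    // Returns the order in which a single backend node sees the events.
    public static List<int> Consume(bool drainOldHubFirst)
    {
        var oldHub = new Queue<int>(new[] { 1, 2, 3 }); // backlog written before the switch
        var newHub = new Queue<int>(new[] { 4, 5 });    // written after publishers moved
        var processed = new List<int>();

        if (drainOldHubFirst)
        {
            // Empty the old hub completely, then move to the new one.
            while (oldHub.Count > 0) processed.Add(oldHub.Dequeue());
            while (newHub.Count > 0) processed.Add(newHub.Dequeue());
        }
        else
        {
            // Switching too early: newer events are handled before the old backlog.
            while (newHub.Count > 0) processed.Add(newHub.Dequeue());
            while (oldHub.Count > 0) processed.Add(oldHub.Dequeue());
        }
        return processed;
    }
}
```

Draining first yields 1, 2, 3, 4, 5; switching early yields 4, 5, 1, 2, 3.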

Once the application is deployed you can provision throughput units to scale the Event Hub’s throughput capacity. A single throughput unit (as of July 2014) has the capacity of 1 MB per second of ingress events (events sent into an Event Hub), but no more than 1000 ingress events, management operations or control API calls per second. It has 2 MB per second of egress events (events consumed from an Event Hub) and 84 GB of event storage (sufficient for the default 24-hour retention period). In order to max out the total possible capacity of an Event Hub, you will need to provision one throughput unit for each partition. Event Hub throughput units are billed hourly, based on the maximum number of units selected during the given hour.
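Sizing throughput units follows the same pattern as sizing partitions: take the largest requirement across ingress bandwidth, ingress events and egress bandwidth. This is a hypothetical sketch using the July 2014 per-unit figures quoted above; the class and method names are not part of any SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sizing helper based on the July 2014 figures quoted above.
public static class ThroughputUnits
{
    const double IngressMbPerUnit = 1.0;        // 1 MB/s of ingress
    const double IngressEventsPerUnit = 1000.0; // 1,000 ingress events/s
    const double EgressMbPerUnit = 2.0;         // 2 MB/s of egress

    // Smallest number of units covering every dimension of the expected load.
    public static int Required(double ingressMbPerSecond, double ingressEventsPerSecond, double egressMbPerSecond)
    {
        return new[]
        {
            (int)Math.Ceiling(ingressMbPerSecond / IngressMbPerUnit),
            (int)Math.Ceiling(ingressEventsPerSecond / IngressEventsPerUnit),
            (int)Math.Ceiling(egressMbPerSecond / EgressMbPerUnit),
            1 // always at least one unit
        }.Max();
    }

    // Per the text, one unit per partition maxes out an Event Hub, so a larger
    // requirement means the Event Hub needs more partitions.
    public static bool FitsPartitionCount(int units, int partitions)
    {
        return units <= partitions;
    }

    // Billing uses the maximum number of units selected during the hour.
    public static int BilledUnitsForHour(IEnumerable<int> unitsSelectedDuringHour)
    {
        return unitsSelectedDuringHour.Max();
    }
}
```

For example, 3.5 MB/s of ingress at 2,000 events/s with 5 MB/s of egress needs 4 units, driven by ingress bandwidth. Scaling from 2 to 6 and back to 3 units within one hour is billed as 6.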

Creating an Event Hub

Click on the +NEW area at the bottom of the portal and navigate to the Event Hub service.

A wizard will show on screen to assist you with the creation. Enter an Event Hub name, select a Region that hosts the services that will process the events, and click Next. Be sure that you are creating the resource in the correct subscription and in the desired Azure Service Bus namespace.

The next panel prompts for important information. This is where you specify the number of partitions for your Event Hub. Then, because this is a Standard Event Hub, you can specify the number of days that an event stays in the Event Hub. This is especially handy when you need to replay events from a week ago. Retention can be set anywhere from a minimum of 1 day to a maximum of 30 days.
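Longer retention means more stored events. At 1 MB/s of ingress, one day holds 86,400 MB, roughly 84 GB, which lines up with the 84 GB of storage per throughput unit mentioned earlier. The helper below is a hypothetical estimate, not a billing calculator, and enforces the 1 to 30 day range from the wizard.

```csharp
using System;

// Hypothetical estimate of how much event data a retention window holds.
public static class RetentionEstimate
{
    const int SecondsPerDay = 86400;

    public static double RetainedGb(double ingressMbPerSecond, int retentionDays)
    {
        // The wizard accepts between 1 and 30 days of retention.
        if (retentionDays < 1 || retentionDays > 30)
            throw new ArgumentOutOfRangeException("retentionDays", "Retention must be between 1 and 30 days.");

        double megabytes = ingressMbPerSecond * SecondsPerDay * retentionDays;
        return megabytes / 1024.0; // MB -> GB
    }
}
```

RetainedGb(1.0, 1) gives 84.375 GB, and keeping a week of events for replay at the same rate gives 590.625 GB.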

Publishing Events

To publish events to an Event Hub, you must create Shared Access Policies. I strongly recommend creating policies that each allow a single action. To publish an event, we need a Shared Access Policy that allows us to send events. Navigate to the Service Bus namespace and click on the Event Hubs tab. From the list, choose the newly created Event Hub.

Then click on Configure. In this screen, you will be able to create a Send enabled Shared Access Policy.

Now we need to retrieve a SAS connection string to authenticate and interact with the Event Hub. Select the Dashboard tab and click on the View Connection String link.

From the wizard, copy the connection string that is enabled for Send operations. Click the copy button to store the connection string in your clipboard. This allows you to paste the connection string into your project’s configuration.

Now, down to the code. Use the connection string we copied from the portal to send events to our newly created Microsoft Azure Service Bus Event Hub. The EventData object is the equivalent of the Service Bus BrokeredMessage, with a twist: it carries a Partition Key. This key is used internally to decide which Event Hub partition stores and conveys the event. The EventHubClient used to send the EventData is part of the Microsoft Azure Service Bus 2.4.1.1 NuGet package.

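using System;
using System.Text;
using Microsoft.ServiceBus;            // ServiceBusConnectionStringBuilder
using Microsoft.ServiceBus.Messaging;  // EventHubClient, EventData, TransportType
using Newtonsoft.Json;                 // JsonConvert

// This snippet runs inside an async method.
var cs = @"[Service Bus Connection String]";

// Use the AMQP transport for Event Hubs.
var builder = new ServiceBusConnectionStringBuilder(cs)
                    {
                        TransportType = TransportType.Amqp
                    };

// "brisebois" is the name of the Event Hub created earlier.
var client = EventHubClient.CreateFromConnectionString(builder.ToString(), "brisebois");

try
{
    var e = new Event
    {
        Message = "Test Message"
    };

    // Serialize to JSON, then to bytes. The consumer must decode with the same encoding (UTF-16 here).
    var serializedString = JsonConvert.SerializeObject(e);
    var data = new EventData(Encoding.Unicode.GetBytes(serializedString))
    {
        // Events that share a partition key land on the same partition.
        PartitionKey = "test-app"
    };

    // Set user properties if needed; the consumer uses "Type" to filter events.
    data.Properties.Add("Type", "Event");

    await client.SendAsync(data);
}
catch (Exception exp)
{
    Console.WriteLine("Error on send: " + exp.Message);
}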
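The service hashes the Partition Key internally, and the exact hash it uses is not covered in this post. The sketch below uses FNV-1a purely as a stand-in to illustrate the property that matters: the same key always maps to the same partition, which is what keeps one key's events in order. PartitionKeyIllustration is a hypothetical name.

```csharp
using System;
using System.Text;

// Illustration only: FNV-1a stands in for the service's internal hash, which
// is not documented here. The point is determinism: same key, same partition.
public static class PartitionKeyIllustration
{
    public static int PartitionFor(string partitionKey, int partitionCount)
    {
        if (partitionCount < 1)
            throw new ArgumentOutOfRangeException("partitionCount");

        uint hash = 2166136261u;                 // FNV-1a 32-bit offset basis
        foreach (byte b in Encoding.UTF8.GetBytes(partitionKey))
        {
            hash ^= b;
            hash *= 16777619u;                   // FNV-1a 32-bit prime (wraps on overflow)
        }
        return (int)(hash % (uint)partitionCount);
    }
}
```

Calling PartitionFor("test-app", 16) repeatedly always returns the same partition index between 0 and 15.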

The data encapsulated inside an EventData object is stored as bytes. This allows us to use DataContracts and efficient serialization libraries like Json.NET, Protobuf and Avro. Keep in mind that, as with Service Bus Queues and Topics, messages are billed in 64 KB slices.

[DataContract]
public class Event
{
    [DataMember]
    public string Message { get; set; }
}
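Because billing is by 64 KB slice, the body encoding matters. The sample serializes with Encoding.Unicode (UTF-16), which uses two bytes per ASCII character, while Encoding.UTF8 uses one. Whichever you choose, publisher and consumer must agree. This hypothetical helper counts only the body bytes and ignores properties and headers. It also assumes that even a tiny event is billed as one slice.

```csharp
using System;
using System.Text;

// Hypothetical helper for estimating the billed size of an event body.
// Only body bytes are counted; properties and headers are ignored.
public static class PayloadSize
{
    const int SliceBytes = 64 * 1024; // messages are billed in 64 KB slices

    public static int BilledSlices(int bodyBytes)
    {
        // Round up to the next full slice; assumes a tiny event still counts as one.
        return Math.Max(1, (bodyBytes + SliceBytes - 1) / SliceBytes);
    }

    public static int Utf16Bytes(string json) { return Encoding.Unicode.GetByteCount(json); }

    public static int Utf8Bytes(string json) { return Encoding.UTF8.GetByteCount(json); }
}
```

A 26-character JSON body such as {"Message":"Test Message"} takes 52 bytes in UTF-16 and 26 in UTF-8. Both fit in one slice, but the difference starts to matter for bodies near the 64 KB boundary.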

Consuming Events

The first step in consuming events from the Event Hub is to implement IEventProcessor. An event processor receives batches of events from a partition, which makes it a great building block for an event processing pipeline. By using the Properties collection of the EventData object, we can store the type of the event and use it to decide whether to deserialize and process the body of the EventData object.

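using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;  // IEventProcessor, PartitionContext, EventData, CloseReason
using Newtonsoft.Json;

class EventProcessor : IEventProcessor
{
    public Task OpenAsync(PartitionContext context)
    {
        return Task.FromResult<object>(null);
    }

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
    {
        foreach (var eventData in events)
        {
            // Only handle events tagged with Type = "Event";
            // TryGetValue avoids an exception when the property is missing.
            object type;
            if (!eventData.Properties.TryGetValue("Type", out type) || type == null || type.ToString() != "Event")
                continue;

            // Decode with the same encoding the publisher used (UTF-16).
            var json = Encoding.Unicode.GetString(eventData.GetBytes());
            var data = JsonConvert.DeserializeObject<Event>(json);

            Console.WriteLine("Processing EVENT [(Message: {0}) PartitionKey: {1}] at PartitionId: {2}",
                                data.Message,
                                eventData.PartitionKey,
                                context.Lease.PartitionId);

            foreach (KeyValuePair<string, object> p in eventData.Properties)
            {
                if (!p.Key.Equals("ContentType"))
                {
                    Console.WriteLine("  [Property: {0} = {1}]", p.Key, p.Value);
                }
            }
        }

        // Record progress so processing resumes after this batch if the host restarts.
        await context.CheckpointAsync();
    }

    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        // Don't throw here: the host calls CloseAsync whenever it releases a partition.
        // On a clean shutdown, checkpoint so the next owner of the partition resumes here.
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }
}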
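Routing on the "Type" property generalizes nicely once there is more than one kind of event. In the sketch below, a plain dictionary stands in for EventData.Properties so the routing logic runs without the Service Bus SDK. The handler name "HandleEvent" is hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Sketch of routing on the "Type" user property. A plain dictionary stands in
// for EventData.Properties; handler names are hypothetical.
public static class EventRouter
{
    // Returns the handler to use for an event, or null when it should be skipped.
    public static string Route(IDictionary<string, object> properties)
    {
        object type;
        // Events published without a "Type" property are skipped rather than crashing the batch.
        if (!properties.TryGetValue("Type", out type) || type == null)
            return null;

        switch (type.ToString())
        {
            case "Event":
                return "HandleEvent";
            default:
                return null; // unknown types are ignored
        }
    }
}
```

Adding a new event type then means adding a case, without touching the deserialization of events the processor already understands.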

There are a couple of ways to consume messages from the Event Hub. The easiest is to use the EventProcessorHost, which takes care of scaling and distributing compute resources to process the Event Hub’s partitions. Create an instance by providing a connection string that is capable of reading from the Event Hub, along with an Azure Storage connection string. The EventProcessorHost uses Blob Storage to coordinate partition leases across instances and to checkpoint the event stream offset.

This example reads from the default Consumer Group. In practice, we can use multiple Consumer Groups the same way we use multiple Subscriptions on Service Bus Topics. Consumer Groups are partitioned subscriptions: each Consumer Group gets its own view of every event, and each Consumer Group can be processed in parallel.

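// This snippet runs inside an async method.
var cs = @"[Service Bus Connection String]";

// Azure Storage account used by the host for partition leases and checkpoints.
var blobConnectionString = @"[Storage Connection String]";

var builder = new ServiceBusConnectionStringBuilder(cs)
                    {
                        TransportType = TransportType.Amqp
                    };

var client = EventHubClient.CreateFromConnectionString(builder.ToString(), "brisebois");

try
{
    // "singleworker" is the host name; each running instance needs a unique one.
    var eventProcessorHost = new EventProcessorHost("singleworker",
                                                    client.Path,
                                                    client.GetDefaultConsumerGroup().GroupName,
                                                    builder.ToString(),
                                                    blobConnectionString);

    // Await registration so startup failures surface in the catch block below.
    await eventProcessorHost.RegisterEventProcessorAsync<EventProcessor>();

    Console.WriteLine("Receiving. Press Enter to stop.");
    Console.ReadLine();

    // Release the partition leases cleanly before exiting.
    await eventProcessorHost.UnregisterEventProcessorAsync();
}
catch (Exception exp)
{
    Console.WriteLine("Error on receive: " + exp.Message);
}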
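The statement that each Consumer Group sees every event can be made concrete with a toy model. In the hypothetical PartitionLog class below, one partition is an append-only list and each group keeps its own read position. This mimics the behavior described above and is not the Event Hubs SDK. "$Default" is the name of the default consumer group. "analytics" is a made-up second group.

```csharp
using System.Collections.Generic;

// Toy model of one partition read by several consumer groups. The partition is
// an append-only list; each group keeps its own offset, so every group sees
// every event. Not the Event Hubs SDK.
public class PartitionLog
{
    private readonly List<string> events = new List<string>();
    private readonly Dictionary<string, int> offsets = new Dictionary<string, int>();

    public void Append(string evt)
    {
        events.Add(evt);
    }

    // Returns the events this group has not read yet and advances only its offset.
    public List<string> Read(string consumerGroup)
    {
        int offset;
        offsets.TryGetValue(consumerGroup, out offset); // 0 for a new group
        var batch = events.GetRange(offset, events.Count - offset);
        offsets[consumerGroup] = events.Count;
        return batch;
    }
}
```

After appending two events, both "$Default" and "analytics" read both of them. Reading from one group never moves the other group's position.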

We can also manage the way we process events from the Event Hub partitions. The first step is to implement an ICheckpointManager. This is used to persist the offset location of the last processed event from an Event Hub partition.

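public class EventProcessorCheckpointManager : ICheckpointManager
{
    public Task CheckpointAsync(Lease lease, string offset)
    {
        // This sample only logs the checkpoint. A production implementation would
        // persist the offset per partition (for example to Blob Storage) so that
        // processing can resume from it after a restart.
        Console.WriteLine("PartitionId: " + lease.PartitionId);
        Console.WriteLine("Offset: " + offset);
        return Task.FromResult<object>(null);
    }
}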
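The checkpoint manager above only logs. To show what persisting buys you, here is a hypothetical in-memory store keyed by partition ID. Real Event Hubs offsets are opaque strings, so this sketch simplifies them to a numeric position. A durable implementation would write to storage instead of a dictionary.

```csharp
using System.Collections.Generic;

// Hypothetical in-memory checkpoint store keyed by partition ID. Positions are
// simplified to numbers; real Event Hubs offsets are opaque strings, and a real
// store would be durable (for example, Blob Storage).
public class InMemoryCheckpointStore
{
    private readonly Dictionary<string, long> lastProcessed = new Dictionary<string, long>();

    // Remember the position of the last event fully processed on a partition.
    public void Checkpoint(string partitionId, long position)
    {
        lastProcessed[partitionId] = position;
    }

    // Where a restarted reader should resume: the event after the last checkpoint, or 0.
    public long ResumeFrom(string partitionId)
    {
        long position;
        return lastProcessed.TryGetValue(partitionId, out position) ? position + 1 : 0;
    }
}
```

After Checkpoint("0", 41), a restarted reader on partition "0" resumes at position 42. A partition that was never checkpointed starts from the beginning of the retained stream, which is also how a replay works.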

Then, by using the EventHubClient and the NamespaceManager, we can pick and choose the Consumer Groups and partitions to subscribe to. For this solution to work, the connection string must contain a SAS key that allows the consuming code to manage the Event Hub.

var cs = @"[Service Bus Connection String]";

var builder = new ServiceBusConnectionStringBuilder(cs)
                    {
                        TransportType = TransportType.Amqp
                    };

var client = EventHubClient.CreateFromConnectionString(builder.ToString(), "brisebois");

try
{
    var subscriberGroup = client.GetDefaultConsumerGroup();

    // Reading the Event Hub description (and its partition IDs) requires Manage rights.
    var eventHub = NamespaceManager.CreateFromConnectionString(builder.ToString()).GetEventHub("brisebois");

    // Register an event processor with each partition to start consuming events.
    foreach (var partitionId in eventHub.PartitionIds)
    {
        subscriberGroup.RegisterProcessor<EventProcessor>(new Lease
        {
            PartitionId = partitionId
        }, new EventProcessorCheckpointManager());

        Console.WriteLine("Processing : " + partitionId);
    }
}
catch (Exception exp)
{
    Console.WriteLine("Error registering processors: " + exp.Message);
}
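When several backend nodes register processors manually, each partition must go to exactly one node, because a partition cannot be processed concurrently by multiple nodes. A simple option is a static round-robin split of the partition IDs. This is a hypothetical helper, not part of the SDK. Each worker would loop over its own subset instead of every entry in eventHub.PartitionIds.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical static assignment of partitions to backend workers: each
// partition goes to exactly one worker.
public static class PartitionAssignment
{
    public static List<string> PartitionsFor(int workerIndex, int workerCount, IEnumerable<string> partitionIds)
    {
        if (workerCount < 1 || workerIndex < 0 || workerIndex >= workerCount)
            throw new ArgumentOutOfRangeException("workerIndex");

        // Round-robin: the i-th partition goes to worker (i % workerCount).
        return partitionIds
            .Where((id, i) => i % workerCount == workerIndex)
            .ToList();
    }
}
```

With 8 partitions and 3 workers, worker 0 takes partitions "0", "3" and "6", and worker 2 takes "2" and "5". Unlike the EventProcessorHost, this split does not rebalance when a worker goes down.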

Summary

Azure Service Bus Event Hubs enable us to stream millions of events. They allow us to partition and organize the consumption of events. Furthermore, we can use offsets to replay event streams. This is a powerful service. It has its particularities and takes a bit of effort to tame, but once you’ve nailed down the details… it’s flat out awesome!

To grok Event Hubs, one has to get to know each of its parts. It is a service with many great use cases, such as the Internet of Things (IoT).

In this post, I focused on working with Event Hubs from C# using the NuGet packages made available by the Azure Service Bus team. It’s important to note that you can also interact with Event Hubs through a REST API over HTTP. An example of this usage pattern can be found in the sample projects listed below.

Take a look at the following resources. As of July 2014, the documentation isn’t abundant and it requires us to explore. The next few months will probably bring forth much more documentation and use cases. Until then, please share your findings and your thoughts about this new service.

NuGet Packages

Documentation

Sample Projects

Trackbacks and Pingbacks:

  1. Reading Notes 2014-07-28 | Matricis - July 28, 2014

    […] Getting Acquainted with #Azure Service Bus Event Hubs – This nice post describes the event bus of Azure, when more muscles are needed. […]


  2. From my reading list #14 – July 28th, 2014 | Pascal Laurin - July 28, 2014

    […] Getting Acquainted With #Azure Service Bus Event Hubs by Alexandre Brisebois […]


  3. Communicating with Microsoft Azure EventHub using Arduino | Experimenting with "IoT" - December 25, 2014

    […] MB ingress or 16,000 operations and 32 MB egress per second. The above information is taken from https://alexandrebrisebois.wordpress.com/2014/07/18/getting-acquainted-with-azure-service-bus-event-…. For more information please visit the mentioned […]

