Windows Azure IaaS Overload

I took a short breather after back-to-back TechEd North America and TechEd Europe, but I wanted to put up a post summarizing the sessions on Windows Azure Virtual Machines and Virtual Networks from TechEd 2012. This is a big and extremely important launch for Windows Azure, so we have quite a bit of coverage on the subject 🙂

If you are looking for a crash course on Windows Azure IaaS, here it is!


Meet the New Windows Azure – Scott Guthrie

Windows Azure Virtual Machines and Virtual Networks – Mark Russinovich

Windows Azure IaaS and How it Works – Corey Sanders

Extending Enterprise Networks to Windows Azure using Windows Azure Virtual Networks – Ganesh Srinivasan

Deep Dive on Windows Azure Virtual Machines – Vijay Rajagopalan

Running Linux on Windows Azure Virtual Machines – Tad Brockway

Migrating Applications to Windows Azure Virtual Machines – Michael Washam

Deploying SharePoint Farms on Windows Azure Virtual Machines – Paul Stubbs

Migrating SQL Server database applications to Windows Azure Virtual Machines – Guy Bowerman, Madhan Arumugam

Running Active Directory on Windows Azure Virtual Machine – Dean Wells

How to Move and Enhance Existing Apps for Windows Azure – Tom Fuller, Greg Varveris, Purush Vankireddy

Deconstructing the Hybrid Twitter Demo at BUILD

Many of you may have watched the Windows Server 8 session at BUILD and were awed by how cool the Twitter demo delivered by Stefan Schackow was (well, maybe not awed, but at least impressed!). I will admit that it was a fun demo to build because we were using such varied technologies: IIS8 Web Sockets, AppFabric Topics and Subscriptions, and of course Windows Azure Worker Roles.

Paul Stubbs and I built the demo.

Here is a screenshot of the application as it ran live:

[Image: screenshot of the running application]

Here is a slide depicting the architecture:

[Image: architecture slide]

Essentially, the demo shows how you can take data from the cloud and pass it to an application behind your firewall for additional processing (a hybrid cloud scenario). We chose Windows Azure AppFabric Topics (a really powerful queue) as the data transport.

One of the beauties of Windows Azure AppFabric Topics is that multiple clients can receive messages from the same topic, each with a different view of the data, by using filtered subscriptions.

In our demo we could have client A view our Twitter feed with certain tags enabled while client B had a completely different set enabled.
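
To make the filtered-subscription idea concrete, here is a minimal in-memory model of it. This is not the Service Bus API: `Topic`, `subscribe`, `send`, and `receive` are hypothetical names used only for illustration, written in JavaScript like the client code later in this post. Each subscription gets its own copy of every message that passes its filter, which is why client A and client B can see different views of the same feed.

```javascript
// In-memory model of a topic with filtered subscriptions.
// This only illustrates the semantics; it is NOT the Service Bus client API.
class Topic {
  constructor() {
    this.subscriptions = new Map(); // subscription name -> { filter, queue }
  }

  // Register a subscription with a predicate that decides which messages it sees.
  subscribe(name, filter) {
    this.subscriptions.set(name, { filter, queue: [] });
  }

  // Copy the message into every subscription whose filter accepts it.
  send(message) {
    for (const sub of this.subscriptions.values()) {
      if (sub.filter(message)) {
        sub.queue.push(message);
      }
    }
  }

  // Take the next message for one subscription, or null if none is waiting.
  receive(name) {
    const sub = this.subscriptions.get(name);
    return sub && sub.queue.length > 0 ? sub.queue.shift() : null;
  }
}

// Client A only wants #azure tweets, client B only wants #xbox tweets.
const topic = new Topic();
topic.subscribe("clientA", (m) => m.lowerText.includes("#azure"));
topic.subscribe("clientB", (m) => m.lowerText.includes("#xbox"));

topic.send({ lowerText: "loving #azure today" });
topic.send({ lowerText: "new #xbox dashboard" });
topic.send({ lowerText: "#azure and #xbox together" });
```

A tweet that carries both tags lands in both subscriptions, while the single-tag tweets reach only the client that asked for them.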

So on to the source code!

Within the Windows Azure Worker Role I am using a modified version of Guy Burstein’s example to search Twitter based on a set of hashtags.

Sending Twitter Search Results to a Topic


TwitterSubscriptionClient tsc = new TwitterSubscriptionClient();
tsc.CreateTopic();

TwitterWrapper wrapper = new TwitterWrapper();
while (true)
{
    if (IsFeedEnabled())
    {
        try
        {
            // Call out to Twitter with the set of hash tags we are interested in 
            // getting search results from. 
            SearchResults results = wrapper.GetSearchResults(GetHashTags(), SearchResultType.recent, TweetCount, strLiveLatestID);
            if (results != null && results.Results.Count > 0)
            {
                // Save the last ID so we can use it as a continuation token 
                // for the next query to Twitter
                strLiveLatestID = results.Results[0].Id;

                // Send the search results over the Service Bus Topic (250 ms delay)
                TwitterSubscriptionClient.SendTweets(results, 250);
            }
        }
        catch (Exception exc)
        {
            System.Diagnostics.Trace.TraceError(DateTime.Now.ToString() + " " + exc.ToString());
        }
    }
    else
    {
        System.Diagnostics.Trace.TraceInformation("Waiting - Twitter Search is Not Enabled.");
    }
    Thread.Sleep(GetInterval());
    Trace.WriteLine("Working", "Information");
}

That covers the meat of the service but let’s go a bit deeper on how the Topic is created.

The constructor of the TwitterSubscriptionClient is where the initialization of all of the Service Bus classes takes place.

Initialize and Authenticate the Service Bus Classes


static string serviceNamespace = String.Empty;
static string topicName = String.Empty;
static string issuerName = String.Empty;
static string issuerKey = String.Empty;
static TokenProvider tokenProvider = null;
static Uri serviceUri = null;
NamespaceManager nsMgr = null;

public TwitterSubscriptionClient()
{
    serviceNamespace = RoleEnvironment.GetConfigurationSettingValue("serviceNamespace");
    topicName = RoleEnvironment.GetConfigurationSettingValue("topicName");
    issuerName = RoleEnvironment.GetConfigurationSettingValue("issuerName");
    issuerKey = RoleEnvironment.GetConfigurationSettingValue("issuerKey");
    serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", serviceNamespace, string.Empty);
    tokenProvider = TokenProvider.CreateSharedSecretTokenProvider(issuerName, issuerKey);
    NamespaceManagerSettings nms = new NamespaceManagerSettings();
    nms.TokenProvider = tokenProvider;
    nsMgr = new NamespaceManager(serviceUri, nms);
}

The code below simply tests whether the topic has already been created and, if it has not, creates it.

As with all things in the cloud, the code below uses some basic retry logic on each operation that touches a Windows Azure service.

Creating the Topic

public void CreateTopic()
{
    int retrySeconds = 3;
    int maxRetries = 5;
    int retryCounter = 0;

    RetryN(() =>
    {
        if (nsMgr.TopicExists(topicName) == false)
        {
            TopicDescription twitterTopic = nsMgr.CreateTopic(topicName);
            System.Diagnostics.Trace.TraceInformation("Created and Configured Topic");
        }
    }, null, maxRetries, TimeSpan.FromSeconds(retrySeconds), ref retryCounter);
            
    if (retryCounter > 0)
        System.Diagnostics.Trace.TraceWarning(String.Format("Retried {0} Times Creating Topic.", retryCounter));

}

The method below takes the search results from Twitter and sends them individually into the topic.
Note that I’m adding an extra property, “LowerText”, to the BrokeredMessage so that on the receiving end I can easily filter my subscription on various tags.
Because that text is all lower case, it doesn’t matter whether the tag was tweeted as #Microsoft or #microsoft.

Sending Tweets through the Topic

public static void SendTweets(SearchResults srs, int MSDelay)
{
    TopicClient client = null;
    MessagingFactory messagingFactory = null;
    int retrySeconds = 3;
    int maxRetries = 5;

    try
    {
        MessagingFactorySettings settings = new MessagingFactorySettings();
        settings.TokenProvider = tokenProvider;
        messagingFactory = MessagingFactory.Create(serviceUri, settings);
        client = messagingFactory.CreateTopicClient(topicName);
        foreach (SearchResult sr in srs.Results)
        {
            int retryCounter = 0;
            if (MSDelay > 0)
                System.Threading.Thread.Sleep(MSDelay);
            try
            {
                if (TwitterSubscriptionClient.PassesFilter(sr.Text))
                {
                    RetryN(() =>
                    {
                        BrokeredMessage message = new BrokeredMessage(sr);
                        message.TimeToLive = TimeSpan.FromMinutes(15);
                        message.Properties.Add("Text", sr.Text);
                        message.Properties.Add("LowerText", sr.Text.ToLower());
                        client.Send(message);
                    }, null, maxRetries, TimeSpan.FromSeconds(retrySeconds), ref retryCounter);

                    if (retryCounter > 0)
                        System.Diagnostics.Trace.TraceInformation(String.Format("Retried {0} Times Sending Tweet.", retryCounter));
                }
            }
            catch (Exception exc)
            {
                System.Diagnostics.Trace.TraceError(DateTime.Now.ToString() + " " + exc.Message);
                System.Threading.Thread.Sleep(5000);
            }
        }
    }
    finally
    {
        if(client != null)
            client.Close();
        if(messagingFactory != null)
            messagingFactory.Close();
    }
}

So now we have built a feed that searches tweets looking for specific hash tags (see the project for the Twitter integration code), and we send the results out on the Service Bus through a topic. How does our on-premises application consume the data?

The code below is client-side JavaScript that opens a WebSocket to our IIS8 server. The socket.onmessage handler waits for data to be sent back from the server and parses it into a JSON object. If the JSON object has a .Key property, I update the Stream Insight UI; if not, I update the Twitter feed.
There is also a send() method. The only time I send data to the server is when the user has clicked on one of the hash tags in the UI. This updates a structure that holds the current hash tag filter, which I send back to the server via the socket.

Initialize the Web Socket

        var socket;
        var socketReady = false;
        var hashTagsFilter = new Object();

        function initializeWebSocket() {
            var host = "ws://<%: Request.Url.Host %>:<%: Request.Url.Port %><%: Response.ApplyAppPathModifier("~/TopicStartHandler.ashx") %>";
            try {
                socket = new WebSocket(host);
                socket.onopen = function(msg){
                    var s = ' Socket Open';
                    $("#serverStatus").html(s);
                    socketReady = true;
                    InitFilter();
                    send();
                };
                socket.onmessage = function(msg){
                    var s = msg.data;
                    var response = window.JSON.parse(s);

                    // Stream insight response
                    if(response.Key != null)
                    {
                        AddStreamInsightResponse(response);
                    }
                    // Tweet from the live feed
                    else
                    {
                        AddTweet(response);
                    }
                };
                socket.onclose = function(msg){ 
                    try
                    {
                        var s = ' Socket Closed';
                        $("#serverStatus").html(s);
                        socketReady = false;
                    }catch(e) {}
                };
            } 
            catch(ex){ 
                console.log(ex.name + "\n" + ex.message); 
            }
        }

        function send() {
            if(socketReady == false)
            {
                alert("Socket isn't ready.");
                return;
            }
            var subscriptions = window.JSON.stringify(hashTagsFilter);
            socket.send(subscriptions);
        }

        function InitFilter()
        {
            hashTagsFilter.microsoft = true;
            hashTagsFilter.windows = true;
            hashTagsFilter.windows8 = true;
            hashTagsFilter.azure = true;
            hashTagsFilter.wp7 = true;
            hashTagsFilter.build = true;
            hashTagsFilter.technology = true;
            hashTagsFilter.ie = true;
            hashTagsFilter.xbox = true;
            hashTagsFilter.webdesign = true;
            hashTagsFilter.mobile = true;
        }
      
        initializeWebSocket();
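
The click handler that flips a hash tag on or off is not shown above. A minimal sketch of that step might look like the following. `toggleHashTag` and its `sendFn` parameter are hypothetical names introduced here; in the page itself the filter would be `hashTagsFilter` and the send step would be the `send()` function above.

```javascript
// Flip one hash tag in the filter and push the whole filter to the server.
// `sendFn` stands in for socket.send so the sketch runs without a browser.
function toggleHashTag(filter, tag, sendFn) {
  filter[tag] = !filter[tag]; // a tag that was never set becomes true
  const payload = JSON.stringify(filter);
  sendFn(payload);
  return payload;
}

// Example: start with two tags enabled, then switch "azure" off.
const exampleFilter = { microsoft: true, azure: true };
const sentPayloads = [];
toggleHashTag(exampleFilter, "azure", (payload) => sentPayloads.push(payload));
```

Sending the whole filter each time, rather than just the changed tag, keeps the server side simple: it can deserialize the payload straight into its filter class and replace the old one.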

When a client logs in to our application, we first verify that it is indeed a web socket request and, assuming it is, create a unique ClientHandler object to manage state and communication with that client.

Client Setup

public void ProcessRequest (HttpContext context) {
  if (context.IsWebSocketRequest)
  {
      ClientHandler h = new ClientHandler();
      context.AcceptWebSocketRequest(t => h.ProcessTopicMessages(t));
  }
}

The implementation of ClientHandler is fairly straightforward. ConfigureServiceBus() sets up the subscriptions to the topic (one for the live feed and one for Stream Insight). ProcessTopicMessages is an async method that just waits for input from the user. The only input from the client is an update to the hash tag filter.

If we receive data, DeserializeFilter() is called, which uses the JSON serializer to deserialize the incoming data into a C# class (TwitterFilter).

Receiving Data from a Web Sockets Client

    public ClientHandler() 
    {
        ConfigureServiceBus();
    }
    
    public async Task ProcessTopicMessages(AspNetWebSocketContext context)
    {
        ClientList.ActiveClients[this.GetHashCode()] = this;
        socket = context.WebSocket as AspNetWebSocket;
        try
        {
            while (true)
            {
                WebSocketReceiveResult input = await socket.ReceiveAsync(buffer, CancellationToken.None);
                if (socket.State != WebSocketState.Open)
                    break;

                userFilter = DeserializeFilter(buffer, input);
                ProcessMessagesFromTopic();
            }
        }
        finally
        {
            Cleanup();
        }
    }

    private TwitterFilter DeserializeFilter(ArraySegment<byte> buffer, WebSocketReceiveResult input)
    {
        String jsonString = Encoding.UTF8.GetString(buffer.Array, 0, input.Count);
        TwitterFilter newFilter = jsonSerializer.Deserialize<TwitterFilter>(jsonString);
        FilterChanged = true;
        return newFilter;
    }

    // Details of ConfigureServiceBus
    private void ConfigureServiceBus()
    {
        FilteredSubscripionLiveName = this.GetHashCode().ToString() + "live";
        FilteredSubscripionStreamInsightName = this.GetHashCode().ToString() + "si";
        serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", serviceNamespace, string.Empty);
        tokenProvider = TokenProvider.CreateSharedSecretTokenProvider(issuerName, issuerKey);
        NamespaceManagerSettings nms = new NamespaceManagerSettings();
        nms.TokenProvider = tokenProvider;
        ns = new NamespaceManager(serviceUri, nms);
        MessagingFactorySettings mfs = new MessagingFactorySettings();
        mfs.TokenProvider = tokenProvider;
        int retryCount = 3;
        do
        {
            try
            {
                messagingFactory = MessagingFactory.Create(serviceUri, mfs);

                // Subscription for live feed 
                ns.CreateSubscription(topicName, FilteredSubscripionLiveName, new FalseFilter());
                subClientLF = messagingFactory.CreateSubscriptionClient(topicName, FilteredSubscripionLiveName, ReceiveMode.ReceiveAndDelete);
                ClearSubscriptionRules(subClientLF);

                // Subscription for Stream Insight
                ns.CreateSubscription(topicName, FilteredSubscripionStreamInsightName, new FalseFilter());
                subClientSI = messagingFactory.CreateSubscriptionClient(topicName, FilteredSubscripionStreamInsightName, ReceiveMode.ReceiveAndDelete);
                ClearSubscriptionRules(subClientSI);
                break;
            }
            catch (Exception) { }
        } while (retryCount-- > 0);

        // Start the stream insight engine 
        StartStreamInsight(FilteredSubscripionStreamInsightName);
    }

GetSubscriptionMessages() is called by ProcessMessagesFromTopic. The method sits in a loop testing whether the user’s filter has changed. If it has, it dynamically updates the SQLFilterExpression for both subscriptions (removing the existing rules in the process). On each pass it then queries the Topic for the next available Tweet and passes it back to the user over the web socket using SendMessage(). GetFilterString is shown below as well. It dynamically builds the SQLFilterExpression used by both the Stream Insight client and the Live Feed client.

Receive Tweets from the Service Bus Topic and Send them over the Web Socket.


// Pulls messages from a filtered subscription 
public void GetSubscriptionMessages()
{
    while (true)
    {
        if(FilterChanged == true)
        {
            FilterChanged = false;
            String SQLFilterExpression = SBHelpers.GetFilterString(userFilter);
            ClearSubscriptionRules(subClientLF);
            ClearSubscriptionRules(subClientSI);
            if (SQLFilterExpression != String.Empty)
            {
                RuleDescription subRule = new RuleDescription("subscriptionRule", new SqlFilter(SQLFilterExpression));
                subClientLF.AddRule(subRule);
                subClientSI.AddRule(subRule);
            }
        }
        String lfResponse = GetIncomingTweet();
        if(lfResponse != String.Empty)
            SendMessage(lfResponse);
    }
}

// Clears existing rules from subscription 
private void ClearSubscriptionRules(SubscriptionClient subClient)
{
    foreach (var r in ns.GetRules(topicName, subClient.Name))
    {
        subClient.RemoveRule(r.Name);
    }
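}

// Builds the SQL filter expression from the user's selected hash tags (called as SBHelpers.GetFilterString)
public static String GetFilterString(TwitterFilter filter)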
{
    String subRules = String.Empty;
    if (filter.microsoft)
       subRules = " LowerText like '%#microsoft%' OR ";
    if (filter.windows)
       subRules += " LowerText Like '%#windows%' OR ";
    if (filter.windows8)
       subRules += " LowerText Like '%#win8%' OR ";
    if (filter.azure)
       subRules += " LowerText Like '%#azure%' OR ";
    if (filter.wp7)
       subRules += " LowerText Like '%#wp7%' OR ";
    if (filter.build)
       subRules += " LowerText Like '%#bldwin%' OR ";
    if (filter.technology)
       subRules += " LowerText Like '%#technology%' OR ";
    if (filter.ie)
       subRules += " LowerText Like '%#ie%' OR ";
    if (filter.xbox)
       subRules += " LowerText Like '%#xbox%' OR ";
    if (filter.mobile)
       subRules += " LowerText Like '%#mobile%' OR ";
    if (filter.webdesign)
       subRules += " LowerText Like '%#webdesign%' OR ";
    if (filter.build)
       subRules += " LowerText Like '%#build%' OR ";
    if (subRules.EndsWith(" OR "))
       subRules = subRules.Substring(0, subRules.Length - 4);
    return subRules;
}
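
The chain of if statements above can also be driven from a lookup table. Here is a minimal sketch of that idea, written in JavaScript like the client code in this post; a C# version would follow the same shape. `TAGS_BY_FLAG` is a hypothetical table whose tag values are copied from the method above. The two build tags end up next to each other here, which does not change the result because the clauses are joined with OR.

```javascript
// Map each filter flag to the hash tag(s) it should match.
// The tag values mirror the C# method above; the table itself is an assumption.
const TAGS_BY_FLAG = {
  microsoft: ["#microsoft"],
  windows: ["#windows"],
  windows8: ["#win8"],
  azure: ["#azure"],
  wp7: ["#wp7"],
  build: ["#bldwin", "#build"],
  technology: ["#technology"],
  ie: ["#ie"],
  xbox: ["#xbox"],
  mobile: ["#mobile"],
  webdesign: ["#webdesign"],
};

// Build one LIKE clause per enabled tag and join them with OR.
// Returns an empty string when no tag is enabled.
function getFilterString(filter) {
  const clauses = [];
  for (const [flag, tags] of Object.entries(TAGS_BY_FLAG)) {
    if (filter[flag]) {
      for (const tag of tags) {
        clauses.push(`LowerText LIKE '%${tag}%'`);
      }
    }
  }
  return clauses.join(" OR ");
}
```

Joining a list of clauses removes the need to trim a trailing " OR " afterwards, and adding a new tag becomes a one-line change to the table.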

GetIncomingTweet waits up to three seconds for a Tweet on the live feed subscription. Once it receives one, it serializes it into a JSON object to be processed by the browser client.

Pull a Tweet off the Topic and pass the JSON serialized version back.

// Read from our subscription to the TwitterTopic to look for the next tweet 
public string GetIncomingTweet()
{
    String jsonTweet = String.Empty;
    try
    {
        BrokeredMessage message = subClientLF.Receive(TimeSpan.FromSeconds(3));
        if (message != null)
        {
            var obj = message.GetBody<SearchResult>();
            jsonTweet = jsonSerializer.Serialize(obj);
        }
    }
    catch (Exception)
    {
        // Client exited while we were still processing messages 
        return String.Empty;
    }
    return jsonTweet;
}

Stream Insight is configured in a method called StartStreamInsight. We have an adapter for Stream Insight in the project that reads events from the Service Bus Topic, which allows SI to run calculations over the events we pass in. The query it executes is the LINQ query below, which tells it to calculate how many times each hash tag has been tweeted in the last 60 seconds and to recalculate that value every second. Very powerful!

Stream Insight LINQ Query

//Create the Query template from the Event Stream
var result = from e in inputStream.Split(e => e.Text, "#")
                group e by e.ToLower() into g
                from win in g.HoppingWindow(TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(1))
                select new SearchResultsData
                {
                    Count = win.Count(),
                    Key = g.Key
                };
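
To see what the query computes, here is a plain JavaScript sketch of a single window: it counts how often each hash tag appears among events whose timestamp falls in the 60 seconds before a given instant. This is not Stream Insight code. `countTagsInWindow` and the `{ time, text }` event shape are assumptions made for illustration, and the real engine re-evaluates the window every second as events stream in.

```javascript
// Count hash tags in events with windowEnd - windowMs <= time < windowEnd.
// Times are in milliseconds; tags are lower-cased so #Azure and #azure match.
function countTagsInWindow(events, windowEnd, windowMs = 60000) {
  const counts = new Map();
  for (const { time, text } of events) {
    if (time < windowEnd - windowMs || time >= windowEnd) continue;
    const tags = text.toLowerCase().match(/#\w+/g) || [];
    for (const tag of tags) {
      counts.set(tag, (counts.get(tag) || 0) + 1);
    }
  }
  return counts;
}

const events = [
  { time: 1000, text: "Hello #Azure" },
  { time: 30000, text: "#azure and #Build" },
  { time: 70000, text: "More #build news" },
];

// Hop the window forward: one result at t = 60s, another at t = 90s.
const at60 = countTagsInWindow(events, 60000);
const at90 = countTagsInWindow(events, 90000);
```

The first tweet has aged out of the window by the 90 second mark, so the #azure count drops while the newer #build tweet is picked up.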

Finally, the method to send the data back to the client over the web socket.

Using socket.SendAsync to send data over the web socket.

public async Task SendMessage(string message)
{
  if (socket.State == WebSocketState.Open)
  {
      ArraySegment<byte> outputBuffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));
      await socket.SendAsync(outputBuffer, WebSocketMessageType.Text, true, CancellationToken.None);
  }
}

If you would like to try this demo out for yourself, I’ve uploaded the TwitterFeed project (Dev 10) and the Web Sockets Demo (Dev 11). Since this project uses Web Sockets, it requires IIS 8; it was tested with the Windows Server 8 Developer Preview along with the Visual Studio Developer Preview, both of which can be downloaded from MSDN.

You will also need to install Stream Insight from http://www.microsoft.com/download/en/details.aspx?id=26720: download and install 1033\x64\StreamInsight.msi. Use all defaults, with “StreamInsightInstance” as the instance name.