Windows Azure AppFabric Queues with Native C++

I’ve been playing around with writing native C++ lately. As a proof of concept I wrote a short sample that demonstrates how to access the Windows Azure AppFabric v2 Queues (topics coming soon!).

For a little context see the following:
Interview with AppFabric Team regarding Queues and Topics and
Service Bus Messaging Features

The sample below uses WinHTTP as a very basic REST client and performs the basic operations of creating a queue and writing messages to it. It could easily be extended to do some of the more advanced messaging functionality in the v2 release.

// QueueClient.h
#pragma once
#include <string>
#include "WinHTTPREST.h"
using namespace std;

// Simple aggregate pairing a message label with its body text.
struct MessageProperties
{
	std::string Label;       // label text
	std::string MessageBody; // body text
};

// REST client for queue operations against one service namespace.
// Each operation stores the HTTP status code and response text of the
// request it issued; read them back with GetHTTPResponseCode() and
// GetHttpResponse().
class QueueClient
{
private:
	WinHTTPREST * _rest;       // owned; released by the destructor
	string _serviceAddress;    // host name built from the namespace in the constructor
	DWORD _httpResponseCode;   // status code written by the most recent request
	string _httpResponse;      // response text written by the most recent request

	// Copying is disabled: the destructor deletes _rest, so a member-wise
	// copy would delete the same WinHTTPREST twice. Declared, never defined.
	QueueClient(const QueueClient&);
	QueueClient& operator=(const QueueClient&);
public:
	// serviceNameSpace: the namespace prefix of the service host name.
	QueueClient(string serviceNameSpace);
	~QueueClient(void);
	// PUTs a queue description at queueName; TRUE only on HTTP 201.
	BOOL CreateQueue(string queueName, string authHeader);
	// Issues DELETE on the given relative URI; TRUE only on HTTP 200.
	BOOL DeleteItem(string itemName, string authHeader);
	// POSTs messageBody to <queueName>/Messages with the label header; TRUE only on HTTP 201.
	BOOL SubmitMessageToQueue(string queueName, string authHeader, string label, string messageBody);
	// Issues DELETE on <queueName>/Messages/Head and returns the response text.
	string GetMessageFromQueue(string queueName, string authHeader);
	string GetHttpResponse();
	DWORD GetHTTPResponseCode();
};
// QueueClient.cpp
#include "StdAfx.h"
#include "QueueClient.h"
#include <vector>
#include <Windows.h>

// Builds the service host name from the namespace and creates the REST
// helper used by every queue operation.
//
// serviceNamespace: prefix placed in front of ".servicebus.appfabriclabs.com".
QueueClient::QueueClient(string serviceNamespace)
	: _rest(NULL)
	, _serviceAddress(serviceNamespace + ".servicebus.appfabriclabs.com")
	, _httpResponseCode(0) // defined value for GetHTTPResponseCode() before any request is made
{
	_rest = new WinHTTPREST(_serviceAddress, TRUE);
}

// Releases the REST helper allocated by the constructor.
QueueClient::~QueueClient(void)
{
	// delete on a null pointer is a no-op, so no explicit guard is required.
	delete _rest;
}

// Creates a queue by PUTting an Atom entry that wraps an empty
// QueueDescription element at the relative URI `queueName`.
//
// queueName:  used both as the entry title and as the request URI.
// authHeader: value sent in the Authorization header.
// Returns TRUE only when the request was sent and the status code is 201.
// The status code and response text are stored on the object.
BOOL QueueClient::CreateQueue(string queueName, string authHeader)
{
	// Atom entry describing the queue. Further queue properties could be
	// placed inside QueueDescription:
	// http://msdn.microsoft.com/en-us/library/gg278338.aspx#BKMK_REST5
	string atomEntry("<entry xmlns=\"http://www.w3.org/2005/Atom\"><title type=\"text\">");
	atomEntry += queueName;
	atomEntry += "</title>";
	atomEntry += "<content type=\"application/xml\">";
	atomEntry += "<QueueDescription xmlns:i=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns=\"http://schemas.microsoft.com/netservices/2010/10/servicebus/connect\" />";
	atomEntry += "</content></entry>";

	vector<string> requestHeaders;
	requestHeaders.push_back(string("Authorization: ") + authHeader);
	requestHeaders.push_back("Content-Type: application/atom+xml");

	const BOOL sent = _rest->SendRequest("PUT", queueName, _httpResponseCode, _httpResponse, requestHeaders, atomEntry);

	return (sent != FALSE && _httpResponseCode == 201) ? TRUE : FALSE;
}

// Posts one message to <queueName>/Messages.
//
// queueName:   relative URI of the queue.
// authHeader:  value sent in the Authorization header.
// label:       value sent in the X-MS-LABEL header.
// messageBody: bytes sent as the request body.
// Returns TRUE only when the request was sent and the status code is 201.
//
// Reference for header/property mappings; additional message properties
// could be set by adding more headers:
// http://msdn.microsoft.com/en-us/library/gg278338.aspx
BOOL QueueClient::SubmitMessageToQueue(string queueName, string authHeader,string label, string messageBody)
{
	string sendAddress = queueName + "/Messages";
	vector<string> headers;
	headers.push_back("Authorization: " + authHeader);
	headers.push_back("X-MS-LABEL: " + label);
	// Pass the string itself, not c_str(): going through a C string would
	// cut the body at the first embedded '\0' and make a needless copy.
	BOOL bRequest = _rest->SendRequest("POST", sendAddress, _httpResponseCode, _httpResponse, headers, messageBody);
	if(bRequest == FALSE || _httpResponseCode != 201)
		return FALSE;
	return TRUE;
}

// Read and remove from the queue: issues DELETE on
// <queueName>/Messages/Head?timeout=30 and returns the response text.
//
// queueName:  relative URI of the queue.
// authHeader: value sent in the Authorization header.
// Returns the response text of this request only. Use GetHTTPResponseCode()
// to tell a message apart from an error or an empty response.
string QueueClient::GetMessageFromQueue(string queueName, string authHeader)
{
	string receiveAddress = queueName + "/Messages/Head?timeout=30";
	vector<string> headers;
	headers.push_back("Authorization: " + authHeader);

	// SendRequest appends to the response string it is given, so reset the
	// stored state first; otherwise text from an earlier request (or a stale
	// status code after a failed send) would be returned as this message.
	_httpResponseCode = 0;
	_httpResponse.clear();

	_rest->SendRequest("DELETE", receiveAddress, _httpResponseCode, _httpResponse, headers, "");
	return _httpResponse;
}

string QueueClient::GetHttpResponse()
{
	return _httpResponse;
}

// Returns the HTTP status code stored by the most recent request.
DWORD QueueClient::GetHTTPResponseCode()
{
	return this->_httpResponseCode;
}

// Issues an HTTP DELETE against the given relative URI.
//
// uri:        relative URI of the item to delete.
// authHeader: value sent in the Authorization header.
// Returns TRUE only when the request was sent and the status code is 200.
BOOL QueueClient::DeleteItem(string uri, string authHeader)
{
	vector<string> requestHeaders;
	requestHeaders.push_back(string("Authorization: ") + authHeader);

	// DELETE carries no body, so an empty string is supplied for the data.
	const BOOL sent = _rest->SendRequest("DELETE", uri, _httpResponseCode, _httpResponse, requestHeaders, string());

	return (sent != FALSE && _httpResponseCode == 200) ? TRUE : FALSE;
}

Here is the WinHTTP REST wrapper

// WinHTTPRest.h
#pragma once
#include <string>
#include <Windows.h>
#include <winhttp.h>
#include <string>
#include <sstream>
#include <iomanip>
#include <vector>
using namespace std;


// Helper class for REST: wraps one WinHTTP session and one connection to a
// single server, and sends requests over that connection.
class WinHTTPREST
{
private:
	string _serverName;      // host name passed to WinHttpConnect
	string _httpProtocol;    // protocol version string passed to WinHttpOpenRequest
	HINTERNET _hSession;     // handle from WinHttpOpen; closed by the destructor
	HINTERNET _hConnect;     // handle from WinHttpConnect; closed by the destructor
	BOOL _secureSession;     // stored by the constructor

	// Copying is disabled: the destructor closes both handles, so a
	// member-wise copy would close each handle twice. Declared, never defined.
	WinHTTPREST(const WinHTTPREST&);
	WinHTTPREST& operator=(const WinHTTPREST&);

	// Character-by-character conversions between narrow and wide strings.
	std::wstring StringToWString(const std::string& s);
	std::string WStringToString(const std::wstring& s);
	// NOTE(review): no definition of this member appears alongside this class - confirm it exists before calling.
	int hex2dec( char hex );
public:
	WinHTTPREST(string ServerName, BOOL SecureSession, string HTTPProtocol="HTTP/1.1");
	~WinHTTPREST(void);
	// Send a request with no extra headers and no body.
	BOOL SendRequest(string Verb, string relativeUri, DWORD &dwStatusCode, string &httpResponse);
	// Send a request with a body but no extra headers.
	BOOL SendRequest(string Verb, string relativeUri,DWORD &dwStatusCode, string &httpResponse,string PostData);
	// Send a request with extra headers and a body; the other overloads forward here.
	BOOL SendRequest(string Verb, string relativeUri, DWORD &dwStatusCode, string &httpResponse, vector<string> headers, string PostData);
	// NOTE(review): no member definitions of UriEncode/UriDecode appear alongside this class;
	// free functions with the same names are defined in UrlEncodingHelpers.h - confirm which is intended.
	std::string UriEncode(const std::string& input);
	std::string UriDecode(const std::string& sSrc);
};
// WinHTTPRest.cpp
#include "StdAfx.h"
#include "WinHTTPREST.h"
#include <vector>

// Opens a WinHTTP session and a connection to ServerName.
//
// ServerName:    host name to connect to.
// SecureSession: stored in _secureSession.
//                NOTE(review): the connection below always uses
//                INTERNET_DEFAULT_HTTPS_PORT regardless of this flag.
// HTTPProtocol:  protocol version string used when requests are opened.
//
// If either WinHTTP call fails the corresponding handle stays NULL; later
// SendRequest calls then return FALSE.
WinHTTPREST::WinHTTPREST(string ServerName, BOOL SecureSession, string HTTPProtocol)
	: _serverName(ServerName)
	, _httpProtocol(HTTPProtocol)
	, _hSession(NULL)
	, _hConnect(NULL)   // must be NULL when WinHttpOpen fails: the destructor and SendRequest test it
	, _secureSession(SecureSession)
{
	_hSession = WinHttpOpen(L"Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)", 
                            WINHTTP_ACCESS_TYPE_DEFAULT_PROXY,
                            WINHTTP_NO_PROXY_NAME, 
                            WINHTTP_NO_PROXY_BYPASS, 0);
	
	// Specify an HTTP server.
	if( _hSession )
		_hConnect = WinHttpConnect( _hSession,
								   StringToWString(_serverName).c_str(),
								   INTERNET_DEFAULT_HTTPS_PORT,
								   0 );
}


// Closes the WinHTTP handles opened by the constructor.
WinHTTPREST::~WinHTTPREST(void)
{
	// Close in reverse order of creation: the connection handle was created
	// from the session handle, so it is released first.
	if( _hConnect ) 
	{
		WinHttpCloseHandle( _hConnect );
		_hConnect = NULL;
	}
	if( _hSession ) 
	{
		WinHttpCloseHandle( _hSession );
		_hSession = NULL;
	}
}


// Convenience overload: sends the request with no extra headers and an
// empty body by forwarding to the six-argument SendRequest.
BOOL WinHTTPREST::SendRequest(string Verb, string relativeUri, DWORD &dwStatusCode, string &httpResponse)
{
	return SendRequest(Verb, relativeUri, dwStatusCode, httpResponse, vector<string>(), string());
}

// Convenience overload: sends the request with the given body and no extra
// headers by forwarding to the six-argument SendRequest.
BOOL WinHTTPREST::SendRequest(string Verb, string relativeUri,DWORD &dwStatusCode, string &httpResponse,string Data)
{
	const vector<string> emptyHeaders;
	return SendRequest(Verb, relativeUri, dwStatusCode, httpResponse, emptyHeaders, Data);
}

// Sends one HTTP request over the connection opened by the constructor and
// collects the status code and the response body.
//
// Verb:          HTTP verb, e.g. "GET", "PUT", "POST", "DELETE".
// relativeUri:   object name passed to WinHttpOpenRequest.
// dwStatusCode:  [out] reset to 0, then set to the numeric HTTP status code.
// httpResponse:  [out] cleared, then filled with the response body bytes.
// headers:       complete header lines ("Name: value") added to the request.
// Data:          request body; transmitted only for PUT and POST.
//
// Returns TRUE when the request was sent, the status code was read and the
// whole body was read; FALSE otherwise. On failure the Win32 error code is
// printed to stdout.
BOOL WinHTTPREST::SendRequest(string Verb, string relativeUri, DWORD &dwStatusCode, string &httpResponse, vector<string> headers, string Data)
{
	HINTERNET hRequest = NULL;
	BOOL bResults = FALSE;
	BOOL bErrorReported = FALSE;
	DWORD dwError = ERROR_SUCCESS;

	// Callers reuse the same output variables across requests; start clean so
	// a failed or later request never exposes data from an earlier one.
	dwStatusCode = 0;
	httpResponse.clear();

	if(_hConnect)
		hRequest = WinHttpOpenRequest(_hConnect, StringToWString(Verb).c_str(), StringToWString(relativeUri).c_str(), StringToWString(_httpProtocol).c_str(), WINHTTP_NO_REFERER, WINHTTP_DEFAULT_ACCEPT_TYPES, WINHTTP_FLAG_SECURE );

	if(hRequest)
	{
		bResults = TRUE;

		// A header that cannot be added (for example a malformed line) fails
		// the request instead of silently sending it without that header.
		for(std::vector<string>::size_type i = 0; bResults && i != headers.size(); ++i) 
		{
			const std::wstring wideHeader = StringToWString(headers[i]);
			bResults = WinHttpAddRequestHeaders( hRequest, 
												 wideHeader.c_str(),
												 static_cast<DWORD>(wideHeader.length()),
												 WINHTTP_ADDREQ_FLAG_ADD );
		}

		if(bResults)
		{
			// Only PUT and POST carry a body. Every other case, including
			// PUT/POST with empty Data, is sent with a zero-length body
			// rather than being skipped.
			const bool hasBody = (Verb == "PUT" || Verb == "POST") && !Data.empty();
			const DWORD dwBodyLength = hasBody ? static_cast<DWORD>(Data.length()) : 0;

			bResults = WinHttpSendRequest(hRequest, WINHTTP_NO_ADDITIONAL_HEADERS, 0, WINHTTP_NO_REQUEST_DATA, 0, dwBodyLength, 0 );

			if(bResults && hasBody)
			{
				DWORD dwBytesWritten = 0;
				bResults = WinHttpWriteData( hRequest, Data.data(), dwBodyLength, &dwBytesWritten);
			}
		}
	}

	if( bResults )
		bResults = WinHttpReceiveResponse( hRequest, NULL);

	if(bResults)
	{
		DWORD dwSize = sizeof(DWORD);
		bResults = WinHttpQueryHeaders( hRequest, 
							WINHTTP_QUERY_STATUS_CODE | WINHTTP_QUERY_FLAG_NUMBER,
							NULL, 
							&dwStatusCode,
							&dwSize,
							WINHTTP_NO_HEADER_INDEX );
	}

	if(bResults)
	{
		// Keep reading until WinHTTP reports that no data is left. Any
		// failure leaves the loop immediately so it can never spin on a
		// stale size value.
		for(;;)
		{
			DWORD dwSize = 0;
			if (!WinHttpQueryDataAvailable( hRequest, &dwSize))
			{
				bResults = FALSE;
				break;
			}
			if (dwSize == 0)
				break;

			std::vector<char> buffer(dwSize);
			DWORD dwDownloaded = 0;
			if (!WinHttpReadData( hRequest, &buffer[0], dwSize, &dwDownloaded))
			{
				dwError = GetLastError();
				printf( "Error %lu in WinHttpReadData.\n", dwError);
				bErrorReported = TRUE;
				bResults = FALSE;
				break;
			}
			if (dwDownloaded == 0)
				break;

			// Append exactly the bytes received; the body is not guaranteed
			// to be free of '\0' bytes.
			httpResponse.append(&buffer[0], dwDownloaded);
		}
	}

	// Capture the error before closing the handle, which may overwrite it.
	if(bResults == FALSE && !bErrorReported)
		dwError = GetLastError();

	if( hRequest ) 
		WinHttpCloseHandle( hRequest );

	if(bResults == FALSE && !bErrorReported)
		printf("Error %lu.\n", dwError);

	return bResults;
}


// Widens a narrow string one character at a time, mapping each byte to the
// wide character with the same numeric value (0..255). No code-page or
// UTF-8 decoding is performed.
std::wstring WinHTTPREST::StringToWString(const std::string& s)
{
	std::wstring wide;
	wide.reserve(s.length());
	for(std::string::size_type i = 0; i != s.length(); ++i)
	{
		// Go through unsigned char: where plain char is signed, a byte >= 0x80
		// would otherwise be sign-extended into a value such as 0xFFE9.
		wide.push_back(static_cast<wchar_t>(static_cast<unsigned char>(s[i])));
	}
	return wide; 
}


// Narrows a wide string one character at a time by converting each wchar_t
// to char; values that do not fit in a char are not preserved.
std::string WinHTTPREST::WStringToString(const std::wstring& s)
{
	std::string narrow;
	narrow.reserve(s.length());
	for(std::wstring::const_iterator it = s.begin(); it != s.end(); ++it)
		narrow.push_back(static_cast<char>(*it));
	return narrow; 
}

Of course, what fun is C++ without a bunch of helper functions to “tweak” strings?

These examples were “borrowed” from http://www.codeguru.com/cpp/cpp/string/conversions/article.php/c12759.

// UrlEncodingHelpers.h
// Uri encode and decode.
// RFC1630, RFC1738, RFC2396

#include <string>
#include <assert.h>

// Lookup table indexed by byte value: the numeric value of a hexadecimal
// digit character ('0'-'9', 'A'-'F', 'a'-'f'), or -1 for every other byte.
const char HEX2DEC[256] = 
{
    /*       0  1  2  3   4  5  6  7   8  9  A  B   C  D  E  F */
    /* 0 */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* 1 */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* 2 */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* 3 */  0, 1, 2, 3,  4, 5, 6, 7,  8, 9,-1,-1, -1,-1,-1,-1,
    
    /* 4 */ -1,10,11,12, 13,14,15,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* 5 */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* 6 */ -1,10,11,12, 13,14,15,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* 7 */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    
    /* 8 */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* 9 */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* A */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* B */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    
    /* C */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* D */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* E */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* F */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1
};
    
// Decodes percent-escapes ("%XX") in sSrc and returns the decoded bytes.
//
// Note from RFC1630:  "Sequences which start with a percent sign
// but are not followed by two hexadecimal characters (0-9, A-F) are reserved
// for future extension". Such sequences, including a '%' too close to the
// end of the input, are copied through unchanged. '+' is not treated as a
// space.
//
// Declared inline because this definition lives in a header.
inline std::string UriDecode(const std::string & sSrc)
{
    const std::string::size_type srcLen = sSrc.length();

    // Decoding never grows the text, so the input length is an upper bound.
    std::string sResult;
    sResult.reserve(srcLen);

    std::string::size_type i = 0;
    while (i < srcLen)
    {
        // An escape needs the '%' plus two more characters inside the input.
        if (sSrc[i] == '%' && srcLen - i >= 3)
        {
            // Read table entries as signed so the -1 marker is recognised
            // even where plain char is unsigned.
            const int hi = static_cast<signed char>(HEX2DEC[static_cast<unsigned char>(sSrc[i + 1])]);
            const int lo = static_cast<signed char>(HEX2DEC[static_cast<unsigned char>(sSrc[i + 2])]);
            if (hi >= 0 && lo >= 0)
            {
                sResult += static_cast<char>((hi << 4) + lo);
                i += 3;
                continue;
            }
        }

        sResult += sSrc[i];
        ++i;
    }

    return sResult;
}

// Only alphanum is safe.
// Lookup table indexed by byte value: 1 for '0'-'9', 'A'-'Z' and 'a'-'z',
// 0 for every other byte.
const char SAFE[256] =
{
    /*      0 1 2 3  4 5 6 7  8 9 A B  C D E F */
    /* 0 */ 0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0,
    /* 1 */ 0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0,
    /* 2 */ 0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0,
    /* 3 */ 1,1,1,1, 1,1,1,1, 1,1,0,0, 0,0,0,0,
    
    /* 4 */ 0,1,1,1, 1,1,1,1, 1,1,1,1, 1,1,1,1,
    /* 5 */ 1,1,1,1, 1,1,1,1, 1,1,1,0, 0,0,0,0,
    /* 6 */ 0,1,1,1, 1,1,1,1, 1,1,1,1, 1,1,1,1,
    /* 7 */ 1,1,1,1, 1,1,1,1, 1,1,1,0, 0,0,0,0,
    
    /* 8 */ 0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0,
    /* 9 */ 0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0,
    /* A */ 0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0,
    /* B */ 0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0,
    
    /* C */ 0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0,
    /* D */ 0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0,
    /* E */ 0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0,
    /* F */ 0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0
};

// Percent-encodes sSrc: bytes marked safe in SAFE are copied through, every
// other byte becomes "%XX" with upper-case hexadecimal digits.
//
// Declared inline because this definition lives in a header.
inline std::string UriEncode(const std::string & sSrc)
{
    const char DEC2HEX[16 + 1] = "0123456789ABCDEF";

    // The result is at least as long as the input; std::string grows as
    // needed beyond that and releases its memory even if an append throws.
    std::string sResult;
    sResult.reserve(sSrc.length());

    for (std::string::size_type i = 0; i != sSrc.length(); ++i)
    {
        // Index the tables with the unsigned byte value, never a negative char.
        const unsigned char c = static_cast<unsigned char>(sSrc[i]);
        if (SAFE[c])
        {
            sResult += static_cast<char>(c);
        }
        else
        {
            // escape this char
            sResult += '%';
            sResult += DEC2HEX[c >> 4];
            sResult += DEC2HEX[c & 0x0F];
        }
    }

    return sResult;
}


// Splits str at every character contained in delim and appends the
// non-empty pieces to results, in order.
//
// str:     text to split.
// delim:   set of delimiter characters; each one separates tokens.
// results: [in,out] existing entries are kept, tokens are appended.
//
// Empty tokens (leading, trailing or between adjacent delimiters) are
// skipped. Declared inline because this definition lives in a header.
inline void StringSplit(std::string str, std::string delim, std::vector<std::string> &results)
{
	const std::string::size_type length = str.length();
	std::string::size_type tokenStart = 0;

	// Walk the string by index instead of repeatedly re-copying the tail,
	// and keep positions in size_type so they compare correctly with npos.
	while(tokenStart < length)
	{
		const std::string::size_type cutAt = str.find_first_of(delim, tokenStart);
		const std::string::size_type tokenEnd = (cutAt == std::string::npos) ? length : cutAt;

		if(tokenEnd > tokenStart)
		{
			results.push_back(str.substr(tokenStart, tokenEnd - tokenStart));
		}

		if(cutAt == std::string::npos)
		{
			break;
		}
		tokenStart = cutAt + 1;
	}
}


The helper class to enable authenticating with ACS

#pragma once
#include "WinHTTPREST.h"
#include <Windows.h>
#include <string>
using namespace std;
// ACSAuthHelper.h
// Requests an access token from ACS for an issuer name/password pair and
// turns it into an Authorization header value.
class ACSAuthHelper
{
	string _serviceNamespace;  // namespace used to build the service and ACS host names
	string _issuerName;        // sent as wrap_name
	string _issuerPassword;    // sent as wrap_password
	WinHTTPREST * _rest;       // owned; allocated in the constructor, deleted in the destructor
	DWORD _httpResponseCode;   // status code of the most recent token request
	string _httpResponse;      // response text of the most recent token request

	// Copying is disabled: the destructor deletes _rest, so a member-wise
	// copy would delete the same WinHTTPREST twice. Declared, never defined.
	ACSAuthHelper(const ACSAuthHelper&);
	ACSAuthHelper& operator=(const ACSAuthHelper&);
public:
	string GetHttpResponse();
	DWORD GetHTTPResponseCode();
	ACSAuthHelper(string serviceNamespace, string issuerName, string issuerPassword  );
	~ACSAuthHelper(void);
	// Requests a token; on success fills in the token lifetime and the
	// Authorization header value and returns TRUE.
	BOOL GetAuthHeader(int &secondsUntilExpiration, string &AuthHeader);
};


#include "StdAfx.h"
#include "ACSAuthHelper.h"
#include "UrlEncodingHelpers.h"

// Stores the credentials and creates a REST helper for the namespace's
// service host.
//
// serviceNamespace: namespace used to build host names.
// issuerName:       issuer name sent with the token request.
// issuerPassword:   issuer secret sent with the token request.
ACSAuthHelper::ACSAuthHelper(string serviceNamespace, string issuerName, string issuerPassword )
	: _serviceNamespace(serviceNamespace)
	, _issuerName(issuerName)
	, _issuerPassword(issuerPassword)
	, _rest(NULL)
	, _httpResponseCode(0) // defined value for GetHTTPResponseCode() before any request is made
{
	string serviceAddress = _serviceNamespace + ".servicebus.appfabriclabs.com";
	_rest = new WinHTTPREST(serviceAddress, TRUE);
}


// Releases the REST helper allocated by the constructor.
ACSAuthHelper::~ACSAuthHelper(void)
{
	// delete on a null pointer is a no-op, so no explicit guard is required.
	delete _rest;
}


// Requests a WRAP access token from the namespace's ACS endpoint and builds
// the Authorization header value from it.
//
// secondsUntilExpiration: [out] token lifetime parsed from the response;
//                         written only on success.
// AuthHeader:             [out] header value of the form
//                         WRAP access_token="<token>"; written only on success.
//
// Returns TRUE when the request was sent, the status code is 200 and both
// the token and its lifetime were found in the response; FALSE otherwise.
// The status code and response text of this request are stored on the object.
BOOL ACSAuthHelper::GetAuthHeader(int &secondsUntilExpiration, string &AuthHeader)
{
	const string tokenKey = "wrap_access_token=";
	const string expiresKey = "wrap_access_token_expires_in=";

	const string relyingPartyAddress = "http://" + _serviceNamespace + ".servicebus.appfabriclabs.com";
	const string acsBaseAddress = _serviceNamespace + "-sb.accesscontrol.appfabriclabs.com";
	
	WinHTTPREST acsRest(acsBaseAddress, TRUE);

	// Build the ACS token request
	const string postData = "wrap_scope=" + UriEncode(relyingPartyAddress) + "&wrap_name=" + UriEncode(_issuerName) + "&wrap_password=" + UriEncode(_issuerPassword);

	vector<string> headers;
	// Add the content type header 
	headers.push_back("Content-type: application/x-www-form-urlencoded");

	// SendRequest appends to the response string it is given, so reset the
	// stored state to keep results from an earlier call out of this one.
	_httpResponseCode = 0;
	_httpResponse.clear();

	// Post the token request to the WRAP URI
	const BOOL result = acsRest.SendRequest("POST", "WRAPv0.9", _httpResponseCode, _httpResponse, headers, postData);
	if(result == FALSE || _httpResponseCode != 200)
		return FALSE;

	// Parse the response. We want the authorization token and the expiration
	// time; look the fields up by name instead of relying on their order.
	vector<string> fields;
	StringSplit(_httpResponse, "&", fields);

	string acsToken;
	string expiresText;
	bool haveToken = false;
	bool haveExpiry = false;
	for(vector<string>::size_type i = 0; i != fields.size(); ++i)
	{
		const string &field = fields[i];
		if(field.compare(0, tokenKey.length(), tokenKey) == 0)
		{
			acsToken = field.substr(tokenKey.length());
			haveToken = true;
		}
		else if(field.compare(0, expiresKey.length(), expiresKey) == 0)
		{
			expiresText = field.substr(expiresKey.length());
			haveExpiry = true;
		}
	}

	if(!haveToken || !haveExpiry)
		return FALSE;

	secondsUntilExpiration = atoi(expiresText.c_str());
	AuthHeader = "WRAP access_token=\"" + UriDecode(acsToken) + "\"";
	return TRUE;
}

string ACSAuthHelper::GetHttpResponse()
{
	return _httpResponse;
}

// Returns the HTTP status code stored by GetAuthHeader.
DWORD ACSAuthHelper::GetHTTPResponseCode()
{
	return this->_httpResponseCode;
}

Finally, the runtime of the program that makes it all work.

// main.cpp: Defines the entry point for the console application.
//

#include "stdafx.h"
#include <Windows.h>
#include <winhttp.h>
#include <string>
#include <sstream>
#include <iomanip>
#include <time.h>
#include "WinHTTPREST.h"
#include "ACSAuthHelper.h"
#include "QueueClient.h"
using namespace std;




// Sample driver: authenticates with ACS, creates the queue "MyQueue" (an
// HTTP 409 from creation is treated as "already exists") and submits five
// messages to it. Failures are reported on stdout.
int _tmain(int argc, _TCHAR* argv[])
{
	int secondsUntilExpiration = 0;
	string authHeader = "";

	ACSAuthHelper acsAuth("yournamespace", "owner", "yourauthorizationcode");

	if(acsAuth.GetAuthHeader(secondsUntilExpiration, authHeader) == FALSE)
	{
		printf("Authentication to ACS Failed.\n");
		// printf is variadic: pass a C string via c_str(), never the
		// std::string object, and use %lu for the DWORD status code.
		printf("Status Code: %lu, HTTP Response: %s\n", acsAuth.GetHTTPResponseCode(), acsAuth.GetHttpResponse().c_str());
		return 0;
	}

	QueueClient qc("yournamespace");
	if(qc.CreateQueue("MyQueue", authHeader) == FALSE)
	{
		if(qc.GetHTTPResponseCode() != 409) // already exists
		{
			printf("Queue Creation Failed.\n");
			printf("Status Code: %lu, HTTP Response: %s\n", qc.GetHTTPResponseCode(), qc.GetHttpResponse().c_str());
			return 0;
		}
	}

	for(int i=0; i < 5; i++)
	{
		if(qc.SubmitMessageToQueue("MyQueue", authHeader, "Hello World Label!", "Hello World Message Body") == FALSE)
		{
			printf("Submitting to Queue Failed.\n");
			printf("Status Code: %lu, HTTP Response: %s\n", qc.GetHTTPResponseCode(), qc.GetHttpResponse().c_str());
		}
	}
	return 0;
}

To receive the messages, you can call the GetMessageFromQueue() method from C++, or from managed code you can use the MessageReceiver/BrokeredMessage classes.
One thing to remember is that the C++ sender is NOT passing managed strings around, so to get the underlying data in managed code you will want to read from the message body stream directly.

// Waits up to 10 seconds for one message, prints its label and body, and
// then completes it. Prints a notice when no message arrives in time.
public void ReceiveMessage()
{
    MessageReceiver receiver = _queueClient.CreateReceiver();
            
    BrokeredMessage receivedMessage = null;

    if (receiver.TryReceive(TimeSpan.FromSeconds(10), out receivedMessage))
    {
        // The body was written as raw bytes by the native sender, so read it
        // from the stream; dispose the stream and reader when done.
        String msg;
        using (System.IO.Stream s = receivedMessage.GetBody<System.IO.Stream>())
        using (System.IO.StreamReader sr = new System.IO.StreamReader(s))
        {
            msg = sr.ReadToEnd();
        }
        Console.WriteLine("Label: " + receivedMessage.Label + " Body: " + msg);

        // Complete only after the message has been read and handled, so a
        // failure above does not discard a message that was never processed.
        receivedMessage.Complete();
    }
    else
        Console.WriteLine("No Messages To Process");
}