In a previous post I showed Azure Storage Queues being used to send messages between a Web role and a Worker role.
I’ve got back onto this project to learn more about MVC 4 and Windows Azure Service Bus Queues, so I modified the original classes to use Service Bus queues instead of Azure Storage queues. Behind the scenes Entity Framework Code First is being used to access the database. I’m loving the ability to change the classes and then run update-database to modify the SQL Azure database. Back to the queues.
First you need to create a queue, which this article can walk you through.
Once the queue is created, put the connection string into the Azure Service Configuration file.
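As a rough sketch of what that entry might look like: the setting name below is the one the code in this post reads with CloudConfigurationManager.GetSetting, but the role name and the bracketed value are placeholders, so copy the real connection string from the portal rather than typing it by hand.

```xml
<!-- ServiceConfiguration.cscfg (fragment) -->
<Role name="Tyrannt.Backoffice"> <!-- assumed role name, use your own -->
  <ConfigurationSettings>
    <!-- The name must match the key passed to CloudConfigurationManager.GetSetting -->
    <Setting name="Microsoft.ServiceBus.ConnectionString"
             value="[your Service Bus connection string]" />
  </ConfigurationSettings>
</Role>
```

The same setting name also has to be declared in the ConfigurationSettings section of ServiceDefinition.csdef for each role that reads it. Here that means both the web role (which adds jobs) and the worker role (which receives them).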
Below is the base class for a Job for this application.
Imports System
Imports System.Collections.Generic
Imports System.Diagnostics
Imports System.Linq
Imports System.Net
Imports System.Threading
Imports Microsoft.ServiceBus
Imports Microsoft.ServiceBus.Messaging
Imports Microsoft.WindowsAzure
Imports Microsoft.WindowsAzure.Diagnostics
Imports Microsoft.WindowsAzure.ServiceRuntime
Imports Microsoft.WindowsAzure.Storage
Public MustInherit Class tjob
' Enable database access
Protected Friend _db As New TyranntDb()
' A place holder for the user information
Protected Friend _usr As Member
' Client used to access the Service Bus queue
Protected Friend _jobQueue As QueueClient
' A sample of a job entry in the Queue
Private sample = <job type="email" userid="102">
<type/>
</job>
' the user ID which is used to pull the user information
Private _userID As Int64
' Initialise the job from the XML String
Public Sub New(jobStr As String)
Dim jobXML As XElement = XElement.Parse(jobStr)
_JobType = jobXML.@type
Dim usrstr As String = jobXML.@userid
Try
UserID = Convert.ToInt64(usrstr)
Catch ex As Exception
ErrorMessage = ex.Message
ReportError("tJob New Convert int64")
End Try
End Sub
' Create a blank job, this is used for creating a job to
' put onto the queue.
Public Sub New()
_JobType = ""
_userID = -1
End Sub
' Job type. Used to create the correct object.
Public Property JobType As String
' The user ID. If this is being set then it
' will look up the user from the database
' Declared as Long so that it matches the Int64 backing field
Public Property UserID As Long
Get
Return _userID
End Get
Set(value As Long)
_userID = value
If _userID > 0 Then
GetUserDetails()
End If
End Set
End Property
' This is the code that "Processes" the job. Each job type must
' implement this code.
Public MustOverride Function Process() As Boolean
' A general variable for storing any errors that
' occur. If it's empty then no errors are assumed.
Public Property ErrorMessage As String
' This will generate an XML element that describes the job.
Public MustOverride Function ToXML() As XElement
' This will generate a string version of the XML
' which describes this job.
Public Overrides Function ToString() As String
Return ToXML.ToString
End Function
' This routine will pull the user information from the
' database and store the user details in the _usr object.
Protected Friend Sub GetUserDetails()
Dim q = From u In _db.Members
Where u.ID = _userID
Select u
If q.Count > 0 Then
_usr = q.Single
End If
End Sub
' When a new job is being created, this method adds it
' to the Service Bus queue.
Public Sub AddJobToQueue()
' Get a client for the Service Bus queue.
_jobQueue = QueueClient.CreateFromConnectionString(CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString"), "standard_queue")
Try
' Now add the job details to the queue.
Dim msg As New BrokeredMessage(Me.ToString)
_jobQueue.Send(msg)
Catch ex As Exception
_ErrorMessage = ex.Message
ReportError("AddJobToQueue")
End Try
End Sub
Public Sub ReportError(location As String)
Dim err As New ErrorMessage
err.ErrorTime = DateTime.Now
err.Location = location
err.Message = Me.ErrorMessage
_db.ErrorMessages.Add(err)
_db.SaveChanges()
End Sub
End Class
As you can see, this class must be inherited by another class which will perform the actual job. The base class handles getting a user’s information from the database, represented by the TyranntDb context class. I’ve added an error reporting system which stores any errors in a table in the database. The job information is stored in an XML format (Service Bus queue messages can be up to 256 KB, which should be plenty for our purposes), so the inheriting class must implement a ToXML method. The inheriting job class must also know how to do the job it is intended to perform, so it must implement a Process method as well. Finally, the class must be able to add itself to the message queue, so the base class has an AddJobToQueue method which uses the ToXML method implemented by the inheriting class to generate the message body. The Service Bus Imports at the top of the class expose QueueClient and BrokeredMessage, which are all that is required to add a message to the queue. You can see in the code how simple it is.
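To make the XML message format a little more concrete, here is a small stand-alone sketch of the round trip a job body goes through: it is built with an XML literal, turned into a string for the queue, then parsed back on the receiving side. The module name and the sample values are made up for illustration, and nothing here touches Service Bus itself.

```vbnet
Imports System
Imports System.Xml.Linq

Module JobXmlRoundTrip
    Sub Main()
        ' Hypothetical values, for illustration only
        Dim userName As String = "jdoe"
        Dim email As String = "jdoe@example.com"
        Dim fullName As String = "Jane Doe"

        ' Build the job element with an XML literal, as a ToXML override would
        Dim jobXml As XElement =
            <job type="newuser" userid="-1">
                <user name=<%= userName %> email=<%= email %>><%= fullName %></user>
            </job>

        ' This string is what would become the body of the queue message
        Dim body As String = jobXml.ToString()

        ' The receiving side parses the string and reads the values back out
        Dim parsed As XElement = XElement.Parse(body)
        Console.WriteLine(parsed.@type)              ' the job type attribute
        Console.WriteLine(parsed...<user>.@name)     ' attribute on the user element
        Console.WriteLine(parsed...<user>.Value)     ' text content of the user element
    End Sub
End Module
```

Run as a console application, this should print newuser, jdoe and Jane Doe on separate lines. The type attribute is what the worker role later uses to decide which job class to create.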
Next we need to add a new class that is derived from this base class to add a new member to the database. Below is the code that achieves this.
Public Class tjNewUser
Inherits tjob
' an example of a new user job
Private sample = <job type="newuser" userid="-1">
<user name="{username}" email="{user}@{domain}">Full Name</user>
</job>
' Extra data required by this class
Public Property userName As String
Public Property email As String
Public Property fullName As String
Public Sub New(jobStr As String)
' initialise basic information
MyBase.New(jobStr)
Dim jobXML As XElement = XElement.Parse(jobStr)
' now initialise new user information
userName = jobXML...<user>.@name
email = jobXML...<user>.@email
fullName = jobXML...<user>.Value
End Sub
Public Sub New()
' initialise the base information
MyBase.New()
JobType = "newuser"
userName = ""
email = ""
fullName = ""
End Sub
' Create the new user in the database
Public Overrides Function Process() As Boolean
' first check to see if the user already exists
Try
Dim r = From m In _db.Members
Where m.MemberAlias = _userName
Select m
If r.Count > 0 Then
' User already exists so do not continue.
' Return true in this case as the request
' has been processed more than once.
ErrorMessage = "User " & _userName & " already exists"
ReportError("tjNewUser Process")
Return True
End If
Catch ex As Exception
ErrorMessage = ex.Message
ReportError("tjNewUser LINQ query to get member ID")
' The check for an existing user failed, so do not go on to create one
Return False
End Try
' create a new user
Dim usr As New Member
' populate the generic information
usr.EmailAddress = _email
usr.Name = _fullName
usr.MemberAlias = _userName
usr.LastLogin = DateTime.Now
' now set the user group to be member
Dim userType As MemberType
Try
userType = (From m In _db.MemberTypes
Where m.Name = "Member"
Select m).Single
Catch ex As Exception
ErrorMessage = ex.Message
ReportError("tjNewUser LINQ query to get 'Member' member type")
Return False
End Try
usr.Type = userType
' now save the user
Try
_db.Members.Add(usr)
_db.SaveChanges()
Catch ex As Exception
ErrorMessage = ex.Message
ReportError("tjNewUser Members.Add Save Changes")
Return False
End Try
' Now that the user was successfully created,
' generate a new user email job
Dim jb As New tjEmail
jb.EmailType = "NewAccount"
jb.UserID = usr.ID
' Add the job to the Azure job queue
jb.AddJobToQueue()
If jb.ErrorMessage = "" Then
Return True
Else
ErrorMessage = jb.ErrorMessage
ReportError("tjNewUser Add Job to Queue produced error")
Return False
End If
End Function
Public Overrides Function ToXML() As XElement
Return <job type="newuser" userid=<%= UserID %>>
<user name=<%= _userName %> email=<%= _email %>><%= _fullName %></user>
</job>
End Function
End Class
As you can see, this class overrides the ToXML function and the Process function, which has the code that actually adds the user to the Members table in the database. In the last part of the Process function it creates a new job which will be used to send an email to the user about their newly created account. This job class is shown below.
Imports System.Net.Mail
Imports Microsoft.WindowsAzure
Public Class tjEmail
Inherits tjob
' a sample email job
Private sample = <job type="email" userid="102">
<email from="noreply@tyranntrpg.org" type="NewAccount"/>
</job>
' setup extra information required by this job
Private _from As String
Private _emailType As String
' This is the from email address
Public WriteOnly Property From As String
Set(value As String)
_from = value
End Set
End Property
' This will be the email type e.g. NewAccount
Public WriteOnly Property EmailType As String
Set(value As String)
_emailType = value
End Set
End Property
' If the job XML already exists this will set up
' the information automatically
Public Sub New(jobStr As String)
MyBase.New(jobStr)
Dim jobXML As XElement = XElement.Parse(jobStr)
_from = jobXML...<email>.@from
_emailType = jobXML...<email>.@type
End Sub
' Create an empty email job if creating a new job
Public Sub New()
MyBase.New()
JobType = "email"
_from = "noreply@tyranntrpg.org"
_emailType = ""
End Sub
' Send the email
Public Overrides Function Process() As Boolean
Dim email As MailMessage
' Generate the correct body of the email
Select Case _emailType
Case "NewAccount"
email = GenerateNewUserEmail()
Case Else
ErrorMessage = String.Format("Email Type [{0}] not recognised", _emailType)
ReportError("tjEmail Process")
Return False
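End Select
' GenerateNewUserEmail has already reported the error in this case
If email Is Nothing Then
Return False
End If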
Dim smtp = New SmtpClient(CloudConfigurationManager.GetSetting("SMTPAddress"), Integer.Parse(CloudConfigurationManager.GetSetting("SMTPPort")))
smtp.Credentials = New Net.NetworkCredential(CloudConfigurationManager.GetSetting("SMTPUser"), CloudConfigurationManager.GetSetting("SMTPPassword"))
smtp.EnableSsl = True
Try
smtp.Send(email)
Catch ex As Exception
Me.ErrorMessage = ex.Message
ReportError("tjEmail Send()")
Return False
End Try
Return True
End Function
' This will generate the subject and body of the newuser email
Private Function GenerateNewUserEmail() As MailMessage
If _usr Is Nothing Then
ErrorMessage = "_usr is null"
ReportError("GenerateNewUserEmail()")
Return Nothing
End If
Dim email As New MailMessage(_from, _usr.EmailAddress)
Dim subject As String = ""
Dim body As String = ""
Try
Dim emailMsg = (From e In _db.EmailMessages
Where e.Name = "NewAccount"
Select e).Single
subject = emailMsg.Subject
body = String.Format(emailMsg.Body, _usr.Name)
Catch ex As Exception
ErrorMessage = ex.Message
ReportError("GenerateNewUserEmail(), LINQ query to _db.EmailMessages")
Return Nothing
End Try
email.Subject = subject
email.Body = body
Return email
End Function
Public Overrides Function ToXML() As XElement
Return <job type="email" userid=<%= UserID %>>
<email from=<%= _from %> type=<%= _emailType %>/>
</job>
End Function
Private Sub Smtp_SendCompleted(sender As Object, e As ComponentModel.AsyncCompletedEventArgs)
If e.Error Is Nothing Then
ErrorMessage = "Mail sent correctly"
Me.ReportError("tjEmail SendCompleted")
Else
ErrorMessage = e.Error.Message
Me.ReportError("tjEmail SendCompleted")
End If
End Sub
End Class
All the SMTP server information is held in the Azure service configuration file. Again the overridden functions do all the work in this class.
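The web role side is not covered in this post, but queuing a new-user job with these classes might look something like the sketch below. The module and function names are hypothetical, and the Tyrannt.Domain import assumes the job classes live in that namespace, which may not match the real project layout.

```vbnet
Imports System
Imports Tyrannt.Domain   ' assumption: namespace that contains tjNewUser

' Hypothetical helper; the name and parameters are not from the original project
Public Module RegistrationQueue
    ' Returns True if the job was placed on the queue without error
    Public Function QueueNewUser(userName As String, email As String, fullName As String) As Boolean
        ' Start from a blank job and fill in the new user's details
        Dim job As New tjNewUser()
        job.userName = userName
        job.email = email
        job.fullName = fullName

        ' Serialises the job through ToXML and sends it as a BrokeredMessage
        job.AddJobToQueue()

        ' AddJobToQueue stores any exception text in ErrorMessage
        Return String.IsNullOrEmpty(job.ErrorMessage)
    End Function
End Module
```

A controller action could call QueueNewUser after validating the registration form and show an error page if it returns False. The actual database insert and welcome email then happen in the worker role.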
Now onto the back end. This uses the Worker Role with Service Bus Queue template (shown below).

As the job classes actually do all the work required, this is a very small piece of code.
Imports System
Imports System.Collections.Generic
Imports System.Diagnostics
Imports System.Linq
Imports System.Net
Imports System.Threading
Imports Microsoft.ServiceBus
Imports Microsoft.ServiceBus.Messaging
Imports Microsoft.WindowsAzure
Imports Microsoft.WindowsAzure.Diagnostics
Imports Microsoft.WindowsAzure.ServiceRuntime
Imports Microsoft.WindowsAzure.Storage
Imports Tyrannt.Domain
Public Class WorkerRole
Inherits RoleEntryPoint
' The name of your queue
Const QueueName As String = "standard_queue"
Private _db As New TyranntDb
' QueueClient is Thread-safe. Recommended that you cache
' rather than recreating it on every request
Dim Client As QueueClient
Dim IsStopped As Boolean
Public Overrides Sub Run()
' This is a sample implementation for Tyrannt.Backoffice. Replace with your logic.
While (Not IsStopped)
Try
' Receive the message
Dim receivedMessage As BrokeredMessage = Client.Receive()
If (Not receivedMessage Is Nothing) Then
' Process the message
Trace.WriteLine("Processing", receivedMessage.SequenceNumber.ToString())
ProcessMessage(receivedMessage)
receivedMessage.Complete()
End If
Catch ex As MessagingException
If (Not ex.IsTransient) Then
Trace.WriteLine(ex.Message)
' Rethrow without resetting the stack trace
Throw
End If
Thread.Sleep(10000)
Catch ex As OperationCanceledException
If (Not IsStopped) Then
Trace.WriteLine(ex.Message)
Throw
End If
End Try
End While
End Sub
Private Sub ProcessMessage(msg As BrokeredMessage)
Dim msgBody As String = "msgBody not available"
Dim errMsg As String = ""
Try
msgBody = msg.GetBody(Of String)()
' Turn the message into an XML element
Dim xmlMsg As XElement = XElement.Parse(msgBody)
' Extract the message type from the element
Dim type As String = xmlMsg.@type
' Now we create a job
Dim job As tjob
'ReportError("Processing job [" & type & "]", "WorkerRole.vb")
Select Case type
' Use the message type to see what kind of job is required
Case "newuser"
job = New tjNewUser(xmlMsg.ToString)
Case "email"
job = New tjEmail(xmlMsg.ToString)
Case Else
ReportError("job type [" + type + "] Not recognised", "WorkerRole.ProcessMessage()")
Exit Sub
End Select
' Process the job. Whatever the outcome, Run() completes the
' message afterwards, so it is removed from the queue.
If job.Process() Then
' The job succeeded so write a trace message to say this.
Trace.WriteLine(String.Format("{0} succeeded", type), "Information")
Else
' The job failed so write a trace error message saying why.
' The job has already logged the failure to the error table.
Trace.WriteLine(String.Format("{0} failed: {1} ", type, job.ErrorMessage), "Error")
End If
Catch ex As Exception
' something big has gone wrong so write this out as an error trace message.
Trace.WriteLine(String.Format("Failed to parse xml message: [{0}]", msgBody), "Error")
ReportError(ex.Message, "WorkerRole.ProcessMessage()")
Exit Sub
End Try
End Sub
Public Overrides Function OnStart() As Boolean
' Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12
' Create the queue if it does not exist already
Dim connectionString As String = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString")
Dim nsManager As NamespaceManager = NamespaceManager.CreateFromConnectionString(connectionString)
If (Not nsManager.QueueExists(QueueName)) Then
nsManager.CreateQueue(QueueName)
End If
' Get a client to use the queue
Client = QueueClient.CreateFromConnectionString(connectionString, QueueName)
IsStopped = False
Return MyBase.OnStart()
End Function
Public Overrides Sub OnStop()
' Close the connection to Service Bus Queue
IsStopped = True
Client.Close()
MyBase.OnStop()
End Sub
Private Sub ReportError(msg As String, location As String)
Dim err As New ErrorMessage
err.ErrorTime = DateTime.Now
err.Location = location
err.Message = msg
_db.ErrorMessages.Add(err)
_db.SaveChanges()
End Sub
End Class
I’ve added database access to this class purely for error reporting, which helps with tracking down issues during development. The only subroutine aside from the error routine is ProcessMessage. It gets the message body, works out what type of message it is, creates the matching job class and runs its Process method. (Note: you can only call GetBody once per message; if you try a second time it will throw an exception.) In this example I complete the received message even if it fails to process (the failure gets added to the error table). In future I may end up doing more elaborate things.
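One possible direction for those more elaborate things is to settle the message according to the outcome instead of always completing it. The sketch below is only an idea, not what the worker role above does. It assumes ProcessMessage is changed to hand back the result of job.Process(), the MaxAttempts limit is an invented value, and it relies on the client receiving in peek-lock mode, which I believe is the default for QueueClient.CreateFromConnectionString.

```vbnet
Imports Microsoft.ServiceBus.Messaging

Module MessageSettlement
    ' Hypothetical retry limit; not from the original code
    Const MaxAttempts As Integer = 3

    ' succeeded is the value returned by job.Process(),
    ' errorText is the job's ErrorMessage
    Sub Settle(msg As BrokeredMessage, succeeded As Boolean, errorText As String)
        If succeeded Then
            ' Remove the message from the queue
            msg.Complete()
        ElseIf msg.DeliveryCount >= MaxAttempts Then
            ' Give up and park the message on the dead-letter queue
            msg.DeadLetter("ProcessingFailed", errorText)
        Else
            ' Release the lock so the message is delivered again
            msg.Abandon()
        End If
    End Sub
End Module
```

In Run() the call to receivedMessage.Complete() would then be replaced by a call to Settle. The trade-off is that a job which fails for a permanent reason, such as a user that already exists, gets retried a few times before it is dead-lettered, so the job classes would need to keep returning True for cases like that.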
I hope some people find this helpful.