VB Magic

2012/11/07

Windows Azure, Service Bus Queue between Webrole and Worker role

In a previous post I showed Azure Storage Queues being used to send messages between a Webrole and a Worker role.

I’ve got back onto this project to learn more about MVC 4 and Windows Azure Service Bus Queues, so I modified the original classes to use Service Bus queues instead of Azure Storage queues. Behind the scenes, Entity Framework Code First is being used to access the database. I’m loving the ability to change the classes and then run update-database to modify the SQL Azure database. Back to the queues.

First you need to create a queue, which this article can walk you through.

Once the queue is created, put the connection string into the Azure Service Configuration file.

Below is the base class for a Job for this application.

Imports System
Imports System.Collections.Generic
Imports System.Diagnostics
Imports System.Linq
Imports System.Net
Imports System.Threading
Imports Microsoft.ServiceBus
Imports Microsoft.ServiceBus.Messaging
Imports Microsoft.WindowsAzure
Imports Microsoft.WindowsAzure.Diagnostics
Imports Microsoft.WindowsAzure.ServiceRuntime
Imports Microsoft.WindowsAzure.Storage

Public MustInherit Class tjob


    ' Enable database access
    Protected Friend _db As New TyranntDb()
    ' A place holder for the user information
    Protected Friend _usr As Member
    ' setup variables to allow for access to Azure Queues
    Protected Friend _jobQueue As QueueClient

    ' A sample of a job entry in the Queue
    Private sample = <job type="email" userid="102">
                         <type/>
                     </job>

    ' the user ID which is used to pull the user information
    Private _userID As Int64

    ' Initialise the job from the XML String
    Public Sub New(jobStr As String)
        Dim jobXML As XElement = XElement.Parse(jobStr)
        _JobType = jobXML.@type
        Dim usrstr As String = jobXML.@userid

        Try
            UserID = Convert.ToInt64(usrstr)
        Catch ex As Exception
            ErrorMessage = ex.Message
            ReportError("tJob New Convert int64")
        End Try

    End Sub

    ' Create a blank job, this is used for creating a job to
    ' put onto the queue.
    Public Sub New()
        _JobType = ""
        _userID = -1
    End Sub

    ' Job type. Used to create the correct object.
    Public Property JobType As String

    ' The user ID. If this is being set then it
    ' will look up the user from the database
    ' (typed as Long to match the Int64 backing field).
    Public Property UserID As Long
        Get
            Return _userID
        End Get
        Set(value As Long)
            _userID = value
            If _userID > 0 Then
                GetUserDetails()
            End If
        End Set
    End Property

    ' This is the code that "Processes" the job. Each job type must
    ' implement this code.
    Public MustOverride Function Process() As Boolean

    ' A general variable for storing any errors that
    ' occur. If it's empty then no errors are assumed.
    Public Property ErrorMessage As String

    ' This will generate an XML element that describes the job.
    Public MustOverride Function ToXML() As XElement

    ' This will generate a string version of the XML
    ' which describes this job.
    Public Overrides Function ToString() As String
        Return ToXML.ToString
    End Function

    ' This routine will pull the user information from the
    ' database and store the user details in the _usr object.
    Protected Friend Sub GetUserDetails()
        Dim q = From u In _db.Members
                Where u.ID = _userID
                Select u
        ' SingleOrDefault returns Nothing when no member matches,
        ' and avoids running the query twice.
        _usr = q.SingleOrDefault()
    End Sub

    ' If the job is being created. This function will add the job
    ' to the Azure Queue.
    Public Sub AddJobToQueue()
        ' Get a client for the Service Bus queue.
        _jobQueue = QueueClient.CreateFromConnectionString(CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString"), "standard_queue")
        Try
            ' Now add the job details to the queue.
            Dim msg As New BrokeredMessage(Me.ToString)
            _jobQueue.Send(msg)
        Catch ex As Exception
            _ErrorMessage = ex.Message
            ReportError("AddJobToQueue")
        End Try
    End Sub

    Public Sub ReportError(location As String)
        Dim err As New ErrorMessage
        err.ErrorTime = DateTime.Now
        err.Location = location
        err.Message = Me.ErrorMessage
        _db.ErrorMessages.Add(err)
        _db.SaveChanges()
    End Sub
End Class

As you can see, this class must be inherited by another class which performs the actual job. The base class handles getting a user’s information from the database, represented by the TyranntDb context class. I’ve added an error reporting system which stores any errors in a table in the database. The job information is stored in an XML format, so the inheriting class must implement a ToXML method. (Service Bus queues allow messages of up to 256 KB, against 64 KB for Storage queues, which should be plenty for our purposes.) The inheriting job class must also know how to do the job it is intended to perform, so it must implement a Process method as well. Finally, the class must be able to add itself to the message queue, so the base class has an AddJobToQueue method which uses the inheriting class’s ToXML method to generate the message body. The Service Bus Imports at the top of the class expose QueueClient and BrokeredMessage, which are all that is required to add a message to the queue. You can see in the code how simple it is.
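To make the XML round trip concrete, here is a minimal standalone sketch of what happens to a job on its way through the queue: it is built with an XML literal, turned into a string for the message body, then parsed and read back with XML axis properties. The module name, the BuildEmailJob function and the example.org address are hypothetical and only there for illustration; the element and attribute names follow the samples in the job classes.

```vb
Imports System
Imports System.Xml.Linq

Module JobXmlRoundTrip

    ' Build a job element in the same style as the ToXML overrides.
    ' (Hypothetical helper, not part of the project.)
    Function BuildEmailJob(userId As Long, fromAddress As String, emailType As String) As XElement
        Return <job type="email" userid=<%= userId %>>
                   <email from=<%= fromAddress %> type=<%= emailType %>/>
               </job>
    End Function

    Sub Main()
        ' Sender side: the string form of the XML becomes the message body.
        Dim body As String = BuildEmailJob(102, "noreply@example.org", "NewAccount").ToString()

        ' Receiver side: parse the string and read the values back.
        Dim parsed As XElement = XElement.Parse(body)
        Dim jobType As String = parsed.@type               ' attribute on the root element
        Dim userId As Long = Long.Parse(parsed.@userid)
        Dim emailType As String = parsed...<email>.@type   ' attribute on a descendant element

        Console.WriteLine("{0} job for user {1}, email type {2}", jobType, userId, emailType)
    End Sub

End Module
```

The base class constructor reads the attributes on the root element, and each derived class then reads its own child element in the same way.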

Next we need to add a new class that is derived from this base class to add a new member to the database. Below is the code that achieves this.

Public Class tjNewUser
    Inherits tjob

    ' an example of a new user job
    Private sample = <job type="newuser" userid="-1">
                         <user name="{username}" email="{user}@{domain}">Full Name</user>
                     </job>

    ' Extra data required by this class
    Public Property userName As String
    Public Property email As String
    Public Property fullName As String

    Public Sub New(jobStr As String)
        ' initialise basic information
        MyBase.New(jobStr)
        Dim jobXML As XElement = XElement.Parse(jobStr)
        ' now initialise new user information
        userName = jobXML...<user>.@name
        email = jobXML...<user>.@email
        fullName = jobXML...<user>.Value
    End Sub

    Public Sub New()
        ' initialise the base information
        MyBase.New()
        JobType = "newuser"
        userName = ""
        email = ""
        fullName = ""
    End Sub

    ' Create the new user in the database
    Public Overrides Function Process() As Boolean
        ' first check to see if the user already exists

        Try
            Dim r = From m In _db.Members
                          Where m.MemberAlias = _userName
                          Select m

            If r.Count > 0 Then
                ' User already exists so do not continue.
                ' Return True in this case as the request
                ' has been processed more than once.
                ErrorMessage = "User " & _userName & " Already exists"
                ReportError("tjNewUser Process")
                Return True
            End If
        Catch ex As Exception
            ErrorMessage = ex.Message
            ReportError("tjNewUser LINQ query to get member ID")
            ' Do not create the user if the existence check itself failed
            Return False
        End Try

        ' create a new user
        Dim usr As New Member
        ' populate the generic information
        usr.EmailAddress = _email
        usr.Name = _fullName
        usr.MemberAlias = _userName
        usr.LastLogin = DateTime.Now
        ' now set the user group to be member
        Dim userType As MemberType

        Try
            userType = (From m In _db.MemberTypes
                     Where m.Name = "Member"
                     Select m).Single
        Catch ex As Exception
            ErrorMessage = ex.Message
            ReportError("tjNewUser LINQ query to get 'Member' member type")
            Return False
        End Try
        usr.Type = userType
        ' now save the user
        Try
            _db.Members.Add(usr)
            _db.SaveChanges()
        Catch ex As Exception
            ErrorMessage = ex.Message
            ReportError("tjNewUser Members.Add Save Changes")
            Return False
        End Try

        ' Now that the user was successfully created,
        ' generate a new user email job
        Dim jb As New tjEmail
        jb.EmailType = "NewAccount"
        jb.UserID = usr.ID
        ' Add the job to the Azure job queue
        jb.AddJobToQueue()
        If jb.ErrorMessage = "" Then
            Return True
        Else
            ErrorMessage = jb.ErrorMessage
            ReportError("tjNewUser Add Job to Queue produced error")
            Return False
        End If
    End Function

    Public Overrides Function ToXML() As XElement
        Return <job type="newuser" userid=<%= UserID %>>
                   <user name=<%= _userName %> email=<%= _email %>><%= _fullName %></user>
               </job>
    End Function

End Class

As you can see, this class overrides the ToXML function and the Process function, which has the code that actually adds the user to the Members table. In the last part of the Process function it creates a new job which will be used to send an email to the user about their newly created account. This job class is shown below.

Imports System.Net.Mail
Imports Microsoft.WindowsAzure

Public Class tjEmail
    Inherits tjob

    ' a sample email job
    Private sample = <job type="email" userid="102">
                         <email from="noreply@tyranntrpg.org" type="NewAccount"/>
                     </job>

    ' setup extra information required by this job
    Private _from As String
    Private _emailType As String

    ' This is the from email address
    Public WriteOnly Property From As String
        Set(value As String)
            _from = value
        End Set
    End Property

    ' This will be the email type e.g. NewAccount
    Public WriteOnly Property EmailType As String
        Set(value As String)
            _emailType = value
        End Set
    End Property

    ' If the job XML already exists this will set up
    ' the information automatically
    Public Sub New(jobStr As String)
        MyBase.New(jobStr)
        Dim jobXML As XElement = XElement.Parse(jobStr)
        _from = jobXML...<email>.@from
        _emailType = jobXML...<email>.@type
    End Sub

    ' Create an empty email job if creating a new job
    Public Sub New()
        MyBase.New()
        JobType = "email"
        _from = "noreply@tyranntrpg.org"
        _emailType = ""
    End Sub

    ' Send the email
    Public Overrides Function Process() As Boolean
        Dim email As MailMessage
        ' Generate the correct body of the email
        Select Case _emailType
            Case "NewAccount"
                email = GenerateNewUserEmail()
                ' GenerateNewUserEmail has already reported the error
                If email Is Nothing Then Return False
            Case Else
                ErrorMessage = String.Format("Email Type [{0}] not recognised", _emailType)
                ReportError("tjEmail Process")
                Return False
        End Select

        Dim smtp = New SmtpClient(CloudConfigurationManager.GetSetting("SMTPAddress"), Integer.Parse(CloudConfigurationManager.GetSetting("SMTPPort")))
        smtp.Credentials = New Net.NetworkCredential(CloudConfigurationManager.GetSetting("SMTPUser"), CloudConfigurationManager.GetSetting("SMTPPassword"))
        smtp.EnableSsl = True

        Try

            smtp.Send(email)

        Catch ex As Exception
            Me.ErrorMessage = ex.Message
            ReportError("tjEmail Send()")
            Return False
        End Try
        Return True
    End Function

    ' This will generate the subject and body of the newuser email
    Private Function GenerateNewUserEmail() As MailMessage
        If _usr Is Nothing Then
            ErrorMessage = "_usr is null"
            ReportError("GenerateNewUserEmail()")
            Return Nothing
        End If
        Dim email As New MailMessage(_from, _usr.EmailAddress)
        Dim subject As String = ""
        Dim body As String = ""
        Try
            Dim emailMsg = (From e In _db.EmailMessages
                            Where e.Name = "NewAccount"
                            Select e).Single
            subject = emailMsg.Subject
            body = String.Format(emailMsg.Body, _usr.Name)
        Catch ex As Exception
            ErrorMessage = ex.Message
            ReportError("GenerateNewUserEmail(), LINQ query to _db.EmailMessages")
            Return Nothing
        End Try
        email.Subject = subject
        email.Body = body
        Return email
    End Function

    Public Overrides Function ToXML() As XElement
        Return <job type="email" userid=<%= UserID %>>
                   <email from=<%= _from %> type=<%= _emailType %>/>
               </job>
    End Function

    Private Sub Smtp_SendCompleted(sender As Object, e As ComponentModel.AsyncCompletedEventArgs)
        If e.Error Is Nothing Then
            ErrorMessage = "Mail sent correctly"
            Me.ReportError("tjEmail SendCompleted")
        Else
            ErrorMessage = e.Error.Message
            Me.ReportError("tjEmail SendCompleted")
        End If
    End Sub

End Class

All the SMTP server information is held in the Azure service configuration file. Again, the overridden functions do all the work in this class.

Now onto the back end. This uses the Worker Role with Service Bus Queue template (shown below).
[Screenshot: New Worker Role dialogue]

As the job classes actually do all the work required, this is a very small piece of code.

Imports System
Imports System.Collections.Generic
Imports System.Diagnostics
Imports System.Linq
Imports System.Net
Imports System.Threading
Imports Microsoft.ServiceBus
Imports Microsoft.ServiceBus.Messaging
Imports Microsoft.WindowsAzure
Imports Microsoft.WindowsAzure.Diagnostics
Imports Microsoft.WindowsAzure.ServiceRuntime
Imports Microsoft.WindowsAzure.Storage
Imports Tyrannt.Domain

Public Class WorkerRole
    Inherits RoleEntryPoint

    ' The name of your queue
    Const QueueName As String = "standard_queue"

    Private _db As New TyranntDb

    ' QueueClient is Thread-safe. Recommended that you cache 
    ' rather than recreating it on every request
    Dim Client As QueueClient
    Dim IsStopped As Boolean

    Public Overrides Sub Run()

        ' This is a sample implementation for Tyrannt.Backoffice. Replace with your logic.

        While (Not IsStopped)
            Try
                ' Receive the message
                Dim receivedMessage As BrokeredMessage = Client.Receive()

                If (Not receivedMessage Is Nothing) Then
                    ' Process the message
                    Trace.WriteLine("Processing", receivedMessage.SequenceNumber.ToString())
                    ProcessMessage(receivedMessage)
                    receivedMessage.Complete()
                End If
            Catch ex As MessagingException
                If (Not ex.IsTransient) Then
                    Trace.WriteLine(ex.Message)
                    ' Rethrow without resetting the stack trace
                    Throw
                End If
                Thread.Sleep(10000)
            Catch ex As OperationCanceledException
                If (Not IsStopped) Then
                    Trace.WriteLine(ex.Message)
                    ' Rethrow without resetting the stack trace
                    Throw
                End If
            End Try
        End While

    End Sub

    Private Sub ProcessMessage(msg As BrokeredMessage)

        Dim msgBody As String = "msgBody not available"
        Dim errMsg As String = ""
        Try
            msgBody = msg.GetBody(Of String)()
            ' Turn the message into an XML element
            Dim xmlMsg As XElement = XElement.Parse(msgBody)
            ' Extract the message type from the element
            Dim type As String = xmlMsg.@type

            ' Now we create a job
            Dim job As tjob
            'ReportError("Processing job [" & type & "]", "WorkerRole.vb")
            Select Case type
                ' Use the message type to see what kind of job is required
                Case "newuser"
                    job = New tjNewUser(xmlMsg.ToString)
                Case "email"
                    job = New tjEmail(xmlMsg.ToString)
                Case Else
                    ReportError("job type [" + type + "] Not recognised", "WorkerRole.ProcessMessage()")
                    Exit Sub
            End Select
            ' Process the job.
            If job.Process() = True Then
                ' The job succeeded so write a trace message to say this.
                ' Run() completes the message, which removes it from the queue.
                Trace.WriteLine(String.Format("{0} succeeded", type), "Information")
            Else
                ' The job failed so write a trace error message saying why.
                ' Run() still completes the message, so it will not be retried.
                Trace.WriteLine(String.Format("{0} failed: {1} ", type, job.ErrorMessage), "Error")
            End If
        Catch ex As Exception
            ' something big has gone wrong so write this out as an error trace message.
            Trace.WriteLine(String.Format("Failed to process message: [{0}]", msgBody), "Error")
            ReportError(ex.Message, "WorkerRole.ProcessMessage()")
            Exit Sub
        End Try
    End Sub

    Public Overrides Function OnStart() As Boolean

        ' Set the maximum number of concurrent connections 
        ServicePointManager.DefaultConnectionLimit = 12

        ' Create the queue if it does not exist already
        Dim connectionString As String = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString")
        ' The variable is named nsManager so that it does not shadow the NamespaceManager type
        Dim nsManager As NamespaceManager = NamespaceManager.CreateFromConnectionString(connectionString)
        If (Not nsManager.QueueExists(QueueName)) Then
            nsManager.CreateQueue(QueueName)
        End If

        ' Get a client to use the queue
        Client = QueueClient.CreateFromConnectionString(connectionString, QueueName)
        IsStopped = False
        Return MyBase.OnStart()

    End Function

    Public Overrides Sub OnStop()

        ' Close the connection to Service Bus Queue
        IsStopped = True
        Client.Close()
        MyBase.OnStop()

    End Sub

    Private Sub ReportError(msg As String, location As String)
        Dim err As New ErrorMessage
        err.ErrorTime = DateTime.Now
        err.Location = location
        err.Message = msg
        _db.ErrorMessages.Add(err)
        _db.SaveChanges()
    End Sub

End Class

I’ve added database access to this class only for error reporting, which helps track down any issues during development. The only routine I’ve added to the template, aside from the error routine, is ProcessMessage. It gets the message body, works out what type of message it is, creates the matching job class and runs its Process method. (Note: you can only call the GetBody method once; a second call on the same message throws an exception.) In this example I complete the received message even if it fails to process (the failure gets added to the error table). In future I may end up doing more elaborate things.
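One possible shape for those "more elaborate things", offered as a sketch under assumptions rather than anything this project does: if the queue client is left in its default peek-lock receive mode, a failed message could be abandoned so that it is delivered again, and dead-lettered once it has failed too many times. The decision logic below is plain VB with no Service Bus dependency. The MessageOutcome enum, the Decide function and the limit of 3 attempts are all hypothetical; the comments name the BrokeredMessage members (Complete, Abandon, DeadLetter, DeliveryCount) that each outcome would map onto.

```vb
Imports System

Module MessageOutcomeSketch

    ' Hypothetical outcomes; each maps to one BrokeredMessage call in the worker.
    Enum MessageOutcome
        Complete    ' msg.Complete(): remove the message from the queue
        Abandon     ' msg.Abandon(): release the lock so it is delivered again
        DeadLetter  ' msg.DeadLetter(): move it to the dead-letter sub-queue
    End Enum

    ' Decide what to do with a message once job.Process() has run.
    ' deliveryCount would come from msg.DeliveryCount;
    ' maxAttempts is an assumed setting, not something from the post.
    Function Decide(succeeded As Boolean, deliveryCount As Integer, maxAttempts As Integer) As MessageOutcome
        If succeeded Then Return MessageOutcome.Complete
        If deliveryCount < maxAttempts Then Return MessageOutcome.Abandon
        Return MessageOutcome.DeadLetter
    End Function

    Sub Main()
        ' ToString() gives the enum name; without it VB prints the numeric value.
        Console.WriteLine(Decide(True, 1, 3).ToString())
        Console.WriteLine(Decide(False, 1, 3).ToString())
        Console.WriteLine(Decide(False, 3, 3).ToString())
    End Sub

End Module
```

To use something like this, ProcessMessage would need to return the result of job.Process() to Run(), which would then call the matching method on the message instead of always calling Complete().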

I hope some people find this helpful.
