MassTransit with Azure Service Bus

One of the great features of MassTransit is the abstraction it provides over the message transport. This allows you to forget about the plumbing and focus on distributed application design and development.

Now would be a great opportunity to take one of our past tutorials and convert it from RabbitMQ to Azure Service Bus. It will become very clear how easily you can switch from one message transport to another. The completed project for this tutorial can be found here under the azure-service-bus branch.

Tutorial

First, clone the master branch from this tutorial. Don’t worry if you don’t have RabbitMQ set up locally, because we will be switching the solution to use Azure Service Bus.

Create an Azure Service Bus

The first thing we need to do is create an AzureSB for this project. You can use a free trial, or if you have an MSDN License that works too. You’ll want to log into https://portal.azure.com. Once at the dashboard screen, follow these steps:

  1. Click New, and search for “azure service bus” or “service bus”. Then select it, and click “Create”.
  2. Within the Create window, you will want to enter settings:
    Name: <this will be your namespace>
    Pricing Tier: <Must be Standard or higher, MassTransit needs Topics and Queues>
    Resource group: <I made a new one, but you can choose existing if you have one>
  3. Wait a bit while it creates/deploys the Service Bus
  4. Once it is created, click “Shared access policies” and choose “RootManageSharedAccessKey”. Leave this window open, because we will need to copy the primary key value into our App.config and Web.config.

Update the Project Configs

Before we update our project IoC with the Azure SB transport, we need to add the Azure Key Name and Shared Access Key for the Service Bus we made in the previous section.

  1. Open your App.config in the StarterKit.Service project. Add the 4 lines with Azure configuration as follows:

    Line 2: This must match the namespace you used when creating the Azure Service Bus
    Line 3: This can be a path of your choosing, similar to RabbitMQ’s virtual host. A logical separation for different topics and queues within the same namespace
    Line 4: Leave this as is
    Line 5: Paste in your shared access key (continue reading to see how you obtain the Shared Access Key)
  2. Open your Service Bus in Azure Portal. Then click Settings -> Shared access policies -> Root manage shared access key. You can copy the Primary key (or secondary, it doesn’t matter) and paste it into your App.config.
  3. Now open Web.config in StarterKit.Web and add the same 4 lines, plus a new 5th line.

    Line 6: This should look familiar if you followed the original req-resp tutorial here. Because the RequestClient needs to connect to a specific endpoint (queue), we need to provide the full Uri at which the queue can be reached in Azure. The Uri of a queue is easy to determine. It follows this pattern: sb://<namespace>.servicebus.windows.net/<path>/<queue_name>
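To make the Uri pattern concrete, here is a minimal sketch in Python (used only as a neutral illustration language, this is not MassTransit or Azure SDK code). The function names and the sample values `my-namespace`, `starterkit` and `my_queue` are made up for the example. Only the `sb://<namespace>.servicebus.windows.net/<path>/<queue_name>` shape comes from the text above.

```python
def service_uri(namespace: str, path: str) -> str:
    """Base address of the bus: namespace plus the logical path."""
    return f"sb://{namespace}.servicebus.windows.net/{path}"


def queue_uri(namespace: str, path: str, queue_name: str) -> str:
    """Full address of one queue: the base address plus the queue name."""
    return f"{service_uri(namespace, path)}/{queue_name}"


# Hypothetical values, substitute your own namespace, path and queue name.
print(service_uri("my-namespace", "starterkit"))
print(queue_uri("my-namespace", "starterkit", "my_queue"))
```

The queue address is simply the base address with `/<queue_name>` appended, so the two config values only differ by that last segment.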

Add MassTransit Azure Packages and Register with our Container

Great. Now we have our configuration settings saved and ready to use. All we have to do is change the module we register with our IoC container in both the StarterKit.Service and StarterKit.Web projects. We don’t need to change a single line of business logic code in our app. Thank you MassTransit!

First, you will need to install the NuGet package MassTransit.AzureServiceBus in both StarterKit.Service and StarterKit.Web.Bootstrapper.

StarterKit.Service Changes

Once complete, create a new file in StarterKit.Service/Modules named AzureServiceBusModule.cs.

Paste the following in the file.

Line 15: This helper method from Microsoft.ServiceBus namespace lets us construct our service Uri. You will notice that this uri is the same as our queue in the previous section, sans /<queue_name>.
Line 19: This also uses a helper method to create the Token Provider for us, providing all the proper pieces of information.

Last but not least, head over to IocConfig.cs and change the line:

Done!

StarterKit.Web.Bootstrapper

Almost there. I guess you can probably imagine we now need to create a new file at StarterKit.Web.Bootstrapper/Modules/AzureServiceBusModule.cs. Paste in the following:

Again, same as previous, but we obviously don’t need to create the endpoint here. Instead our RequestClient will connect directly to the queue to read.

So… open up RequestClientModule.cs, and change this line:

And if you haven’t guessed it, last but not least, go to IocConfig.cs, and change:

 

All done. We didn’t touch any of our core code. We just set up the bus and wired it into our IoC, and MassTransit handles the rest!

MassTransit Autofac Request and Response

Messages sent with MassTransit are typically fire and forget. This is because distributed systems run in different processes on different machines, which gives them the benefit of horizontal scaling (with trade-offs, but that’s another discussion entirely). However, the need may arise to use request + response in a distributed system, and this tutorial shows how that can be achieved with MassTransit.

If you have followed some of my other tutorials, you will know that I like to separate the RabbitMQ config with a virtual host and username specific to the tutorial. My config for this tutorial is:

This tutorial will simply show the main components required to set up a Request/Response with MassTransit. You can follow along with the completed project here.

*Update*: When I drafted this, the MassTransit documentation didn’t have an example for Request/Response. Now there is excellent documentation here. So I will reduce the scope of this tutorial to registering the IRequestClient<…> in an IoC framework (Autofac).

Tutorial

When using Publish + Subscribe, we typically get the IBus or IBusControl injected and use the .Publish or .Send methods to fire and forget. But for Request Response, we will use IRequestClient<…>. Open up RequestClientModule.cs. You will see:

Line 4: This is where our concrete type MessageRequestClient is constructed, which is one of the many value adds for MassTransit. It handles the plumbing of creating a RequestId and correlates messages accordingly.

You might be wondering why I used MessageRequestClient, because the official sample uses a helper extension method instead. If we look at the MassTransit source, we will see the extension is just that, a helper:

So in our RequestClientModule.cs, you could replace the builder.Register with:

Then you would get the exact same behavior. Choose whatever method you prefer.

Now we take a quick peek in the Consumer. Open up CheckOrderStatusConsumer.cs and you will see:

The important line here is context.Respond(…). This is the other part of the plumbing MassTransit adds for you. It makes sure that the response retains the RequestId of the request. You can see what the RequestId is with context.RequestId.

There you have it: simple but powerful features provided by MassTransit on top of whatever message transport (e.g. RabbitMQ, AzureSB) you choose.
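The correlation idea can be sketched without any messaging library. The toy model below is written in Python purely for illustration. It is not MassTransit code, and `ToyRequestClient`, `Envelope` and the `"Shipped"` status are hypothetical names invented for this example. It only shows the principle described above: the client stamps each request with an id, the consumer copies that id onto its response, and the client uses it to pair the two.

```python
import uuid
from dataclasses import dataclass


@dataclass
class Envelope:
    request_id: str
    body: dict


class ToyRequestClient:
    """Stamps each request with an id and pairs responses back by that id."""

    def __init__(self):
        self.pending = {}  # request_id -> original request body

    def create_request(self, body: dict) -> Envelope:
        request_id = str(uuid.uuid4())
        self.pending[request_id] = body
        return Envelope(request_id, body)

    def accept_response(self, response: Envelope):
        # Raises KeyError if the response does not match a pending request.
        request_body = self.pending.pop(response.request_id)
        return request_body, response.body


def check_order_status_consumer(request: Envelope) -> Envelope:
    """Plays the consumer: the response carries the request's id unchanged."""
    reply = {"order_id": request.body["order_id"], "status": "Shipped"}
    return Envelope(request.request_id, reply)


client = ToyRequestClient()
first = client.create_request({"order_id": "A1"})
second = client.create_request({"order_id": "B2"})

# Responses may come back in any order; the id still pairs them correctly.
for response in (check_order_status_consumer(second), check_order_status_consumer(first)):
    request_body, reply = client.accept_response(response)
    print(request_body["order_id"], "->", reply["status"])
```

In the real library all of this bookkeeping is hidden behind IRequestClient<…> and context.Respond(…), which is the point of the tutorial.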

Enjoy!

MassTransit Send vs. Publish

MassTransit is capable of a lot, and one important feature is Send vs Publish. If you read the official documentation section “creating a contract”, it talks about commands (send) and events (publish). This makes sense, because you want a command to be executed only once, whereas an event can be observed by one or more listeners.

This tutorial will look at what is happening with the two different message types. To follow along, download the completed project here. The only thing you will need to do is create a virtual host and user in RabbitMQ. Other tutorials for MassTransit typically use the default RabbitMQ guest/guest account, but I prefer to separate mine into virtual hosts. So open the solution and look in any of the three services’ App.config files or the Mvc Web.config. You’ll see:

So browse to http://localhost:15672, log in as admin and configure. I make the virtual host and username the same. Then choose a password you want. Once it’s set up, run the solution and try it out!

Tutorial

I started with the Mvc StarterKit and created 3 services. One service is for the events (in practice it could be 1..n). The other two services, for commands, have artificially fast and slow processing speeds. This was done to illustrate that no matter how many commands are queued, each is only processed once (well, once or more to be honest, as RabbitMQ is an ‘at least once’ message broker).

Note #1: Commands and events are both usually defined as interfaces. Therefore using descriptive names helps discern commands from events.

Publish (Events)

Let’s use a post office and mailbox to help describe publish. An event in MassTransit is akin to the flyers that we receive in the mail. They aren’t directly addressed to us, but everybody who has a mailbox gets them. So flyer = event, and mailbox = endpoint. When building your bus and registering an endpoint with sbc.ReceiveEndpoint(...), you have to be sure that the queueName parameter is unique. This is emphasized in the “common gotchas” part of the documentation. Although it’s called queueName, it actually makes an Exchange and Queue pair, both named after the queueName parameter.

The below diagram shows the MyEvent flow in the sample project.

[Diagram: MyEvent flow in the sample project]

Although the sample doesn’t have multiple event services, I added my_events[N] just to show you each new service (and receive endpoint) will have a corresponding exchange+queue pair made.

Note #2: When I’ve used MassTransit and RabbitMQ, I’ve never worked on high-traffic projects that result in queue or service bottlenecks. I believe RabbitMQ can have quite a lot thrown at it and keep up, but if you are hitting the limits of RabbitMQ or of a service, then you will need to think about RabbitMQ clustering, or adding more services (perhaps even scaling out services). These are highly specialized and more advanced topics. My advice would be to use the MassTransit and RabbitMQ community discussion boards or mailing lists.

Send (Commands)

Commands should only be executed once (you should also consider making commands idempotent, but we will not cover that more complex topic in this tutorial). If we used publish as in the scenario above and had multiple services (i.e. multiple endpoints) for the same contract type, a single command would be executed by many consumers. So rather than publish, we send a command to a specific queue+exchange pair. Our services can then connect to the same receive endpoint’s queueName, so long as they have the same consumers registered. I’m paraphrasing this important point from the documentation.

Here’s the MyCommand flow in the sample project.

[Diagram: MyCommand flow in the sample project]

The one main difference is that the message is sent directly to the my_commands endpoint exchange, rather than to the MyCommand contract exchange. This avoids the contract exchange fan-out. In the sample, both command services also subscribe to the same endpoint (I made sure they both had the same consumers registered). I cheated a little and added a different sleep time to each consumer.
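The difference between the two flows can be modeled in a few lines. The following is a toy, in-memory sketch in Python, not RabbitMQ or MassTransit code. `ToyBroker` and the endpoint name `my_events2` are hypothetical, and each endpoint’s exchange+queue pair is collapsed into a single queue for brevity. Publish copies a message to every endpoint bound to the contract, while send places it on exactly one endpoint that several services may share.

```python
from collections import defaultdict, deque


class ToyBroker:
    def __init__(self):
        self.queues = {}                   # endpoint name -> queued messages
        self.bindings = defaultdict(list)  # contract name -> bound endpoint names

    def receive_endpoint(self, queue_name, contracts):
        """Create the endpoint once and bind it to each contract it consumes."""
        self.queues.setdefault(queue_name, deque())
        for contract in contracts:
            if queue_name not in self.bindings[contract]:
                self.bindings[contract].append(queue_name)

    def publish(self, contract, message):
        """Fan out: every endpoint bound to the contract gets its own copy."""
        for queue_name in self.bindings[contract]:
            self.queues[queue_name].append(message)

    def send(self, queue_name, message):
        """Direct: the message goes to exactly one endpoint."""
        self.queues[queue_name].append(message)

    def consume(self, queue_name):
        return self.queues[queue_name].popleft()


broker = ToyBroker()
broker.receive_endpoint("my_events", ["MyEvent"])
broker.receive_endpoint("my_events2", ["MyEvent"])     # a second event service
broker.receive_endpoint("my_commands", ["MyCommand"])  # fast command service
broker.receive_endpoint("my_commands", ["MyCommand"])  # slow one, same endpoint

broker.publish("MyEvent", "flyer")
for i in range(3):
    broker.send("my_commands", f"command-{i}")

# Two workers take turns pulling from the shared queue (competing consumers).
workers = ["fast", "slow"]
processed = []
turn = 0
while broker.queues["my_commands"]:
    processed.append((workers[turn % 2], broker.consume("my_commands")))
    turn += 1

print(list(broker.queues["my_events"]), list(broker.queues["my_events2"]))
print(processed)
```

Both event endpoints end up with their own copy of the flyer, while each command is handled by only one of the two workers. A real broker adds redelivery on failure, which is where the ‘at least once’ caveat comes from; the toy model ignores that.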

Code Comparison

To publish:

Pretty simple, nothing to explain here.

To send:

The big difference is that we first have to get our send endpoint by passing in the full Uri.

I hope this has helped clarify commands vs events, and when to use each. The next post I have planned will look at Transient vs Persistent exchanges+queues.

Sagas, Scheduling, and Shopping Cart… Under the Covers

A few weeks ago, Chris published a Sample-ShoppingWeb with a supporting blog post (a good foundation to understand the Sample), which showcases how to implement expiring shopping carts, complete with a Saga and Quartz Scheduler. This sample code really starts to show the power of MassTransit, going beyond simple PubSub and ReqResp functionality. However, I like to know how things work, and visualizing the flow really helps me understand what’s going on under the covers. So please read on if you are curious as well!

First, try it yourself

Download the Sample-ShoppingWeb and run it on your machine. Make sure you either have a guest/guest account in RabbitMQ, or quickly set up a virtual host in the RabbitMQ web console (similar to how I did in Step 2 here) for the shopping cart.

Once you have your solution up and running, you can see how adding fake items to a user’s cart will start the expiration countdown (and adding more items will refresh it). If the countdown elapses, the cart is destroyed. A lot is going on here, but I will try to break it down in steps.

Exchanges/Queues

After you have run the sample, you will notice in the RabbitMQ web console there are these exchanges:

[Screenshot: exchanges listed in the RabbitMQ web console]

The first 4 are created by our Scheduler, and the second 4 are created by the Saga, so the 8 exchanges can be viewed as two groups of 4. If you look in TrackingServices.cs at this code snippet:

You’ll see that two receive endpoints are made, and if you look at the Queues in the RabbitMQ web console, you’ll see sample_quartz-scheduler and shopping_cart_state. Each group of 4 exchanges sends all of its messages to its respective queue.

Flow Diagram

Clicking the “Add Item” button once on the Cart webpage triggers the following flow:
[Flow diagram: numbered steps triggered by one “Add Item” click]

There’s a lot going on here, so I’ll break it down into the numbered steps shown in the picture.

  1. The webpage performs a Form submit (using HTTP POST). The input field is meant to hold the username (a unique identifier for the cart). Any one username can only have one active cart at a time.
  2. The MVC controller receives this HTTP POST message, and creates a message CartItemAdded and publishes it to the Service Bus (RabbitMQ).
  3. The message is received in the appropriate exchange, and then sent to the queue shopping_cart_state.
    The TrackingService is running in a TopShelf service, and is listening with consumers on each queue. One group of consumers (handled by the Saga Repository) is listening to the saga queue, the other grouping (handled by the Quartz Scheduler) listens to the scheduler queue.
  4. The saga state machine sees the CartItemAdded and starts a new state machine. This state is saved to the local database using Entity Framework.
  5. A schedule message is kicked off from the state machine for 10 seconds into the future.
  6. The schedule message is passed along to the sample_quartz-scheduler.
  7. The in-memory scheduler receives the message and will act at the time the schedule message indicates. (Quartz can also run with a persistent store. That matters because, with the in-memory scheduler, if the TrackingService crashed before the 10 seconds elapsed, the cart would never be expired.)
  8. The 10 second mark has passed, and the scheduler sends a CartExpired message.
  9. The message is passed to the saga queue from the exchange.
  10. The Tracking Service gets the CartExpired message, and it first looks in the DB to see if a state instance already exists (using the CorrelationId). It finds the saved state, loads it, and then raises the CartExpired event to follow the State Machine flow. This event ends the state machine and deletes its entry from the database, because the state machine calls SetCompletedWhenFinalized().

As you can see, quite a lot happened after you pressed that “Add Item” button. Hopefully this explanation helped clear up some of the confusion. Don’t be intimidated by MassTransit. It’s a powerful tool, and when used correctly, it can help you solve some difficult problems.
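The steps above can be condensed into a small simulation. This is a toy model in Python, not the sample’s actual saga or Quartz code. `ToyTrackingService`, the `"Active"` state name and the manual clock are assumptions made for the illustration. A dictionary stands in for the saga database and a heap stands in for the in-memory scheduler. It deliberately handles only a single “Add Item” click per user.

```python
import heapq

CART_TIMEOUT_SECONDS = 10  # the delay used in the walkthrough above


class ToyTrackingService:
    def __init__(self):
        self.now = 0          # manual clock, in seconds
        self.saga_store = {}  # username -> state (stands in for the database)
        self.scheduled = []   # heap of (due_time, sequence, username)
        self._sequence = 0
        self.log = []

    def cart_item_added(self, username):
        """Steps 4 to 7: start a saga instance and schedule its expiry."""
        if username in self.saga_store:
            # Repeat clicks are out of scope for this single-click sketch.
            raise NotImplementedError("only the first click is modeled")
        self.saga_store[username] = "Active"
        self._sequence += 1
        due = self.now + CART_TIMEOUT_SECONDS
        heapq.heappush(self.scheduled, (due, self._sequence, username))
        self.log.append(f"t={self.now}: cart created for {username}, expiry due at t={due}")

    def advance(self, seconds):
        """Move the clock forward and fire every schedule that has come due."""
        self.now += seconds
        while self.scheduled and self.scheduled[0][0] <= self.now:
            _, _, username = heapq.heappop(self.scheduled)
            self.cart_expired(username)

    def cart_expired(self, username):
        """Steps 8 to 10: load the saved state, finalize it, delete the entry."""
        if username in self.saga_store:
            del self.saga_store[username]
            self.log.append(f"t={self.now}: cart expired for {username}")


service = ToyTrackingService()
service.cart_item_added("alice")
service.advance(9)   # one second short of the timeout, cart still stored
service.advance(1)   # timeout reached, cart removed
print("\n".join(service.log))
```

Because the heap lives only in memory, dropping the `service` object before the clock reaches 10 would lose the pending expiry, which is the same weakness the in-memory scheduler has in step 7.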

Now I’ll leave this optional exercise for the reader. How does the flow I explained above change if the user pressed “Add Item” again after 5 seconds (for the same user of course)?