MassTransit with Azure Service Bus

One of the great features of MassTransit is the abstraction it provides over the message transport. This allows you to forget about the plumbing and focus on distributed application design and development.

Now would be a great opportunity to take one of our past tutorials and convert it from RabbitMQ to Azure Service Bus. It will become very clear how easily you can switch from one message transport to another. The completed project for this tutorial can be found here under the azure-service-bus branch.

Tutorial

First, clone the master branch from this tutorial. Don’t worry if you don’t have RabbitMQ set up locally, because we will be switching the solution to use Azure Service Bus.

Create an Azure Service Bus

The first thing we need to do is create an AzureSB for this project. You can use a free trial, or if you have an MSDN License that works too. You’ll want to log into https://portal.azure.com. Once at the dashboard screen, follow these steps:

  1. Click New, and search for “azure service bus” or “service bus”. Then select it, and click “Create”.
  2. Within the Create window, you will want to enter settings:
    Name: <this will be your namespace>
    Pricing Tier: <Must be Standard or higher, MassTransit needs Topics and Queues>
    Resource group: <I made a new one, but you can choose existing if you have one>
  3. Wait a bit while it creates/deploys the Service Bus
  4. Once created, click “Shared access policies” and choose “RootManageSharedAccessKey”. Leave this window open, because we will need to copy the primary key into our App.config and Web.config.

Update the Project Configs

Before we update our project IoC with the Azure SB transport, we need to add the Azure Key Name and Shared Access Key for the Service Bus we made in the previous section.

  1. Open your App.config in the StarterKit.Service project. Add the 4 lines with Azure configuration as follows:
    <appSettings>
      <add key="AzureSbNamespace" value="azurereqresp" />
      <add key="AzureSbPath" value="TutorialReqResp" />
      <add key="AzureSbKeyName" value="RootManageSharedAccessKey" />
      <add key="AzureSbSharedAccessKey" value="+yourlongkeyhere=" />
      <add key="RabbitMQHost" value="rabbitmq://localhost/tutorial-req-resp" />
      <add key="RabbitMQUsername" value="tutorial-req-resp" />
      <add key="RabbitMQPassword" value="apple" />
      <add key="ServiceQueueName" value="order_status_check" />
    </appSettings>

    AzureSbNamespace: This must match the namespace you used when creating the Azure Service Bus.
    AzureSbPath: This can be a path of your choosing, similar to RabbitMQ’s virtual host. It gives a logical separation for different topics and queues within the same namespace.
    AzureSbKeyName: Leave this as is.
    AzureSbSharedAccessKey: Paste in your shared access key (the next step explains how to obtain it).

  2. Open your Service Bus in Azure Portal. Then click Settings -> Shared access policies -> RootManageSharedAccessKey. You can copy the Primary key (or the secondary, it doesn’t matter) and paste it into your App.config.
  3. Now open Web.config in StarterKit.Web and add the same 4 lines, plus a new 5th line.
    <appSettings>
      <add key="AzureSbNamespace" value="azurereqresp" />
      <add key="AzureSbPath" value="TutorialReqResp" />
      <add key="AzureSbKeyName" value="RootManageSharedAccessKey" />
      <add key="AzureSbSharedAccessKey" value="+yourlongkeyhere=" />
      <add key="AzureQueueFullUri" value="sb://azurereqresp.servicebus.windows.net/tutorialreqresp/order_status_check/" />
      <add key="RabbitMQHost" value="rabbitmq://localhost/tutorial-req-resp" />
      <add key="RabbitMQUsername" value="tutorial-req-resp" />
      <add key="RabbitMQPassword" value="apple" />
      <add key="ServiceFullUri" value="rabbitmq://localhost/tutorial-req-resp/order_status_check" />
      <add key="webpages:Version" value="3.0.0.0" />
      <add key="webpages:Enabled" value="false" />
      <add key="ClientValidationEnabled" value="true" />
      <add key="UnobtrusiveJavaScriptEnabled" value="true" />
      <add key="owin:AppStartup" value="StarterKit.Web.Bootstrapper.Startup, StarterKit.Web.Bootstrapper" />
    </appSettings>

    AzureQueueFullUri: This should look familiar if you followed the original req-resp tutorial here. Because the RequestClient needs to connect to a specific endpoint (queue), we need to provide the full Uri at which the queue can be reached in Azure. The Uri of a queue follows a simple pattern: sb://<namespace>.servicebus.windows.net/<path>/<queue_name>
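
To make that pattern concrete, here is a minimal sketch of composing the Uri from its three parts. The AzureQueueUri class and its Build method are hypothetical names of my own (they are not part of MassTransit or the Azure SDK), and the sketch assumes the default servicebus.windows.net domain.

```csharp
using System;

// Hypothetical helper for illustration; not part of MassTransit or the Azure SDK.
public static class AzureQueueUri
{
    // Composes sb://<namespace>.servicebus.windows.net/<path>/<queue_name>.
    // Assumes the default public Azure domain.
    public static Uri Build(string sbNamespace, string path, string queueName)
    {
        string address = string.Format(
            "sb://{0}.servicebus.windows.net/{1}/{2}",
            sbNamespace,
            path.Trim('/'),
            queueName.Trim('/'));

        return new Uri(address);
    }
}
```

Calling AzureQueueUri.Build with the AzureSbNamespace, AzureSbPath and ServiceQueueName values from the config should give you the same address you would otherwise type by hand into AzureQueueFullUri.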

Add MassTransit Azure Packages and Register with our Container

Great. Now we have our configuration settings saved and ready to use. All we have to do is change the module we register in the IoC container of both the StarterKit.Service and StarterKit.Web projects. We don’t need to change a single line of business logic code in our app. Thank you MassTransit!

First, you will need to install the NuGet package MassTransit.AzureServiceBus in both the StarterKit.Service and StarterKit.Web.Bootstrapper projects.

StarterKit.Service Changes

Once complete, create a new file in StarterKit.Service/Modules named AzureServiceBusModule.cs.

Paste the following in the file.

public class AzureServiceBusModule : Module
{
    private readonly System.Reflection.Assembly[] _assembliesToScan;

    public AzureServiceBusModule(params System.Reflection.Assembly[] assembliesToScan)
    {
        _assembliesToScan = assembliesToScan;
    }

    protected override void Load(ContainerBuilder builder)
    {
        // Creates our bus from the factory and registers it as a singleton against two interfaces
        builder.Register(c => Bus.Factory.CreateUsingAzureServiceBus(sbc =>
        {
            var serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", ConfigurationManager.AppSettings["AzureSbNamespace"], ConfigurationManager.AppSettings["AzureSbPath"]);

            var host = sbc.Host(serviceUri, h =>
            {
                h.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(ConfigurationManager.AppSettings["AzureSbKeyName"], ConfigurationManager.AppSettings["AzureSbSharedAccessKey"], TimeSpan.FromDays(1), TokenScope.Namespace);
            });

            sbc.ReceiveEndpoint(host, ConfigurationManager.AppSettings["ServiceQueueName"], e =>
            {
                // Configure your consumer(s)
                e.Consumer<CheckOrderStatusConsumer>();
                e.DefaultMessageTimeToLive = TimeSpan.FromMinutes(1);
                e.EnableDeadLetteringOnMessageExpiration = false;
            });
        }))
            .SingleInstance()
            .As<IBusControl>()
            .As<IBus>();
    }
}

ServiceBusEnvironment.CreateServiceUri: This helper method from the Microsoft.ServiceBus namespace lets us construct our service Uri. You will notice that this Uri is the same as our queue Uri in the previous section, sans /<queue_name>.
TokenProvider.CreateSharedAccessSignatureTokenProvider: This is also a helper method. It creates the Token Provider for us from the key name and shared access key in our config.

Last but not least, head over to IocConfig.cs and change the line:

// Old Registration
//builder.RegisterModule(new BusModule(Assembly.GetExecutingAssembly()));

// New Registration
builder.RegisterModule(new AzureServiceBusModule(Assembly.GetExecutingAssembly()));

Done!

StarterKit.Web.Bootstrapper

Almost there. I guess you can probably imagine we now need to create a new file at StarterKit.Web.Bootstrapper/Modules/AzureServiceBusModule.cs. Paste in the following:

public class AzureServiceBusModule : Module
{
    protected override void Load(ContainerBuilder builder)
    {
        // Creates our bus from the factory and registers it as a singleton against two interfaces
        builder.Register(c => Bus.Factory.CreateUsingAzureServiceBus(sbc =>
        {
            var serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", ConfigurationManager.AppSettings["AzureSbNamespace"], ConfigurationManager.AppSettings["AzureSbPath"]);

            var host = sbc.Host(serviceUri, h =>
            {
                h.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(ConfigurationManager.AppSettings["AzureSbKeyName"], ConfigurationManager.AppSettings["AzureSbSharedAccessKey"], TimeSpan.FromDays(1), TokenScope.Namespace);
            });
        }))
            .SingleInstance()
            .As<IBusControl>()
            .As<IBus>();
    }
}

Again, same as before, but we don’t need to create a receive endpoint here. Instead, our RequestClient will send its requests directly to the queue.

So… open up RequestClientModule.cs, and change this line:

// Old queue uri
//Uri address = new Uri(ConfigurationManager.AppSettings["ServiceFullUri"]);

// New queue uri
Uri address = new Uri(ConfigurationManager.AppSettings["AzureQueueFullUri"]);

And if you haven’t guessed it, last but not least, go to IocConfig.cs, and change:

// Old registration
//builder.RegisterModule<BusModule>();

// New registration
builder.RegisterModule<AzureServiceBusModule>();

 

All done. We didn’t touch any of our core code. We just set up the bus and wired it into our IoC, and MassTransit handles the rest!

MassTransit Autofac Request and Response

Messages sent with MassTransit are typically fire and forget. This is because distributed systems operate in different processes on different machines, giving them the benefit of horizontal scaling (with trade-offs, but that’s another discussion entirely). However, the need may arise to use request + response in a distributed system. So this tutorial will show how this can be achieved with MassTransit.

If you have followed some of my other tutorials, you will know that I like to separate the RabbitMQ config with a virtual host and username specific to the tutorial. My config for this tutorial is:

<add key="RabbitMQHost" value="rabbitmq://localhost/tutorial-req-resp" />
<add key="RabbitMQUsername" value="tutorial-req-resp" />
<add key="RabbitMQPassword" value="apple" />

This tutorial will simply show the main components required to set up a Request/Response with MassTransit. You can follow along with the completed project here.

*Update*: When I drafted this, the MassTransit Documentation didn’t have an example for Request/Response. Now there is excellent documentation here. So I will reduce the scope of this tutorial to registering the IRequestClient<…> in an IoC framework (Autofac).

Tutorial

When using Publish + Subscribe, we typically get the IBus or IBusControl injected and use the .Publish or .Send methods to fire and forget. But for Request Response, we will use IRequestClient<…>. Open up RequestClientModule.cs. You will see:

Uri address = new Uri(ConfigurationManager.AppSettings["ServiceFullUri"]);
TimeSpan requestTimeout = TimeSpan.FromSeconds(30);

builder.Register(c => new MessageRequestClient<CheckOrderStatus, OrderStatusResult>(c.Resolve<IBus>(), address, requestTimeout))
    .As<IRequestClient<CheckOrderStatus, OrderStatusResult>>()
    .SingleInstance();

The builder.Register call: This is where our concrete type MessageRequestClient is constructed, which is one of the many value adds of MassTransit. It handles the plumbing of creating a RequestId and correlates messages accordingly.

You might be wondering why I used MessageRequestClient, because in the official sample a helper extension method is used instead. If we look at the MassTransit source, we will see the extension is just that, a helper:

public static IRequestClient<TRequest, TResponse> CreateRequestClient<TRequest, TResponse>(this IBus bus, Uri address, TimeSpan timeout,
    TimeSpan? ttl = default(TimeSpan?), Action<SendContext<TRequest>> callback = null)
    where TRequest : class
    where TResponse : class
{
    return new MessageRequestClient<TRequest, TResponse>(bus, address, timeout, ttl, callback);
}

So in our RequestClientModule.cs, you could replace the builder.Register with:

builder.Register(c =>
{
    var bus = c.Resolve<IBus>();

    return bus.CreateRequestClient<CheckOrderStatus, OrderStatusResult>(address, requestTimeout);
})
    .As<IRequestClient<CheckOrderStatus, OrderStatusResult>>()
    .SingleInstance();

Then you would get the exact same behavior. Choose whatever method you prefer.

Now we take a quick peek in the Consumer. Open up CheckOrderStatusConsumer.cs and you will see:

public async Task Consume(ConsumeContext<CheckOrderStatus> context)
{
    Console.Out.WriteLine("Received OrderId: " + context.Message.OrderId);
    context.Respond(new SimpleOrderStatusResult { OrderMessage = string.Format("Echo the OrderId {0}", context.Message.OrderId) });
}

The important line here is context.Respond(…). This is the other part of the MassTransit value add plumbing. It makes sure that the response retains the RequestId of the request. You can actually see what the RequestId is with context.RequestId.
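
If you are curious what “correlating by RequestId” boils down to, here is a rough in-memory sketch. To be clear, this is not MassTransit’s implementation. CorrelatingClient and its members are hypothetical names of my own, there is no transport involved, and the request timeout is left out to keep it short.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for the correlation a request client performs.
// Not MassTransit code: no transport, no timeout handling.
public class CorrelatingClient<TResponse>
{
    // Requests that are still waiting for a response, keyed by request id.
    private readonly ConcurrentDictionary<Guid, TaskCompletionSource<TResponse>> _pending =
        new ConcurrentDictionary<Guid, TaskCompletionSource<TResponse>>();

    // Creates a fresh request id and a task that completes when the matching response arrives.
    public Task<TResponse> BeginRequest(out Guid requestId)
    {
        requestId = Guid.NewGuid();
        var completion = new TaskCompletionSource<TResponse>();
        _pending[requestId] = completion;
        return completion.Task;
    }

    // Called when a response arrives. Returns false if nothing is waiting on that id.
    public bool OnResponse(Guid requestId, TResponse response)
    {
        TaskCompletionSource<TResponse> completion;
        if (!_pending.TryRemove(requestId, out completion))
            return false;

        completion.SetResult(response);
        return true;
    }
}
```

The point of the sketch is only that the responder has to hand back the same id it received, otherwise the waiting request can never be matched up. That is the bookkeeping MessageRequestClient and context.Respond(…) take care of for us.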

There you have it: pretty simple, but powerful features provided by MassTransit on top of whatever message transport (e.g. RabbitMQ, AzureSB) you choose.

Enjoy!

MassTransit Send vs. Publish

MassTransit is capable of a lot, and one important feature is Send vs Publish. If you read the official documentation section “creating a contract”, it talks about commands (send) and events (publish). This makes sense, because you want a command to be executed only once, whereas an event can be observed by one or more listeners.

This tutorial will look at what is happening with the two different message types. To follow along, download the completed project here. The only thing you will need to do is create a virtual host and user in RabbitMQ. Other tutorials for MassTransit typically use the default RabbitMQ guest/guest account, but I prefer to separate mine into virtual hosts. So open the solution and look in the App.config of any of the three services, or in the Mvc Web.config. You’ll see:

<add key="RabbitMQHost" value="rabbitmq://localhost/tutorial-send-vs-publish" />
<add key="RabbitMQHostUsername" value="tutorial-send-vs-publish" />
<add key="RabbitMQHostPassword" value="chooseapassword" />

So browse to http://localhost:15672, log in as admin and configure. I make the virtual host and username the same. Then choose a password you want. After it’s set up, run the solution and try it out!

Tutorial

I started with the Mvc StarterKit and created 3 services. One service is for the events (in practice it could be 1..n). The other two services, for commands, have artificially fast and slow processing speeds. This was done to illustrate that although X quantity of commands are queued, each is only processed once (well, once or more to be honest, as RabbitMQ is an ‘at least once’ message broker).

Note #1: Commands and events are both usually defined as interfaces. Therefore using descriptive names helps discern commands from events.

Publish (Events)

Let’s use a post office and mailbox to help describe publish. An event in MassTransit is akin to the flyers that we receive in the mail. They aren’t directly addressed to us, but everybody who has a mailbox gets them. So flyer = event, and mailbox = endpoint. When building your bus and registering an endpoint like so: sbc.ReceiveEndpoint(…), you have to be sure that the queueName parameter is unique. This is emphasized in the “common gotcha’s” part of the documentation. Although it’s called queueName, it actually makes an Exchange and Queue pair, both named after the queueName parameter.

The below diagram shows the MyEvent flow in the sample project.

[Diagram: MyEvent flow in the sample project]

Although the sample doesn’t have multiple event services, I added my_events[N] just to show you each new service (and receive endpoint) will have a corresponding exchange+queue pair made.

Note #2: When I’ve used MassTransit and RabbitMQ, I’ve never worked with high traffic projects that result in queue or service bottlenecks. I believe RabbitMQ can have quite a lot thrown at it and keep up, but if you are hitting the limits of RabbitMQ or the service, then you will need to think about RabbitMQ clustering, or adding more services (perhaps even scaling out services). But these are highly specialized and more advanced topics. My advice would be to use the MassTransit and RabbitMQ community discussion boards or mailing lists.

Send (Commands)

Commands should only be executed once (although you should still consider making commands idempotent, a more complex topic that we will not cover in this tutorial). If we used publish, as in the scenario above, and had multiple services (i.e. multiple endpoints) for the same contract type, a single command would be executed by many consumers. So rather than publish, we send a command to a specific queue+exchange pair. Our services can then connect to the same receive endpoint’s queueName, so long as they have the same consumers registered. I’m paraphrasing this important point from the documentation.

Here’s the MyCommand flow in the sample project.

[Diagram: MyCommand flow in the sample project]

The one main difference is the Message is sent directly to the my_commands endpoint exchange, rather than the MyCommand contract exchange. This avoids the contract exchange fan-out. In the sample, we also chose our services to subscribe to the same endpoint (as I made sure they both had the same consumers registered). I cheated a little and added a different sleep time to each consumer.
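
The two flows can be boiled down to a toy in-memory model. This is only a sketch of the routing idea described above, not how MassTransit or RabbitMQ are implemented, and ToyBroker with all of its members is a hypothetical name I made up for the illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of the routing idea; not MassTransit or RabbitMQ code.
public class ToyBroker
{
    // queue name -> messages waiting in that queue
    private readonly Dictionary<string, Queue<string>> _queues =
        new Dictionary<string, Queue<string>>();

    // contract (message type) name -> queues bound to that contract's exchange
    private readonly Dictionary<string, List<string>> _bindings =
        new Dictionary<string, List<string>>();

    public void DeclareQueue(string queueName)
    {
        if (!_queues.ContainsKey(queueName))
            _queues[queueName] = new Queue<string>();
    }

    // A receive endpoint with a consumer for this contract binds its queue to the contract.
    public void Bind(string contractName, string queueName)
    {
        DeclareQueue(queueName);
        if (!_bindings.ContainsKey(contractName))
            _bindings[contractName] = new List<string>();
        _bindings[contractName].Add(queueName);
    }

    // Publish: every queue bound to the contract gets its own copy (fan-out).
    public void Publish(string contractName, string message)
    {
        List<string> bound;
        if (!_bindings.TryGetValue(contractName, out bound))
            return;
        foreach (string queueName in bound)
            _queues[queueName].Enqueue(message);
    }

    // Send: the message goes to exactly one queue, named by the sender.
    public void Send(string queueName, string message)
    {
        _queues[queueName].Enqueue(message);
    }

    // Services sharing a queue compete: whoever receives first takes the message.
    public bool TryReceive(string queueName, out string message)
    {
        Queue<string> queue = _queues[queueName];
        if (queue.Count == 0)
        {
            message = null;
            return false;
        }
        message = queue.Dequeue();
        return true;
    }
}
```

In this model, publishing MyEvent puts one copy in every bound queue, so each event service sees it. Sending MyCommand to my_commands puts a single copy in that one queue, and the fast and slow services simply take turns pulling from it.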

Code Comparison

To publish:

await _bus.Publish<MyEvent>(...);

Pretty simple, nothing to explain here.

To send:

var sendEndpoint = await _bus.GetSendEndpoint(new Uri(ConfigurationManager.AppSettings["MyCommandQueueFullUri"]));
await sendEndpoint.Send<MyCommand>(...);

The big difference is that we have to get our send endpoint first, by passing in the full Uri.

I hope this has helped clarify commands vs events, and when to use each. The next post I have planned will look at Transient vs Persistent exchanges+queues.

Starter Kit for ASP.NET MVC, MassTransit, and Autofac


Chris has done a great job providing Samples for simple and complex use cases with MassTransit. I recommend looking at the Shopping Cart sample and my complementary post.

I want to write more SOA+MassTransit tutorials covering a wide range of scenarios for scalable and robust web applications, but I need a good jumping off point for the reader and myself to start from. So I’ll be making some Starter Kits available on Github to help provide an equal and familiar starting point for my tutorials. This is a learning process for both you and me, and I want to give the best and most accurate advice.

I like to construct my projects roughly following Jeffrey Palermo’s Onion Architecture. The onion architecture might seem like overkill for the simple example of the Starter Kit. However, I will be writing more tutorials, and when they get more complex, this organizational architecture will help keep things concise, and should (I hope) make it easier for the reader (you) to follow along.

Onion Architecture Approach (Quickly Explained)

[Diagram: onion architecture layers]

One important concept is that layers can only have dependencies on their current layer, or more Inner Layers (or external nuget dependencies if necessary). So for example the Contracts and Domain Models should really only reference .NET Framework, nothing else.

Centre: This layer will typically contain our Contracts (Interfaces) and Domain Models or POCO (for ORM). There should be no business logic or type verification. This should be done in another (more outer) layer.

Middle: This layer houses services that contain the implementation for our contracts. This layer could be further split into Domain Services and Application Services, but for simplicity we will just leave them as the Middle layer for now. I’ve provided some examples in the diagram, such as MassTransit Consumers, SignalR Hubs, and CQRS services (e.g. something like MediatR).

Outer: The outermost layer typically has a variety of items, which end up tying together the services with concrete assemblies and infrastructure. For example, MassTransit (and projects I’ve worked on in the past) use a logging mechanism called LibLog. I am a huge fan of this, because it doesn’t restrict you to a specific logging implementation. So in the outermost layer, all you have to do is add your dependency on NLog, EntLib, or log4net, and configure it (using code or app.config), and you’re off to the races. None of your inner layers have any dependency on any logging system (LibLog works by simply adding a *.cs file to your projects). The same idea applies to other infrastructure you wire up, such as which ORM you use, the Storage type, the DB provider, the unit testing (and mocking) libraries, which CQRS pattern, or what logging framework to use.
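
Here is a tiny sketch of that dependency direction. None of these types come from the Starter Kit or from LibLog. IOrderStatusLookup, ILogSink, OrderStatusService and ListLogSink are hypothetical names, and the hand-rolled ILogSink is only a stand-in for whatever logging abstraction you actually use.

```csharp
using System;
using System.Collections.Generic;

// Centre: contracts only, no business logic. (Hypothetical names.)
public interface IOrderStatusLookup
{
    string GetStatus(int orderId);
}

public interface ILogSink
{
    void Debug(string message);
}

// Middle: implements a contract and depends only on the centre layer.
public class OrderStatusService : IOrderStatusLookup
{
    private readonly ILogSink _log;

    public OrderStatusService(ILogSink log)
    {
        _log = log;
    }

    public string GetStatus(int orderId)
    {
        _log.Debug("Looking up order " + orderId);
        return "Order " + orderId + " is pending";
    }
}

// Outer: the only layer that knows about a concrete logging choice.
// A real project would wire up log4net, NLog, etc. here instead.
public class ListLogSink : ILogSink
{
    public readonly List<string> Lines = new List<string>();

    public void Debug(string message)
    {
        Lines.Add(message);
    }
}
```

Swapping ListLogSink for another ILogSink implementation touches only the outer layer. OrderStatusService and the contracts stay exactly as they are, which is the whole appeal of the approach.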

Starter Kit – Autofac, MassTransit, Mvc

The Starter Kit can be found here, and below is the architecture.

Update Oct. 24, 2015: I’ve added another starter kit here, which includes SignalR.

[Diagram: Starter Kit architecture]

The Starter Kit compiles and runs a simple message (publish and subscribe) example. You can see the message sent from the webpage appear in the running console service.

Logging Explained

I want to take a moment to explain the logging option used. I’ve been around the block when it comes to logging for .NET, and by far the best technique I’ve encountered so far is LibLog. It lets you choose your favorite logger, and there is no DLL dependency in your libraries. Only the final executable references the logger of your choice (log4net, NLog, EntLib, etc.) and wires it up. So in the Starter Kit, I chose log4net, and if you look in StarterKit.Web (Web.config) or StarterKit.Service (App.config), you will see log4net’s configuration.

Logging Level in StarterKit

Because MassTransit and Topshelf both use LibLog, our logs can become crowded if we set the global log level to debug. So you’ll notice in the *.config that I set the root level to info and the StarterKit.* namespace level to debug.

<root>
  <level value="INFO" />
  <appender-ref ref="RollingFile" />
</root>
<logger name="StarterKit.*">
  <level value="DEBUG" />
</logger>

This keeps the logs clean if you are trying to debug your code. And of course, if you are having trouble with MassTransit and post in the Google discussion group to get help, the library contributors might ask for logs so they can diagnose your issue. In that case you would want to change the root level to debug, then run and capture the logs.

So go clone the Starter Kit and start experimenting!