Sagas, Scheduling, and Shopping Cart… Under the Covers

A few weeks ago, Chris published a Sample-ShoppingWeb with a supporting blog post (a good foundation to understand the Sample), which showcases how to implement expiring shopping carts, complete with a Saga and Quartz Scheduler. This sample code really starts to show the power of MassTransit, going beyond simple PubSub and ReqResp functionality. However, I like to know how things work, and visualizing the flow really helps me understand what’s going on under the covers. So please read on if you are curious as well!

First, try it yourself

Download the Sample-ShoppingWeb and run it on your machine. Make sure you either have a guest/guest account in RabbitMQ, or quickly set up a virtual host in the RabbitMQ web console (similar to how I did in Step 2 here) for the shopping cart.

Once you have your solution up and running, you can see how adding fake items to a user’s cart will start the expiration countdown (and adding more items will refresh it). If the countdown elapses, the cart is destroyed. A lot is going on here, but I will try to break it down in steps.

Exchanges/Queues

After you have run the sample, you will notice in the RabbitMQ web console there are these exchanges:

exchanges_1

The first 4 are created by our Scheduler, and the second 4 are created by the Saga, so the 8 exchanges can be viewed as two groupings of 4. If you look in TrackingServices.cs at this code snippet:

You’ll see that two receive endpoints are made, and if you look at the Queues in the RabbitMQ web console, you’ll see sample_quartz-scheduler and shopping_cart_state. So each grouping of 4 exchanges sends all of its messages to its respective queue.
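To make the grouping concrete, here is a rough Python model of "two groups of exchanges, each feeding one queue". It is only an illustration of the idea, not MassTransit or RabbitMQ code. The two queue names come from the sample, but the exchange names are made-up placeholders, since the real names depend on the message types in the solution.

```python
from typing import Dict, List

# Queue names are the ones visible in the RabbitMQ console for the sample.
# The exchange names are hypothetical placeholders for illustration only.
GROUPS: Dict[str, List[str]] = {
    "sample_quartz-scheduler": [f"scheduler_exchange_{i}" for i in range(1, 5)],
    "shopping_cart_state": [f"saga_exchange_{i}" for i in range(1, 5)],
}


def queue_for(exchange: str) -> str:
    """Return the queue that the given exchange's group feeds."""
    for queue_name, exchanges in GROUPS.items():
        if exchange in exchanges:
            return queue_name
    raise KeyError(f"no queue bound for exchange {exchange!r}")


def deliver(exchange: str, message: str, queues: Dict[str, List[str]]) -> None:
    """Append the message to the queue behind the exchange it was sent to."""
    queues.setdefault(queue_for(exchange), []).append(message)
```

Whichever of the four exchanges in a group a message arrives on, it ends up in that group's single queue, which is where the consumers are listening.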

Flow Diagram

The example of clicking the “Add Item” button once on the Cart webpage is as follows:
flow_2

Okay, there’s a lot going on here, so I’ll break it down into the numbered steps shown in the picture.

  1. The webpage performs a Form submit (using HTTP POST). The input field is meant to hold the username (a unique identifier for the cart). Any one username can only have one active cart at a time.
  2. The MVC controller receives this HTTP POST message, and creates a message CartItemAdded and publishes it to the Service Bus (RabbitMQ).
  3. The message is received in the appropriate exchange, and then sent to the queue shopping_cart_state.
    The TrackingService is running in a TopShelf service, and is listening with consumers on each queue. One group of consumers (handled by the Saga Repository) is listening to the saga queue, the other grouping (handled by the Quartz Scheduler) listens to the scheduler queue.
  4. The saga state machine sees the CartItemAdded and starts a new state machine. This state is saved to the local database using Entity Framework.
  5. A schedule message is kicked off from the state machine for 10 seconds into the future.
  6. The schedule message is passed along to the sample_quartz-scheduler.
  7. The in-memory scheduler receives the message and will act at the time the schedule message indicates. (Quartz can also run with a persistent store, which matters here: with the in-memory scheduler, if the TrackingService crashed before the 10 seconds elapsed, the cart would never be expired.)
  8. The 10 second mark has passed, and the scheduler sends a CartExpired message.
  9. The message is passed to the saga queue from the exchange.
  10. The Tracking Service gets the CartExpired message, and it first looks in the DB to see if a state instance already exists (using the CorrelationId). It finds the saved state, loads it, and then raises the CartExpired event to follow the State Machine flow. This event ends the state machine and deletes its entry from the database, because the state machine has set SetCompletedWhenFinalized().
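If it helps to see steps 4 to 10 as code, here is a small Python model of the flow. It is a sketch only: none of these classes are MassTransit, Automatonymous or Quartz types, the dictionary stands in for the Entity Framework repository, and time is passed in by hand instead of using a real clock. It deliberately covers just the single-click flow, so a second CartItemAdded for an existing cart is ignored here, which is not what the sample does.

```python
import heapq
from dataclasses import dataclass
from typing import Dict, List, Tuple

EXPIRY_SECONDS = 10  # the sample schedules the expiry 10 seconds ahead


@dataclass
class CartState:
    user_name: str  # plays the role of the correlation key
    state: str = "Active"


class Scheduler:
    """In-memory stand-in for the scheduler: holds messages until they are due."""

    def __init__(self) -> None:
        self._pending: List[Tuple[float, int, str, str]] = []
        self._sequence = 0  # tie-breaker so equal due times keep insertion order

    def schedule(self, due: float, message_type: str, user_name: str) -> None:
        heapq.heappush(self._pending, (due, self._sequence, message_type, user_name))
        self._sequence += 1

    def due_messages(self, now: float) -> List[Tuple[str, str]]:
        """Pop and return every message whose due time has been reached."""
        ready = []
        while self._pending and self._pending[0][0] <= now:
            _, _, message_type, user_name = heapq.heappop(self._pending)
            ready.append((message_type, user_name))
        return ready


class CartSaga:
    """Toy state machine: one saved instance per user name."""

    def __init__(self, scheduler: Scheduler) -> None:
        self.repository: Dict[str, CartState] = {}  # stands in for the database
        self.scheduler = scheduler

    def handle(self, message_type: str, user_name: str, now: float) -> None:
        instance = self.repository.get(user_name)  # look up by correlation key
        if message_type == "CartItemAdded" and instance is None:
            # Steps 4 to 6: create and save the state, then schedule the expiry.
            self.repository[user_name] = CartState(user_name)
            self.scheduler.schedule(now + EXPIRY_SECONDS, "CartExpired", user_name)
        elif message_type == "CartExpired" and instance is not None:
            # Step 10: the saga finishes and its saved state is removed.
            del self.repository[user_name]


def run_until(saga: CartSaga, scheduler: Scheduler, now: float) -> None:
    """Steps 8 and 9: hand every due scheduled message back to the saga."""
    for message_type, user_name in scheduler.due_messages(now):
        saga.handle(message_type, user_name, now)
```

Adding an item at time 0 creates the saved state, nothing happens at 9.9 seconds, and at 10 seconds the CartExpired message comes back and the state is deleted.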

Now as you can see, quite a lot happened after you pressed that “Add Item” button. So hopefully this explanation helped clear some of the confusion. Don’t be intimidated by MassTransit. It’s a powerful tool, and when used correctly it can help you solve some difficult problems.

Now I’ll leave this optional exercise for the reader. How does the flow I explained above change if the user pressed “Add Item” again after 5 seconds (for the same user of course)?

SignalRChat with MassTransit v3

In this tutorial we will add a service bus using MassTransit. We will build on this project made in a previous post.

Service Bus?

Without going into too much detail (you can find many websites with more thorough explanations), a service bus lets you send messages (which contain a payload/instruction) for out-of-process handling. A service bus can do a lot more, but we will just be exploring a very simple usage scenario. I plan to make future posts which will leverage more features of the service bus.

The simplest example would be a webpage that generates a report. Typically, as a developer, your first inclination would be to generate the report on the fly with each HTTP request. But if the generation can take over 30 seconds (or minutes, or even hours), you definitely need to think about processing the request outside of the web server process (e.g. IIS can and will behave unpredictably with requests that take longer than 30 seconds). So you may need to adjust your design depending on your system limits and design requirements.
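As a rough picture of that report scenario, here is a Python sketch in which the request handler only enqueues the work and returns straight away, while a separate worker does the slow part. Everything in it is an assumption for illustration: the function names are invented, a thread stands in for the separate process, and an in-memory queue stands in for the message queue.

```python
import queue
import threading
from typing import Dict, Optional, Tuple


def handle_request(jobs: "queue.Queue[Optional[str]]", report_id: str) -> str:
    """Web-side handler: enqueue the work and return immediately."""
    jobs.put(report_id)
    return f"report {report_id} accepted"


def worker(jobs: "queue.Queue[Optional[str]]", results: Dict[str, str]) -> None:
    """Stand-in for the out-of-process service that drains the queue."""
    while True:
        report_id = jobs.get()
        if report_id is None:  # sentinel value: stop the worker
            break
        # The slow report generation would happen here.
        results[report_id] = f"contents of report {report_id}"


def run_demo() -> Tuple[str, Dict[str, str]]:
    jobs: "queue.Queue[Optional[str]]" = queue.Queue()
    results: Dict[str, str] = {}
    consumer = threading.Thread(target=worker, args=(jobs, results))
    consumer.start()
    reply = handle_request(jobs, "42")  # returns without waiting for the report
    jobs.put(None)                      # ask the worker to stop after the backlog
    consumer.join()
    return reply, results
```

The point is that the handler's reply does not depend on how long the report takes; the web request stays short no matter how slow the worker is.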

Our goal is to have our message flow look like:

signalrchat_masstransit_flow

Requirements

For this tutorial:

  • We want SignalRChat to be horizontally scalable, and users can chat to each other, even though they might be hosted on different servers (an example of horizontal scaling)
  • We will not be running any long processing background tasks (perhaps in a future post I will show an example of this)
  • We won’t need distributed transactions (This article has some excellent information related to service bus design)
  • We want a Message Queue (MQ) so this project can support horizontal scaling. There are a variety of options in this space (RabbitMQ, ZeroMQ, MSMQ, NServiceBus, Azure Service Bus), some free and open source, some commercial and fully featured with support options. We will use RabbitMQ, which is free and super easy to set up.

Note #1: An excellent resource where I learned about MassTransit and Service Buses is Loosely Coupled. The blog is made by a highly knowledgeable developer and there is a lot of great information there.

Tutorial

Step 1 – Install RabbitMQ

This is quite straightforward. You will need to install Erlang, because RabbitMQ is written in Erlang.

Now install the two packages if you haven’t done so already (Erlang first, then RabbitMQ). You can leave all the defaults; they should be quick installs.

RabbitMQ has a fully featured command line tool for managing the service on your machine, but there is an amazingly helpful plugin for a web management interface. To install it:

  1. Open the Start Menu and start typing “rabbit”. You will see:
    signalrchat_masstransit_1
    Open the RabbitMQ Command Prompt.
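  2. Now execute the command that enables the management plugin (the standard command for this is rabbitmq-plugins enable rabbitmq_management)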
  3. Once complete, stop the service, and start it again (the start menu has both stop and start available for rabbitmq)

Great, you now have a Message Queue working, ready to be configured!

Step 2 – Configure RabbitMQ

Now open up http://localhost:15672/, and log in with the default username/password of guest/guest. The first thing we will do is add our new admin user.

  1. Click on the Admin tab, then add a user and give it admin privileges. I chose to name mine admin, but you can choose whatever name you like.
    signalrchat_masstransit_2
  2. Once the user is created, log out of guest, and log in as admin.
  3. Next go to the admin tab, click on the guest user, and then delete it.

Now for this project, we want to make a user and a virtual host in RabbitMQ which our .NET solution can connect with. We could just use the default virtual host ‘/’ and the admin username/password, but it’s better practice to create a separate account and virtual host.

  1. First click on the Admin tab, then on the far right, there are three options. You will click Virtual Hosts.
  2. Add a new virtual host and name it signalrchat
  3. Now on the far right, click on Users, and create a new user named signalrchat and give the user the ‘monitoring’ privilege (the username does not have to match the virtual host, but I did it that way).
  4. With the user created, click on them, and then set their permissions for the signalrchat virtual host. Leave .* for configure, read and write. When you actually deploy to a production environment, you might want to lock down privileges more, but for local development we leave it open. You can read more about RabbitMQ access-control here.
    signalrchat_masstransit_3
  5. You can also add permissions for the admin user for both the / and signalrchat virtual hosts.
    signalrchat_masstransit_4
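To get a feel for what those three regular expressions do, here is a small Python model. It is my reading of the RabbitMQ access-control documentation, not RabbitMQ code: it assumes each permission is a regular expression tested against the name of the exchange or queue, unanchored unless the pattern itself uses ^ and $. The Permissions class and the allowed helper are invented for this sketch, and sendchat_queue is the queue name used later in this post.

```python
import re
from dataclasses import dataclass


@dataclass
class Permissions:
    """The three per-vhost permission patterns shown in the management UI."""
    configure: str
    write: str
    read: str


def allowed(pattern: str, resource_name: str) -> bool:
    """Assumption: an operation is allowed when the pattern matches the name."""
    return re.search(pattern, resource_name) is not None


# What we set for local development: everything is allowed.
open_permissions = Permissions(configure=".*", write=".*", read=".*")

# A hypothetical locked-down variant: nothing may be created or deleted,
# and only resources whose names start with "sendchat_" may be used.
locked_permissions = Permissions(configure="^$", write="^sendchat_", read="^sendchat_")
```

With the open permissions, allowed(open_permissions.configure, "sendchat_queue") is true, so the user may create that queue. With the locked-down variant the configure check fails, because ^$ only matches an empty name, while write and read on sendchat_queue still pass.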

Great, now RabbitMQ is set up and ready to use with the SignalRChat project!

Step 3 – Let’s start coding

I’m going to assume you already have the SignalRChat-Autofac project cloned and ready to go!

  1. First create a new project in the solution. Make it a class library, and name it SignalRChat.Contracts.
  2. Delete class1.cs, and make a new folder in the project named ServiceBus.
  3. Now add a new interface to the ServiceBus directory, and name it ISendChat.cs. The interface should contain:

    signalrchat_masstransit_5

That was easy, and the solution is well organized, with the contracts split out into their own class library, similar to our bootstrapper project.

Step 4 – Publish to the bus (almost)

Next, go into the MVC/Web project, and add a reference to our SignalRChat.Contracts project, because we will be publishing and consuming based on that ISendChat contract.
signalrchat_masstransit_6

Next, open the NuGet Package Manager Console (or use the GUI version). Enter the command:

Make sure the target in package manager console is the web/mvc project.

Note #1: At the time of writing this, MassTransit is still in the pre-release phase. You can double check to see if v3 is released or not, and you might be able to omit the -pre flag.

Note #2: I noticed that the MassTransit NuGet package has a Newtonsoft Json dependency of 6.0.8, and this doesn’t match our bootstrapper. You can ignore this, but I like my dependencies to match when possible, so I bumped up the versions to all match each other.

  • Now open up ChatHub.cs, and replace the contents with:

The IBus interface is provided through Dependency Injection and the message is published based on the ISendChat contract. Notice that I am publishing an anonymous type, which still satisfies the ISendChat contract. Since this is a simple contract and I have no additional logic I want to add, I don’t want to have to implement the contract, so instead I used an anonymous type. Don’t feel forced to use an anonymous type; you can make a concrete class that implements ISendChat and it will still work.

Step 5 – Wire up MassTransit in the bootstrapper

If we were to compile, it would not work because we haven’t wired up MassTransit. You might also get an error about TestClass, we will be removing that next. Go into the Bootstrapper project and open IoCConfig.cs. Replace the contents with:

Autofac has a handy Module class that I derive HubModule and MassTransitModule from, so I can separate the registration of each function. Keeps the code nice and clean.

  1. Create a folder in the bootstrapper called Modules.
  2. Now create a class inside this folder named HubModule.cs and paste in these contents:

    You’ll notice this load function looks almost identical to what used to be in the IoCConfig.cs, it’s just now within an Autofac Module.
  3. Next, open the NuGet Package Manager Console, and run the command:
  4. The next class will handle all of the MassTransit setup and configuration. Create a class named MassTransitModule.cs and paste in these contents:

    Line #19: This statement registers all MassTransit consumers. We haven’t made any yet, so this statement will find nothing.
    Line #30: This is where we configure our endpoint, and you’ll notice we have the virtual host /signalrchat that we made. We also have the username and password configured. You’ll likely want these in some <appsettings/> config, but we’ll hard code them in for this demo.
    We could stop at this point, and publish to the ServiceBus, but we also want to subscribe to a queue.
    Line #36: This subscribes our consumer(s) to the queue. Remember those permissions we set for the signalrchat user? Because that user has the configure regexp of .*, the exchange/queue pair named “sendchat_queue” will be created automatically if it doesn’t already exist. The other thing to note is that all consumers will be resolved with the ep.LoadFrom(…), and depending on your architecture/design, you might want to explicitly state which consumers resolve to what queue. These are more advanced design decisions, but it’s good to be aware of them.
  5. Lastly, open up Startup.cs and add these contents to the end of the configuration method (right after app.MapSignalR):

    What I discovered reading that Stack Overflow link is that IConnectionManager is not added to our Autofac OWIN resolver, so what we have to do here is create a new container builder, get the IConnectionManager from the hubConfig, and register that with Autofac. It’s not the best, but until there’s a better alternative this will do. Then builder.Update(…) will add this to our existing container.

Step 6 – Peeking under the hood

We haven’t made a consumer yet, and there’s a reason for that. I want to peek under the hood in the RabbitMQ console so we can understand a bit of what’s going on.

  • Let’s build and debug (F5) our solution
  • The webpage home should come up in your browser, navigate to /home/chat
  • Enter a username, and then type any message and send it

Now you’ll notice that we don’t have our message echoed into the webpage anymore. This is because we sent it to our service bus, but there is no consumer to receive the message.

  • Log into our RabbitMQ management console as admin (if not already logged in)
  • Click on the Exchanges tab at the top
  • You should see there are three exchanges created, two will be durable, and the xxxxxx-iiexpress-12345…. will be auto deleting.
    signalrchat_masstransit_7
  • The message contract is defined as ISendChat, and this exchange will distribute any incoming message to all exchanges that want this type of message. There is no subscription ‘yet’ as you can see if you click on the exchange, so our message that we sent in the webpage was received, but discarded because nobody wanted it.
    signalrchat_masstransit_8

So let’s quickly make a test exchange and test queue to receive this message.

  1. Create a new exchange in virtual host signalrchat and name it test. Leave everything else at the defaults
    signalrchat_masstransit_9
  2. Now click the Queues tab in the titlebar
  3. Again add a new queue, with the same vhost and name as the exchange, leaving all other defaults
    signalrchat_masstransit_10
  4. Next, we need to wire up the exchanges and queues. Click on the Exchanges tab, and click on the SignalRChat.Contracts.ServiceBus:ISendChat exchange
  5. Add a binding to the test exchange. Leave everything else blank
    signalrchat_masstransit_11
  6. Now back in the Exchanges tab, click on the test exchange. You’ll notice now there is a “From” binding. Add the “To” binding to the test queue
    signalrchat_masstransit_12
  7. Now Alt-Tab back to your browser window, and send another chat message
  8. Then in RabbitMQ, click on the Queues tab and you’ll notice that the test queue has a message ready
    signalrchat_masstransit_13
  9. Click on the queue, scroll down to Get Messages and click the Get Message(s) button. This lets us take a look at the payload
    signalrchat_masstransit_14
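The manual wiring above can be summed up in a small Python model. It is a toy broker written for this post, not a RabbitMQ client: the ToyBroker class and its methods are invented, and the only behaviour it models is that a publish with no bindings is discarded, while a chain of exchange-to-exchange and exchange-to-queue bindings lands the message in the queue.

```python
from collections import defaultdict
from typing import DefaultDict, Dict, List, Set


class ToyBroker:
    """Toy model of exchanges, bindings and queues."""

    def __init__(self) -> None:
        # source exchange -> exchanges it forwards to
        self.exchange_bindings: DefaultDict[str, Set[str]] = defaultdict(set)
        # exchange -> queues bound to it
        self.queue_bindings: DefaultDict[str, Set[str]] = defaultdict(set)
        self.queues: Dict[str, List[str]] = {}

    def bind_exchange(self, source: str, destination: str) -> None:
        self.exchange_bindings[source].add(destination)

    def bind_queue(self, exchange: str, queue_name: str) -> None:
        self.queues.setdefault(queue_name, [])
        self.queue_bindings[exchange].add(queue_name)

    def publish(self, exchange: str, message: str) -> int:
        """Deliver to every reachable queue and return how many were reached."""
        seen: Set[str] = set()
        reached: Set[str] = set()
        pending = [exchange]
        while pending:
            current = pending.pop()
            if current in seen:
                continue
            seen.add(current)
            reached |= self.queue_bindings.get(current, set())
            pending.extend(self.exchange_bindings.get(current, set()))
        for queue_name in reached:
            self.queues[queue_name].append(message)
        return len(reached)  # 0 means nobody wanted it and it was discarded


broker = ToyBroker()
contract = "SignalRChat.Contracts.ServiceBus:ISendChat"

dropped = broker.publish(contract, "first message")    # no bindings yet

broker.bind_exchange(contract, "test")                 # step 5: contract -> test exchange
broker.bind_queue("test", "test")                      # step 6: test exchange -> test queue
delivered = broker.publish(contract, "second message")
```

Here dropped is 0 and delivered is 1, and only the second message is sitting in the test queue, which matches what the management console shows.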

Now this seems like a lot of work, and normally we would not have to do this, because once we add our consumer, it will all be wired up automatically for us. But I want you to get more familiar with the RabbitMQ management console. It’s really useful to understand what’s going on, and it can help you debug in the future.

You can now delete the test queue and exchange, or leave them if you like. But remember that the test queue will continue getting messages and won’t be emptied until you purge it yourself.

Step 7 – Finish up the last consumer

Okay, let’s get this last consumer completed.

  1. In the web/mvc project, create a new folder named Consumers
  2. Create a class inside the folder named SendChatConsumer.cs and paste in these contents

Line #15: Remember in Startup.cs, we had to independently resolve the IConnectionManager? Well, we needed that so we could get it through DI here.
Line #19: Doesn’t this look familiar? The ChatHub.cs used to echo the message back out. We are still doing the same thing, but now the message is being published to the bus, and then consumed by this.

We’ve just made the round trip a bit longer, but with the added reward of a horizontally scalable web application. Now debug the solution again, and send a chat message. You will see that it now echoes back out to the webpage as before.

Review

You can find the source for this completed solution here on github. There was a lot of content covered in this post, but here are some of the key takeaways:

  • Learned how to start MassTransit and SignalR with an IoC (Autofac) inside an OWIN ASP.NET MVC project.
  • Learned how Service Buses + Message Queues can aid the design of horizontally scalable solutions. This type of design can be used when a multi-tenancy application has high load and throwing more powerful hardware at the problem (vertical scaling) is more costly than running multiple instances with lower hardware specs (horizontal scaling).
  • Distributed transactions were not needed for this example (if this topic interests you, you should also read about eventual consistency for distributed computing).
  • RabbitMQ web management can be intimidating, but understanding how permissions, vhosts, exchanges and queues work really helps wrap your head around some of the concepts used by the Service Bus.
  • MassTransit is a fantastic library, and when combined with RabbitMQ, creates a free, powerful, and scalable Service Bus.