MassTransit Send vs. Publish

MassTransit can do a lot, and one of its most important distinctions is Send vs. Publish. The “creating a contract” section of the official documentation talks about commands (send) and events (publish). The split makes sense: a command should be executed only once, whereas an event can be observed by one or more listeners.

This tutorial looks at what happens with the two message types. To follow along, download the completed project here. The only thing you need to do is create a virtual host and user in RabbitMQ. Other MassTransit tutorials typically use the default RabbitMQ guest/guest account, but I prefer to separate mine into virtual hosts. So open the solution and look in any of the three services’ App.config files or the Mvc project’s Web.config. There you’ll see the RabbitMQ connection settings the sample expects.

So browse to http://localhost:15672, log in as admin, and create the virtual host and user. I make the virtual host and username the same. Then choose any password you want. Once it’s set up, run the solution and try it out!

Tutorial

I started with the Mvc StarterKit and created three services. One service handles the events (in practice there could be 1..n). The other two handle commands and have artificially fast and slow processing speeds. This illustrates that no matter how many commands are queued, each one is processed only once (well, once or more to be honest, as RabbitMQ is an ‘at least once’ message broker).

Note #1: Commands and events are both usually defined as interfaces, so descriptive names help tell commands apart from events.
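As a rough illustration of Note #1, the two contracts could be declared along these lines. Only the names MyCommand and MyEvent come from the sample project; the namespace and the properties are hypothetical and exist purely to give the interfaces a shape.

```csharp
using System;

// Hypothetical contract shapes. Only the interface names MyCommand and
// MyEvent appear in the sample; the namespace and members are assumptions.
namespace Sample.Contracts
{
    // A command asks exactly one service to do something.
    public interface MyCommand
    {
        Guid CommandId { get; }
        string Text { get; }
    }

    // An event announces that something happened; any number of
    // services may listen for it.
    public interface MyEvent
    {
        Guid EventId { get; }
        DateTime Timestamp { get; }
        string Text { get; }
    }
}
```

Because both are plain interfaces, nothing in the type system marks one as a command and the other as an event. The naming is the only hint.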

Publish (Events)

Let’s use a post office and mailboxes to describe publish. An event in MassTransit is akin to the flyers we receive in the mail. They aren’t addressed to us directly, but everybody who has a mailbox gets them. So flyer = event, and mailbox = endpoint. When building your bus and registering an endpoint with sbc.ReceiveEndpoint(...), you have to be sure that the queueName parameter is unique. This is emphasized in the “common gotcha’s” part of the documentation. Although the parameter is called queueName, it actually creates an exchange and queue pair, both named after the queueName value.
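The endpoint registration described above might look roughly like the following. This is a minimal sketch that assumes the MassTransit v3 RabbitMQ transport (the same style of API used in the comments at the end of this post). The virtual host, credentials, consumer class, and the Text property are placeholders, and the queue name my_events is borrowed from the diagram label.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Assumed contract shape; the Text property is hypothetical.
public interface MyEvent
{
    string Text { get; }
}

// Hypothetical consumer that simply logs the event it receives.
public class MyEventConsumer : IConsumer<MyEvent>
{
    public Task Consume(ConsumeContext<MyEvent> context)
    {
        Console.WriteLine($"Received event: {context.Message.Text}");
        return Task.CompletedTask;
    }
}

public static class EventService
{
    public static IBusControl CreateBus()
    {
        return Bus.Factory.CreateUsingRabbitMq(sbc =>
        {
            // Placeholder virtual host and credentials; use your own values.
            var host = sbc.Host(new Uri("rabbitmq://localhost/my_vhost"), h =>
            {
                h.Username("my_vhost");
                h.Password("change-me");
            });

            // The queue name must be unique per event service. It becomes
            // the name of both the exchange and the queue.
            sbc.ReceiveEndpoint(host, "my_events", ep =>
            {
                ep.Consumer<MyEventConsumer>();
            });
        });
    }
}
```

A second event service would pass a different queue name, so it gets its own exchange and queue and therefore its own copy of every published MyEvent.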

The diagram below shows the MyEvent flow in the sample project.

[Diagram: MyEvent publish flow (send-vs-publish_2r2)]

Although the sample doesn’t have multiple event services, I added my_events[N] to show that each new service (and receive endpoint) gets its own corresponding exchange+queue pair.

Note #2: I’ve never used MassTransit and RabbitMQ on high-traffic projects that result in queue or service bottlenecks. I believe RabbitMQ can have quite a lot thrown at it and still keep up, but if you are hitting the limits of RabbitMQ or of a service, you will need to think about RabbitMQ clustering or adding more services (perhaps even scaling services out). These are highly specialized and more advanced topics, so my advice would be to ask on the MassTransit and RabbitMQ community discussion boards or mailing lists.

Send (Commands)

Commands should only be executed once (you should still consider making commands idempotent, but we will not cover that more complex topic in this tutorial). In the publish scenario above, if we have multiple services (i.e. multiple endpoints) consuming the same contract type, a single command would be executed by many consumers. So rather than publish, we send a command to a specific queue+exchange pair. Our services can then connect to the same receive endpoint queueName, so long as they have the same consumers registered. I’m paraphrasing this important point from the documentation.

Here’s the MyCommand flow in the sample project.

[Diagram: MyCommand send flow (send-vs-publish_2)]

The one main difference is that the message is sent directly to the my_commands endpoint exchange rather than to the MyCommand contract exchange. This avoids the contract exchange fan-out. In the sample, both services also subscribe to the same endpoint (I made sure they both had the same consumers registered). I cheated a little and added a different sleep time to each consumer.
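To make the shared endpoint concrete, here is a sketch of how both command services could register the same consumer on the same queue, differing only in their artificial delay. It assumes the MassTransit v3 RabbitMQ API. The consumer class, the static Delay field, the Text property, and the connection details are all hypothetical; only the my_commands name comes from the sample.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Assumed contract shape; the Text property is hypothetical.
public interface MyCommand
{
    string Text { get; }
}

public class MyCommandConsumer : IConsumer<MyCommand>
{
    // Hypothetical knob: each service sets its own artificial delay at startup.
    public static TimeSpan Delay = TimeSpan.Zero;

    public async Task Consume(ConsumeContext<MyCommand> context)
    {
        // Simulate fast or slow processing.
        await Task.Delay(Delay);
        Console.WriteLine($"Handled command: {context.Message.Text}");
    }
}

public static class CommandService
{
    // Both the fast and the slow service call this with a different delay.
    public static IBusControl CreateBus(TimeSpan delay)
    {
        MyCommandConsumer.Delay = delay;

        return Bus.Factory.CreateUsingRabbitMq(sbc =>
        {
            // Placeholder virtual host and credentials.
            var host = sbc.Host(new Uri("rabbitmq://localhost/my_vhost"), h =>
            {
                h.Username("my_vhost");
                h.Password("change-me");
            });

            // Same queue name in every command service: the services
            // compete for messages, so each command goes to only one of them.
            sbc.ReceiveEndpoint(host, "my_commands", ep =>
            {
                ep.Consumer<MyCommandConsumer>();
            });
        });
    }
}
```

Running one process with a short delay and another with a long one should show the faster service picking up more of the queued commands, while no single command is handed to both.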

Code Comparison

To publish:

Pretty simple, nothing to explain here.
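A publish call can be sketched as follows. This is not the sample project’s exact code: the helper class and the Text property are assumptions, while Publish itself is the standard MassTransit call on the bus.

```csharp
using System.Threading.Tasks;
using MassTransit;

// Assumed contract shape; the Text property is hypothetical.
public interface MyEvent
{
    string Text { get; }
}

public static class EventPublisher
{
    // Publishes to the MyEvent contract exchange; every endpoint with a
    // MyEvent consumer registered receives its own copy.
    public static Task PublishMyEvent(IBus bus, string text)
    {
        // The anonymous object is mapped onto the MyEvent interface.
        return bus.Publish<MyEvent>(new { Text = text });
    }
}
```

Note that the publisher never names a queue. It only names the contract type.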

To send:

The big difference is that we have to get our send endpoint first by passing in the full URI.
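A send call can be sketched like this. Again it is an approximation rather than the sample’s exact code: the host and virtual host in the URI, the helper class, and the Text property are placeholders, and my_commands is the endpoint name used in the sample.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Assumed contract shape; the Text property is hypothetical.
public interface MyCommand
{
    string Text { get; }
}

public static class CommandSender
{
    public static async Task SendMyCommand(IBus bus, string text)
    {
        // Full address of the receive endpoint: host, virtual host, queue name.
        // The host and virtual host here are placeholders.
        var address = new Uri("rabbitmq://localhost/my_vhost/my_commands");

        // Resolve the send endpoint first, then send directly to it,
        // bypassing the MyCommand contract exchange.
        ISendEndpoint endpoint = await bus.GetSendEndpoint(address);
        await endpoint.Send<MyCommand>(new { Text = text });
    }
}
```

Unlike the publisher, the sender has to know where the command is going, which is why the address is usually kept in configuration rather than hard-coded.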

I hope this has helped clarify commands vs events, and when to use each. The next post I have planned will look at Transient vs Persistent exchanges+queues.

5 thoughts on “MassTransit Send vs. Publish”

  1. I have 2 questions on MassTransit.

    1. I have a C# module with the following code, and I set a breakpoint at the code that I am highlighting in blue. When execution hit the breakpoint, I stopped the RabbitMQ service (Windows service) and then hit F5 to continue. The problem is that even though I had stopped the RabbitMQ service, the “Task sendTask = sendEndpoint.Send(new …. ” call still executes successfully without throwing any exception. Is this a bug in MassTransit? It is a concern because even though RabbitMQ is down while the program runs, the program seems “unaware” of it and treats the send as a successful transaction.

    Task<ISendEndpoint> sendEndpointTask = Bus.GetSendEndpoint(
    new Uri(string.Concat(Helper.rabbitMqAddress, "/", Helper.rabbitMqQueue)));
    ISendEndpoint sendEndpoint = sendEndpointTask.Result;

    Task sendTask = sendEndpoint.Send(new
    {
    Address = "New Street ### " + System.Threading.Thread.CurrentThread.ManagedThreadId,
    Id = Guid.NewGuid(),
    Preferred = true,
    Name = "Nice people LTD > " + Guid.NewGuid(),
    Type = 1,
    DefaultDiscount = 0
    });

    2. Secondly, in MassTransit, I create a listener as follows. By default, if there is an exception in the Consume() method of the RegisterConsumerCommand object and there is no try/catch code, the message is put back into the queue. Is there any way to configure MassTransit so that once a message is read from RabbitMQ it is removed from the queue straight away, regardless of whether Consume() completes successfully or throws an exception?

    IRabbitMqHost rabbitMqHost = rabbit.Host(new Uri("rabbitmq://localhost:5672"), settings =>
    {
    settings.Password("accountant");
    settings.Username("accountant");
    });

    rabbit.ReceiveEndpoint(rabbitMqHost, "remit.queue", conf =>
    {
    conf.Consumer<RegisterConsumerCommand>();
    conf.PrefetchCount = 2;
    conf.UseConcurrencyLimit(2);
    });

    Thank you in advance.

    Regards,
    Tat Sean

  2. A very nice post indeed! What I find confusing in MassTransit is not being able to do context.Send() from within a command consumer (when a command A needs to schedule a command B), the message context provides the Send method, but calling it leads to “No convention found for ‘command B'” (needless to say that context.Publish() executes flawlessly, but I don’t want an event here). The confusing part is that the Docs emphasize that one must use the “closest interface” but the consumer has no knowledge about the Send Endpoint uri. I end up retrieving a Send endpoint from IBusControl for the time being, until I figure out a better approach.

  3. In the new MassTransit version 4 (on NuGet as a prerelease package), there is Topology configuration, which will enable more control over this (not having to get the send endpoint).

    However, in v3, you can use the endpoint convention, which can be configured once (typically at IoC registration/startup time).

    So if you Map your Interface with an endpoint address, then your send method will work.

    See an example here: https://github.com/phatboyg/Demo-Registration/blob/f9e83d1b25edb23e778582d65e80efaece00d775/src/Registration.Api/Global.asax.cs#L40

    you can see
