MassTransit is capable of a lot, and one important feature is the distinction between Send and Publish. The official documentation section “creating a contract” talks about commands (send) and events (publish). The split makes sense: a command should be executed only once, whereas an event can be observed by one or more listeners.
This tutorial looks at what happens with the two message types. To follow along, download the completed project here. The only thing you need to do is create a virtual host and user in RabbitMQ. Other MassTransit tutorials typically use the default RabbitMQ guest/guest account, but I prefer to separate mine into virtual hosts. So open the solution and look in the App.config of any of the three services, or in the Mvc Web.config. You’ll see:
<add key="RabbitMQHost" value="rabbitmq://localhost/tutorial-send-vs-publish" />
<add key="RabbitMQHostUsername" value="tutorial-send-vs-publish" />
<add key="RabbitMQHostPassword" value="chooseapassword" />
So browse to http://localhost:15672, log in as admin and configure it. I make the virtual host and username the same, then choose whatever password you want. Once it’s set up, run the solution and try it out!
Tutorial
I started with the Mvc StarterKit and created 3 services. One service is for the events (in practice it could be 1..n). The other two services, for commands, have artificially fast and slow processing speeds. This was done to illustrate that no matter how many commands are queued, each one is only processed once (well, once or more to be honest, as RabbitMQ is an ‘at least once’ message broker).
Note #1: Commands and events are both usually defined as interfaces. Therefore, using descriptive names helps distinguish commands from events.
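As a minimal sketch of Note #1, the two contracts could look like the following. The interface names MyCommand and MyEvent come from the sample, but the properties shown here are assumptions made for illustration, not the sample project's actual members.

```csharp
using System;

// A command tells one receiver to do something.
// The properties are hypothetical and only here to make the sketch concrete.
public interface MyCommand
{
    Guid CommandId { get; }
    string Text { get; }
}

// An event tells any interested listener that something happened.
public interface MyEvent
{
    Guid EventId { get; }
    string Text { get; }
}
```

In a real project, an imperative name for a command and a past-tense name for an event make the intent obvious at the call site.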
Publish (Events)
Let’s use a post office and mailbox to help describe publish. An event in MassTransit is akin to the flyers that we receive in the mail. They aren’t directly addressed to us, but everybody who has a mailbox gets them. So flyer = event, and mailbox = endpoint. When building your bus and registering an endpoint with sbc.ReceiveEndpoint(…), you have to be sure that the queueName parameter is unique. This is emphasized in the “common gotcha’s” part of the documentation. Although the parameter is called queueName, it actually creates an exchange and queue pair, both named after the queueName value.
The below diagram shows the MyEvent flow in the sample project.
Although the sample doesn’t have multiple event services, I added my_events[N] just to show you each new service (and receive endpoint) will have a corresponding exchange+queue pair made.
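The following is a minimal sketch of how an event service might register its receive endpoint. It assumes the MassTransit v3 RabbitMQ API, where ReceiveEndpoint takes the host as its first argument (newer releases may differ), and it reuses the host and credentials from the config above. MyEventConsumer, EventServiceBus and the Text property are hypothetical names, not taken from the sample.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

public interface MyEvent
{
    string Text { get; } // hypothetical property
}

// Hypothetical consumer: every endpoint that registers it receives its own copy of each event.
public class MyEventConsumer : IConsumer<MyEvent>
{
    public async Task Consume(ConsumeContext<MyEvent> context)
    {
        await Console.Out.WriteLineAsync("Observed event: " + context.Message.Text);
    }
}

public static class EventServiceBus
{
    // queueName must be unique per event service, e.g. "my_events1", "my_events2".
    public static IBusControl Create(string queueName)
    {
        return Bus.Factory.CreateUsingRabbitMq(sbc =>
        {
            var host = sbc.Host(new Uri("rabbitmq://localhost/tutorial-send-vs-publish"), h =>
            {
                h.Username("tutorial-send-vs-publish");
                h.Password("chooseapassword");
            });

            // Creates an exchange and a queue, both named after queueName.
            sbc.ReceiveEndpoint(host, queueName, ep =>
            {
                ep.Consumer<MyEventConsumer>();
            });
        });
    }
}
```

Each service would call Create with its own queue name and then start the returned bus, so every service ends up with its own mailbox for the flyer.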
Note #2: When I’ve used MassTransit and RabbitMQ, I’ve never worked on high traffic projects that resulted in queue or service bottlenecks. I believe RabbitMQ can have quite a lot thrown at it and still keep up, but if you are hitting limits in RabbitMQ or in a service, you will need to think about RabbitMQ clustering, or adding more services (perhaps even scaling out services). These are highly specialized and more advanced topics, so my advice would be to use the MassTransit and RabbitMQ community discussion boards or mailing lists.
Send (Commands)
Commands should only be executed once (although you should still think about making commands idempotent, a more complex topic that we will not cover in this tutorial). If we published a command as in the scenario above, and multiple services (i.e. multiple endpoints) consumed the same contract type, a single command would be executed by many consumers. So rather than publish, we send a command to a specific queue+exchange pair. Our services can then connect to the same receive endpoint’s queueName, so long as they have the same consumers registered. I’m paraphrasing this important point from the documentation.
Here’s the MyCommand flow in the sample project.
The one main difference is that the message is sent directly to the my_commands endpoint exchange, rather than to the MyCommand contract exchange. This avoids the contract exchange fan-out. In the sample, both services subscribe to the same endpoint (I made sure they both had the same consumers registered). I cheated a little and added a different sleep time to each consumer.
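A minimal sketch of the "fast" and "slow" command consumers, assuming the delay is passed in through the constructor. The sample project may implement the sleep differently, and the Text property and class name are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

public interface MyCommand
{
    string Text { get; } // hypothetical property
}

// The same consumer type runs in both command services; only the artificial delay differs.
public class MyCommandConsumer : IConsumer<MyCommand>
{
    private readonly TimeSpan _delay;

    public MyCommandConsumer(TimeSpan delay)
    {
        _delay = delay;
    }

    public TimeSpan Delay
    {
        get { return _delay; }
    }

    public async Task Consume(ConsumeContext<MyCommand> context)
    {
        // Simulate fast or slow processing.
        await Task.Delay(_delay);
        await Console.Out.WriteLineAsync("Executed command: " + context.Message.Text);
    }
}
```

Each service would then register it on the shared my_commands endpoint using the factory overload, for example ep.Consumer(() => new MyCommandConsumer(TimeSpan.FromSeconds(5))) in the slow service. Because both services read from the same queue, each command goes to whichever consumer is free rather than to both.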
Code Comparison
To publish:
await _bus.Publish<MyEvent>(...);
Pretty simple, nothing to explain here.
To send:
var sendEndpoint = await _bus.GetSendEndpoint(new Uri(ConfigurationManager.AppSettings["MyCommandQueueFullUri"]));
await sendEndpoint.Send<MyCommand>(...);
The big difference is that we have to get our send endpoint first, by passing in the full URI of the queue.
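For RabbitMQ, the full URI is the host (virtual host) address followed by the queue name. The helper below is a hypothetical sketch of that composition. It assumes the MyCommandQueueFullUri setting in the sample resolves to the virtual host from the config above plus the my_commands queue.

```csharp
using System;

// Hypothetical helper, not part of MassTransit or the sample project.
public static class QueueAddress
{
    // Appends the queue name to the host URI, avoiding a doubled slash.
    public static Uri For(string hostUri, string queueName)
    {
        return new Uri(hostUri.TrimEnd('/') + "/" + queueName);
    }
}
```

For example, QueueAddress.For("rabbitmq://localhost/tutorial-send-vs-publish", "my_commands") yields rabbitmq://localhost/tutorial-send-vs-publish/my_commands, which is the kind of address GetSendEndpoint expects.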
I hope this has helped clarify commands vs events, and when to use each. The next post I have planned will look at Transient vs Persistent exchanges+queues.
Can I use your pictures in the MassTransit docs?
You most certainly can. I’d be glad if they help improve the official documentation!
I have 2 questions on MassTransit.
1. I have a C# module with the following code, and I set a breakpoint at the line that I am highlighting in blue. When execution hit the breakpoint, I stopped the RabbitMQ service (Windows service), then hit F5 to continue. The problem is that even though I had stopped the RabbitMQ service, the “Task sendTask = sendEndpoint.Send(new …” line still executed successfully without throwing any exception. Is this a bug in MassTransit? It is a concern because even though RabbitMQ is down while the program runs, the program is still “unaware” of it and treats the send as a successful transaction.
Task<ISendEndpoint> sendEndpointTask = Bus.GetSendEndpoint(
    new Uri(string.Concat(Helper.rabbitMqAddress, "/", Helper.rabbitMqQueue)));
ISendEndpoint sendEndpoint = sendEndpointTask.Result;
Task sendTask = sendEndpoint.Send(new
{
    Address = "New Street ### " + System.Threading.Thread.CurrentThread.ManagedThreadId,
    Id = Guid.NewGuid(),
    Preferred = true,
    Name = "Nice people LTD > " + Guid.NewGuid(),
    Type = 1,
    DefaultDiscount = 0
});
2. Secondly, in MassTransit, I create a listener as follows. By default, if there is an exception in the Consume() method of the RegisterConsumerCommand object, and there is no try/catch code, the message is inserted back into the queue. Is there any way to configure MassTransit so that once the message is read from RabbitMQ, it is removed from the queue straight away, regardless of whether Consume() succeeds or throws an exception?
IRabbitMqHost rabbitMqHost = rabbit.Host(new Uri("rabbitmq://localhost:5672"), settings =>
{
    settings.Password("accountant");
    settings.Username("accountant");
});
rabbit.ReceiveEndpoint(rabbitMqHost, "remit.queue", conf =>
{
    conf.Consumer<RegisterConsumerCommand>();
    conf.PrefetchCount = 2;
    conf.UseConcurrencyLimit(2);
});
Thank you in advance.
Regards,
Tat Sean
1. I have the same question
2. Could it be because the bus/queue has a retry mechanism? In that case you could try customizing the retry policy at application composition time.
For question 1, that goes into the implementation of MT with RabbitMQ, which you will need to ask one of the project creators about. They respond pretty quickly on Gitter [https://gitter.im/MassTransit/MassTransit].
For question 2, no. I’m pretty sure this is by design, because you want to guarantee at least once delivery. MT waits until the end of its pipeline to send the ACK back to the transport (in this case RabbitMQ). This is also why it’s important that your consumers be designed to be idempotent. Say everything processed successfully, but the server running the consumer died right before the ACK was sent back to RabbitMQ. When the server comes back up, it will pop the message it already processed off the queue and process it again. So we have at least once delivery (good), and the consumer is idempotent (also good, as processing the message again has no bad effects). What you are describing (sending the ACK back before processing has finished) would break that. The safer design is to guarantee at least once delivery, rather than zero or once delivery.
Also, if you have no retry middleware in your endpoint/bus creation, then any exception will cause the message to move into an _error queue.
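To make the idempotency point concrete, here is a minimal sketch of a consumer that skips a message it has already handled. It assumes the message carries a MessageId and that an in-memory set is acceptable for illustration. ProcessedMessages is a hypothetical helper, and a real service would need a durable store shared across restarts.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using MassTransit;

public interface MyCommand
{
    string Text { get; } // hypothetical property
}

// Hypothetical in-memory record of handled message ids (lost on restart).
public class ProcessedMessages
{
    private readonly ConcurrentDictionary<Guid, bool> _seen = new ConcurrentDictionary<Guid, bool>();

    public bool Contains(Guid messageId)
    {
        return _seen.ContainsKey(messageId);
    }

    public void Mark(Guid messageId)
    {
        _seen[messageId] = true;
    }
}

public class IdempotentCommandConsumer : IConsumer<MyCommand>
{
    private static readonly ProcessedMessages Processed = new ProcessedMessages();

    public async Task Consume(ConsumeContext<MyCommand> context)
    {
        Guid? id = context.MessageId;

        // A redelivered message has the same id, so doing nothing here is safe.
        if (id.HasValue && Processed.Contains(id.Value))
            return;

        await Console.Out.WriteLineAsync("Executing: " + context.Message.Text);

        // Mark only after the work succeeded, so a failed attempt can still be retried.
        if (id.HasValue)
            Processed.Mark(id.Value);
    }
}
```

The sketch does not guard against two copies of the same message being processed at the same instant. It only shows the shape of the check.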
A very nice post indeed! What I find confusing in MassTransit is not being able to do context.Send() from within a command consumer (when a command A needs to schedule a command B), the message context provides the Send method, but calling it leads to “No convention found for ‘command B'” (needless to say that context.Publish() executes flawlessly, but I don’t want an event here). The confusing part is that the Docs emphasize that one must use the “closest interface” but the consumer has no knowledge about the Send Endpoint uri. I end up retrieving a Send endpoint from IBusControl for the time being, until I figure out a better approach.
In the new MassTransit version 4 (on NuGet as a prerelease package), there is topology configuration, which will give more control over this (so you don’t have to get the send endpoint yourself).
However, in v3 you can use the endpoint convention, which can be configured once (typically at IoC registration/startup time).
If you map your interface to an endpoint address, your Send method will work.
See an example here: https://github.com/phatboyg/Demo-Registration/blob/f9e83d1b25edb23e778582d65e80efaece00d775/src/Registration.Api/Global.asax.cs#L40
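A minimal sketch of that mapping is shown below. EndpointConvention.Map is the MassTransit call; the contracts, the SendConventions class, the Text property and the command_b_queue address are hypothetical and chosen only for illustration.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

public interface CommandA { }

public interface CommandB
{
    string Text { get; } // hypothetical property
}

public static class SendConventions
{
    // Call once at startup, before anything sends a CommandB.
    public static void Configure()
    {
        // Hypothetical queue address for the service that handles CommandB.
        EndpointConvention.Map<CommandB>(
            new Uri("rabbitmq://localhost/tutorial-send-vs-publish/command_b_queue"));
    }
}

public class CommandAConsumer : IConsumer<CommandA>
{
    public async Task Consume(ConsumeContext<CommandA> context)
    {
        // With the convention mapped, Send resolves the destination for CommandB
        // without the consumer knowing the endpoint URI.
        await context.Send<CommandB>(new { Text = "scheduled by command A" });
    }
}
```

This keeps the "closest interface" advice intact: the consumer still sends through its ConsumeContext, and the address lives in one place at startup.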
you can see
Nice post. Explains the difference between Send and Publish.
I had a question regarding the Publish message type. Are the messages published by default persistent? I know the queue is durable, but not sure about the message type, if it’s persistent or transient.
I have a question: what happens if I publish a message when no endpoint has been set up to receive it?
This is brilliant and gave me a better clarity than any other. The diagrams made it all the better. Thank you.