RabbitMQ c# Implementation Part
Rabbitmq from a high-level perspective
Direct Exchange type:
The sender sends a message to exchange by adding a routing key to the message header. Once the message is received, the exchange tries to compare the route key with the binding key in all the queues. After that, if found any match, the message is routed to that queue otherwise the message is ignored.
(1) make an instance from the connection factory by setting the user name password and URL.
using Newtonsoft.Json;
using RabbitMQ.Client;
using System;
using System.Text;
namespace RabbitMQProducer
{
static class Program
{
static void Main(string[] args)
{
var connectionFactory = new ConnectionFactory()
{
HostName = "localhost",
UserName = "guest",
Password = "guest",
};
using var con = connectionFactory .CreateConnection();
using var channel = con.CreateModel();
DirectExchangePublisher.Publish(channel);
}
}
}
(2) create new calss DirectExchangePulisher for publishing messages. After that, create messages and publish to exchange with routing key.
using Newtonsoft.Json;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMQProducer
{
public class DirectExchangePublisher
{
public static void Publish(IModel channel)
{
var ttl = new Dictionary<string, object>
{
{ "x-message-ttl", 30000 }
};
channel.ExchangeDeclare("demo-direct-exchange", ExchangeType.Direct, arguments: ttl);
int count = 0;
while (true)
{
var message = new { Name = "Producer", Message = $"Hello! Count: {count}" };
var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
channel.BasicPublish("demo-direct-exchange", "account.init", null, body);
count++;
Thread.Sleep(1000);
}
}
}
}
(3) make the instance again from the connection factory by setting the user name and password in a separate project for consuming messages.
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace RabbitMQConsumer
{
class Program
{
static void Main(string[] args)
{
var connectionFactory= new ConnectionFactory()
{
HostName = "localhost",
UserName = "guest",
Password = "guest",
};
using var con = factory.CreateConnection();
using var channel = con .CreateModel();
DirectExchangeConsumer.Consume(channel);
}
}
}
(4) create DirectExchangeConsumer class
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
namespace RabbitMQConsumer
{
public class DirectExchangeConsumer
{
public static void Consume(IModel channel)
{
channel.ExchangeDeclare("demo-direct-exchange", ExchangeType.Direct);
channel.QueueDeclare("demo-direct-queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueBind("demo-direct-queue", "demo-direct-exchange", "account.init");
channel.BasicQos(0, 10, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
var body = e.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(message);
};
channel.BasicConsume("demo-direct-queue", true, consumer);
Console.WriteLine("Consumer started");
Console.ReadLine();
}
}
}
The result after consuming messages
Topic Exchange type:
In Topic exchange, the exchange routes the message to queue based on matching between routing key and binding queue. The sender publish the message with routing key to topic exchange.
At the receiving end, the exchange attempt to match routing key to binding routing pattern of all queue, if the receiver queue found the message delivered otherwise the message ignored.
(1) we route the message to exchange by the routing key “account.update”.
using Newtonsoft.Json;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMQProducer
{
public static class TopicExchangePublisher
{
public static void Publish(IModel channel)
{
var ttl = new Dictionary<string, object>
{
{ "x-message-ttl", 30000 }
};
channel.ExchangeDeclare("demo-topic-exchange", ExchangeType.Topic, arguments: ttl);
int count = 0;
while (true)
{
var message = new { Name = "Producer", Message = $"Hello! Count: {count}" };
var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
channel.BasicPublish("demo-topic-exchange", "account.update", null, body);
count++;
Thread.Sleep(1000);
}
}
}
}
(2), consumer consume from “demo-topic -queue” with routing pattern “account.*”.
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; namespace RabbitMQConsumer { public static class TopicExchangeConsumer { public static void Consume(IModel channel) { channel.ExchangeDeclare("demo-topic-exchange", ExchangeType.Topic); channel.QueueDeclare("demo-topic-queue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueBind("demo-topic-queue", "demo-topic-exchange", "account.*"); channel.BasicQos(0, 10, false); var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, e) => { var body = e.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine(message); }; channel.BasicConsume("demo-topic-queue", true, consumer); Console.WriteLine("Consumer started"); Console.ReadLine(); } } }
Header Exchange type:
In Header exchange, the exchange routes the message based on the header value rather routing key.
A message publish to header exchange in key value pair. After that, the exchange start trying to match the header value with the binding value of all queues. In case the match found, delivered to that queue otherwise ignored the message.
(1) we route the message to exchange by header key value pair “account, new ”.
using Newtonsoft.Json;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMQProducer
{
public static class HeaderExchangePublisher
{
public static void Publish(IModel channel)
{
var ttl = new Dictionary<string, object>
{
{ "x-message-ttl", 30000 }
};
channel.ExchangeDeclare("demo-header-exchange", ExchangeType.Headers, arguments: ttl);
int count = 0;
while (true)
{
var message = new { Name = "Producer", Message = $"Hello! Count: {count}" };
var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
var properties = channel.CreateBasicProperties();
properties.Headers = new Dictionary<string, object> { { "account", "new" } };
channel.BasicPublish("demo-header-exchange", string.Empty, properties, body);
count++;
Thread.Sleep(1000);
}
}
}
}
(2) consumer consume from “demo-header-queue” with key value pair “account, new ”
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
namespace RabbitMQConsumer
{
public static class HeaderExchangeConsumer
{
public static void Consume(IModel channel)
{
channel.ExchangeDeclare("demo-header-exchange", ExchangeType.Headers);
channel.QueueDeclare("demo-header-queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var header = new Dictionary<string, object> { { "account", "new" } };
channel.QueueBind("demo-header-queue", "demo-header-exchange", string.Empty, header);
channel.BasicQos(0, 10, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
var body = e.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(message);
};
channel.BasicConsume("demo-header-queue", true, consumer);
Console.WriteLine("Consumer started");
Console.ReadLine();
}
}
}
Fanout Exchange type:
In Fanout exchange, the exchange sends the message to all the queues (broadcast). the message is published to a Fanout exchange and routed the message to all the queues.
(1) we route the message to exchange by empty routing key
using Newtonsoft.Json;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMQProducer
{
public static class FanoutExchangePublisher
{
public static void Publish(IModel channel)
{
var ttl = new Dictionary<string, object>
{
{ "x-message-ttl", 30000 }
};
channel.ExchangeDeclare("demo-fanout-exchange", ExchangeType.Fanout, arguments: ttl);
int count = 0;
while (true)
{
var message = new { Name = "Producer", Message = $"Hello! Count: {count}" };
var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
var properties = channel.CreateBasicProperties();
properties.Headers = new Dictionary<string, object> { { "account", "new" } };
channel.BasicPublish("demo-fanout-exchange", string.Empty, properties, body);
count++;
Thread.Sleep(1000);
}
}
}
}
(2) all queue will read the message.
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
namespace RabbitMQConsumer
{
public static class FanoutExchangeConsumer
{
public static void Consume(IModel channel)
{
channel.ExchangeDeclare("demo-fanout-exchange", ExchangeType.Fanout);
channel.QueueDeclare("demo-fanout-queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueBind("demo-fanout-queue", "demo-fanout-exchange", string.Empty);
channel.BasicQos(0, 10, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
var body = e.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(message);
};
channel.BasicConsume("demo-fanout-queue", true, consumer);
Console.WriteLine("Consumer started");
Console.ReadLine();
}
}
}
The queues which created in RabbitMQ dashboard are:
Comments
Post a Comment