Apache Kafka Using C# & ASP.NET Core 6: Implementation
We covered the concepts behind Kafka in an earlier article.
Today we are going to talk about the implementation part.
Prerequisites:
- .NET 6
- Docker Desktop
- Confluent.Kafka package
- Postman
Let's start.
Open a CMD window and run this command to download the Confluent docker-compose file:
curl --silent --output docker-compose.yml https://raw.githubusercontent.com/confluentinc/cp-all-in-one/6.1.1-post/cp-all-in-one/docker-compose.yml
That command only downloads docker-compose.yml. To pull the images and start the containers, run this command from the same folder:
docker-compose up -d
Once the containers are up, all is good 😊
- Create a new API project and name it Producer
- Install the Confluent.Kafka package from NuGet
- Create a ProducerController
- Then you can use the following code
using System.Diagnostics;
using System.Net;
using System.Text.Json;
using Confluent.Kafka;
using Microsoft.AspNetCore.Mvc;

// The route template below is an assumption; adjust it to match your project.
[ApiController]
[Route("api/[controller]")]
public class ProducerController : ControllerBase
{
    private readonly string bootstrapServers = "localhost:9092";
    private readonly string topic = "test";
    private readonly ILogger<ProducerController> logger;

    public ProducerController(ILogger<ProducerController> logger)
    {
        this.logger = logger;
    }

    [HttpPost]
    public async Task<IActionResult> PostOrder([FromBody] OrderRequest orderRequest)
    {
        // Serialize the order to JSON; this string becomes the Kafka message value.
        string message = JsonSerializer.Serialize(orderRequest);
        return Ok(await SendOrderRequest(topic, message));
    }

    private async Task<bool> SendOrderRequest(string topic, string message)
    {
        ProducerConfig config = new()
        {
            BootstrapServers = bootstrapServers,
            ClientId = Dns.GetHostName()
        };
        try
        {
            using (var producer = new ProducerBuilder<Null, string>(config).Build())
            {
                // ProduceAsync completes once the broker acknowledges the message.
                var result = await producer.ProduceAsync(topic, new Message<Null, string>
                {
                    Value = message
                });
                Debug.WriteLine($"Delivery Timestamp:{result.Timestamp.UtcDateTime}");
                return true;
            }
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Failed to deliver message to topic {Topic}", topic);
            return false;
        }
    }
}
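The controller above builds a new producer on every request, which is fine for a demo. As an optional variation, here is a minimal sketch of the Producer project's Program file that registers one shared producer for the whole application. It assumes the same localhost:9092 broker; the idea of injecting IProducer<Null, string> into the controller is a suggestion, not part of the code above.

```csharp
using System.Net;
using Confluent.Kafka;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();

// One producer for the whole application (assumption: same broker as above).
// The container disposes it when the application shuts down.
builder.Services.AddSingleton<IProducer<Null, string>>(_ =>
    new ProducerBuilder<Null, string>(new ProducerConfig
    {
        BootstrapServers = "localhost:9092",
        ClientId = Dns.GetHostName()
    }).Build());

var app = builder.Build();
app.MapControllers();
app.Run();
```

With this in place, the controller could take an IProducer<Null, string> in its constructor and call ProduceAsync on it directly, instead of creating and disposing a producer per request.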
- Create a new API project and name it Consumer
- Install the Confluent.Kafka package from NuGet
- Create a hosted service as follows
using System.Text.Json;
using Confluent.Kafka;

public class KafkaConsumerService : IHostedService
{
    private readonly string topic = "test";
    private readonly string groupId = "test_group";
    private readonly string bootstrapServers = "localhost:9092";
    private readonly CancellationTokenSource cancelToken = new();
    private Task? consumeTask;

    public Task StartAsync(CancellationToken cancellationToken)
    {
        // Consume() blocks, so the loop runs on a background task.
        // Otherwise StartAsync never returns and the host cannot finish starting.
        consumeTask = Task.Run(ConsumeLoop);
        return Task.CompletedTask;
    }

    private void ConsumeLoop()
    {
        ConsumerConfig config = new()
        {
            GroupId = groupId,
            BootstrapServers = bootstrapServers,
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
        try
        {
            using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                consumer.Subscribe(topic);
                try
                {
                    while (true)
                    {
                        var result = consumer.Consume(cancelToken.Token);
                        var order = JsonSerializer.Deserialize<OrderRequest>(result.Message.Value);
                        Console.WriteLine($"Processing Order Id:{order?.OrderId}");
                    }
                }
                catch (OperationCanceledException)
                {
                    // Raised when StopAsync cancels the token: leave the group cleanly.
                    consumer.Close();
                }
            }
        }
        catch (Exception ex)
        {
            System.Diagnostics.Debug.WriteLine(ex.Message);
        }
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        // Stop the consume loop and wait for it to finish.
        cancelToken.Cancel();
        if (consumeTask != null)
        {
            await consumeTask;
        }
    }
}
- Don't forget to register the new hosted service
Go to the Program file and add this line:
builder.Services.AddSingleton<IHostedService, KafkaConsumerService>();
- Also add the OrderRequest class to both projects; the producer uses it to bind and send the data and the consumer uses it to read the data it receives
public class OrderRequest
{
    public int OrderId { get; set; }
    public int ProductId { get; set; }
    public int CustomerId { get; set; }
    public int Quantity { get; set; }
    // Default value avoids the nullable warning in .NET 6 projects.
    public string Status { get; set; } = string.Empty;
}
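To see what actually travels through Kafka, here is a small stand-alone sketch (a separate scratch console program, not part of either project) that serializes and deserializes an OrderRequest the same way the producer and consumer do, with the default System.Text.Json options. The OrderJsonDemo class and its method names are made up for this illustration.

```csharp
using System;
using System.Text.Json;

public class OrderRequest
{
    public int OrderId { get; set; }
    public int ProductId { get; set; }
    public int CustomerId { get; set; }
    public int Quantity { get; set; }
    public string Status { get; set; } = string.Empty;
}

// Hypothetical helper, only for this illustration.
public static class OrderJsonDemo
{
    // Same call the producer makes before sending.
    public static string ToKafkaValue(OrderRequest order) =>
        JsonSerializer.Serialize(order);

    // Same call the consumer makes after receiving.
    public static OrderRequest? FromKafkaValue(string value) =>
        JsonSerializer.Deserialize<OrderRequest>(value);
}

public static class Program
{
    public static void Main()
    {
        var order = new OrderRequest
        {
            OrderId = 150, ProductId = 1, CustomerId = 120, Quantity = 30, Status = "xyz"
        };

        // Default options keep the C# property names (PascalCase) in the JSON.
        string json = OrderJsonDemo.ToKafkaValue(order);
        Console.WriteLine(json);

        // Reading it back with the same defaults restores the values.
        OrderRequest? roundTrip = OrderJsonDemo.FromKafkaValue(json);
        Console.WriteLine(roundTrip?.OrderId);
    }
}
```

One thing to notice: the message value uses PascalCase names such as "OrderId", while an HTTP request body with camelCase names is still accepted by the API because ASP.NET Core binds JSON case-insensitively. The default JsonSerializer.Deserialize is case-sensitive, so a camelCase message written straight to the topic would leave the properties at their default values.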
- Make sure that all Kafka images are running in Docker Desktop
- Run both projects together, then call PostOrder from Postman as a POST request with the following body
{
"orderId": 150,
"productId": 1,
"customerId": 120,
"quantity": 30,
"status": "xyz"
}
- When you send this request, the message is sent to the Kafka broker on the configured server and the predefined topic name. By debugging the code in the consumer you will see that a message was sent on this topic, consumed successfully, and printed in the console
- You can also open http://localhost:9021/clusters to see the Kafka server on localhost
- Click Topics on the left-hand side and you will see your created topic
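If you prefer to check the topic from code instead of the browser, here is a minimal sketch using the admin client from the same Confluent.Kafka package. The TopicCheck class name and the 10 second timeout are assumptions for this example; it needs the broker from the docker-compose setup to be running.

```csharp
using System;
using System.Linq;
using Confluent.Kafka;

// Hypothetical helper, only for this illustration.
public static class TopicCheck
{
    public static bool TopicExists(string bootstrapServers, string topic)
    {
        using var admin = new AdminClientBuilder(
            new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();

        // Ask the broker for cluster metadata, which includes the topic list.
        var metadata = admin.GetMetadata(TimeSpan.FromSeconds(10));
        return metadata.Topics.Any(t => t.Topic == topic);
    }
}
```

Calling TopicCheck.TopicExists("localhost:9092", "test") after sending the first order should tell you whether the topic is there.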
Good luck ✌