Apache Kafka Implementation Using C# & ASP.NET Core 6

We covered Apache Kafka conceptually in a previous article.
Today we are going to talk about the implementation part.

Prerequisites:

  • .NET 6 (ASP.NET Core)
  • Docker Desktop
  • Confluent.Kafka package
  • Postman

Let's start.

Open a CMD window and run this command to download the Confluent Platform docker-compose file:

curl --silent --output docker-compose.yml https://raw.githubusercontent.com/confluentinc/cp-all-in-one/6.1.1-post/cp-all-in-one/docker-compose.yml

The previous command only downloads docker-compose.yml. To pull the images and start the containers, run this command from the same folder:

docker-compose up -d

Here all is good 😊

- Create a new API project and name it Producer

- Install the following package from NuGet: Confluent.Kafka

- Create a ProducerController

- Then you can use the following code:

using System.Diagnostics;
using System.Net;
using System.Text.Json;
using Confluent.Kafka;
using Microsoft.AspNetCore.Mvc;

// The route below is an assumption (the original snippet does not show one); adjust it to your project.
[ApiController]
[Route("api/[controller]")]
public class ProducerController : ControllerBase
{
    private readonly string bootstrapServers = "localhost:9092";
    private readonly string topic = "test";
    private readonly ILogger<ProducerController> logger;

    public ProducerController(ILogger<ProducerController> logger)
    {
        this.logger = logger;
    }

    [HttpPost]
    public async Task<IActionResult> PostOrder([FromBody] OrderRequest orderRequest)
    {
        // Serialize the order to JSON; the consumer deserializes the same shape.
        string message = JsonSerializer.Serialize(orderRequest);
        return Ok(await SendOrderRequest(topic, message));
    }

    private async Task<bool> SendOrderRequest(string topic, string message)
    {
        ProducerConfig config = new()
        {
            BootstrapServers = bootstrapServers,
            ClientId = Dns.GetHostName()
        };

        try
        {
            // A producer is built per request to keep the sample short.
            using var producer = new ProducerBuilder<Null, string>(config).Build();
            var result = await producer.ProduceAsync(topic, new Message<Null, string>
            {
                Value = message
            });
            Debug.WriteLine($"Delivery Timestamp:{result.Timestamp.UtcDateTime}");
            return true;
        }
        catch (Exception ex)
        {
            // Log the full exception, not only its message.
            logger.LogError(ex, "Failed to deliver message to topic {Topic}", topic);
            return false;
        }
    }
}


- Create a new API project and name it Consumer

- Install the following package from NuGet: Confluent.Kafka

- Create a hosted service as follows:




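using System.Text.Json;
using Confluent.Kafka;

public class KafkaConsumerService : IHostedService
{
    private readonly string topic = "test";
    private readonly string groupId = "test_group";
    private readonly string bootstrapServers = "localhost:9092";
    private readonly CancellationTokenSource cancelToken = new();
    private Task? consumeTask;

    public Task StartAsync(CancellationToken cancellationToken)
    {
        // Consume() blocks, so the loop runs on a background task;
        // otherwise StartAsync would never return and the host could not finish starting.
        consumeTask = Task.Run(() => ConsumeLoop(cancelToken.Token));
        return Task.CompletedTask;
    }

    private void ConsumeLoop(CancellationToken token)
    {
        ConsumerConfig config = new()
        {
            GroupId = groupId,
            BootstrapServers = bootstrapServers,
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        try
        {
            using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
            consumer.Subscribe(topic);

            try
            {
                while (!token.IsCancellationRequested)
                {
                    var result = consumer.Consume(token);
                    var order = JsonSerializer.Deserialize<OrderRequest>(result.Message.Value);
                    Console.WriteLine($"Processing Order Id:{order?.OrderId}");
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when the host shuts down.
            }
            finally
            {
                // Leave the consumer group cleanly.
                consumer.Close();
            }
        }
        catch (Exception ex)
        {
            System.Diagnostics.Debug.WriteLine(ex.Message);
        }
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        // Signal the loop to stop and wait for it to finish.
        cancelToken.Cancel();
        return consumeTask ?? Task.CompletedTask;
    }
}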

- Don't forget to register the new hosted service. Go to the Program file and add:

 builder.Services.AddSingleton<IHostedService, KafkaConsumerService>();
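For context, here is a minimal sketch of what the consumer's Program file could look like around that line. It assumes the default .NET 6 Web API template (top-level statements and implicit usings) and the KafkaConsumerService class shown above. AddHostedService is the built-in shorthand for registering an IHostedService and can be used instead of the AddSingleton call.

```csharp
// Program.cs of the consumer project (sketch; assumes the .NET 6 Web API template).
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddControllers();

// Equivalent to AddSingleton<IHostedService, KafkaConsumerService>():
// the host calls StartAsync on startup and StopAsync on shutdown.
builder.Services.AddHostedService<KafkaConsumerService>();

var app = builder.Build();

app.MapControllers();

app.Run();
```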

- Add the OrderRequest class to both projects: the producer uses it to send the data and the consumer binds the received message to it



public class OrderRequest
    {
        public int OrderId { get; set; }
        public int ProductId { get; set; }
        public int CustomerId { get; set; }
        public int Quantity { get; set; }
        public string Status { get; set; }
    }
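To see why both projects need the same class, here is a small standalone sketch of the round trip without Kafka in between: the producer side serializes the order to a JSON string and the consumer side deserializes that string back. It is a console program written only for illustration and repeats the OrderRequest class so it can run on its own.

```csharp
using System.Text.Json;

// Producer side: turn the order into the string that is sent as the Kafka message value.
var order = new OrderRequest { OrderId = 150, ProductId = 1, CustomerId = 120, Quantity = 30, Status = "xyz" };
string message = JsonSerializer.Serialize(order);
Console.WriteLine(message);
// prints {"OrderId":150,"ProductId":1,"CustomerId":120,"Quantity":30,"Status":"xyz"}

// Consumer side: bind the received string back to the same class.
var received = JsonSerializer.Deserialize<OrderRequest>(message);
Console.WriteLine($"Processing Order Id:{received?.OrderId}");
// prints Processing Order Id:150

public class OrderRequest
{
    public int OrderId { get; set; }
    public int ProductId { get; set; }
    public int CustomerId { get; set; }
    public int Quantity { get; set; }
    public string Status { get; set; } = string.Empty;
}
```

If the property names differ between the two projects, the deserialized object silently keeps default values, so keeping one shared shape avoids hard-to-spot bugs.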


- Make sure all the Kafka containers are running in Docker Desktop

- Run both projects together and call PostOrder by sending the following body as a POST request from Postman:



{
  "orderId": 150,
  "productId": 1,
  "customerId": 120,
  "quantity": 30,
  "status": "xyz"
}
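If you prefer code over Postman, the same request can be sent from a small console program. This is only a sketch: the base address and the api/producer route are assumptions, so replace them with the URL your producer project prints on startup and the route your controller actually uses.

```csharp
using System.Net.Http.Json;

// Hypothetical address: use the port shown when the producer project starts.
using var client = new HttpClient { BaseAddress = new Uri("http://localhost:5000") };

// Same payload as the Postman body above.
var order = new { orderId = 150, productId = 1, customerId = 120, quantity = 30, status = "xyz" };

// Assumed route; it depends on the [Route] attribute of ProducerController.
var response = await client.PostAsJsonAsync("api/producer", order);

// The action returns true when the message was delivered to Kafka, false otherwise.
Console.WriteLine($"{(int)response.StatusCode}: {await response.Content.ReadAsStringAsync()}");
```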


- When you send this request, it is published to the Kafka broker on the predefined topic and server. By debugging the consumer code you will see that the message was sent to this topic, consumed successfully, and printed to the console



- You can also open Confluent Control Center at http://localhost:9021/clusters to inspect the Kafka cluster running on localhost



- Click Topics in the left-hand menu and you will see the topic you created
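The same check can be done from code. The sketch below uses the admin client from the Confluent.Kafka package to list the brokers and topics; it assumes the broker is reachable at localhost:9092, as in the rest of this article, and is meant as a quick console check rather than production code.

```csharp
using Confluent.Kafka;

var config = new AdminClientConfig { BootstrapServers = "localhost:9092" };
using var admin = new AdminClientBuilder(config).Build();

// Ask the cluster for its metadata; this fails with an exception
// if no broker answers within the timeout.
var metadata = admin.GetMetadata(TimeSpan.FromSeconds(10));

foreach (var broker in metadata.Brokers)
{
    Console.WriteLine($"Broker {broker.BrokerId} at {broker.Host}:{broker.Port}");
}

// The "test" topic should appear here after the first message is produced.
foreach (var topicInfo in metadata.Topics)
{
    Console.WriteLine($"Topic: {topicInfo.Topic}");
}
```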

Good luck ✌
