Performant .NET producer example
Review this example code to learn how you can create a more complex, high-performing .NET producer using asynchronous processing and the Task Parallel Library (TPL) Dataflow `ActionBlock`. This producer is capable of producing around 50,000 messages every second in a development environment.
/// <summary>
/// Example producer that pushes messages to a topic through a bounded
/// <c>ActionBlock</c>, keeping at most a fixed number of produce requests in
/// flight, and prints the average delivery throughput when it stops.
/// </summary>
internal class Program
{
    // Number of produce requests that have been sent but not yet confirmed or failed.
    private static int _inFlight;

    // Number of messages whose delivery has been confirmed.
    private static long _delivered;

    /// <summary>
    /// Records a confirmed delivery: updates the counters and prints a progress
    /// line for every 1024th message id.
    /// </summary>
    /// <param name="msgid">Sequence number of the message that was produced.</param>
    /// <param name="deliveryResult">Delivery result returned by the awaited produce call.</param>
    private static void HandleDeliveryResult(int msgid, DeliveryResult<string, string> deliveryResult)
    {
        if (msgid % 1024 == 0) // writing to console on every message would be a bottleneck
        {
            // The counter is updated from many continuations at once, so read it explicitly.
            int inFlightNow = Volatile.Read(ref _inFlight);
            Console.WriteLine($"Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}', in flight on delivery confirmation: {inFlightNow}");
        }

        Interlocked.Decrement(ref _inFlight);
        Interlocked.Increment(ref _delivered);
    }

    /// <summary>
    /// Produces sequentially numbered messages until the block stops accepting
    /// input, the user presses Q, or 60 seconds have elapsed; then waits for all
    /// outstanding produce requests and prints the average throughput.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    private static async Task Main(string[] args)
    {
        const int inFlightRequests = 16384;
        const int bufferedMessages = inFlightRequests * 4;
        const double maxRunSeconds = 60;

        var config = new ProducerConfig
        {
            BootstrapServers = "broker:9092",
            LingerMs = 50,
            QueueBufferingMaxMessages = bufferedMessages
        };
        using var p = new ProducerBuilder<string, string>(config).Build();

        var blockOptions = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = inFlightRequests,
            BoundedCapacity = inFlightRequests
        };

        // The delegate is a real async lambda, so the block tracks each message until
        // its delivery has been handled. Produce failures are caught here, where they
        // actually surface, rather than around SendAsync (which only queues the item).
        var actionBlock = new ActionBlock<int>(
            async msgid =>
            {
                var message = new Message<string, string>
                {
                    Key = msgid.ToString(),
                    Value = $"{msgid}, in flight on send: {Interlocked.Increment(ref _inFlight)}"
                };

                try
                {
                    HandleDeliveryResult(msgid, await p.ProduceAsync("topic", message));
                }
                catch (ProduceException<string, string> e)
                {
                    // A failed message is no longer in flight, but it was not delivered either.
                    Interlocked.Decrement(ref _inFlight);
                    Console.WriteLine($"Produce failed: {e.Error.Reason}");
                }
            },
            blockOptions);

        int msgCounter = 0;
        bool accepted = true;

        // Stopwatch is monotonic; DateTime.Now can jump with clock or DST adjustments.
        var stopwatch = System.Diagnostics.Stopwatch.StartNew();

        while (accepted
               && !(Console.KeyAvailable && Console.ReadKey().Key == ConsoleKey.Q)
               && stopwatch.Elapsed.TotalSeconds < maxRunSeconds)
        {
            // SendAsync waits asynchronously while the block is at BoundedCapacity and
            // returns false once the block declines further input.
            accepted = await actionBlock.SendAsync(msgCounter++);
        }

        actionBlock.Complete();
        await actionBlock.Completion;

        Console.WriteLine("average throughput: {0:N3} msg/sec", Interlocked.Read(ref _delivered) / stopwatch.Elapsed.TotalSeconds);
    }
}