Performant .NET producer example
Review this example code to learn how to create a more complex, high-performing .NET producer using async processing and the TPL. This producer can produce around 50,000 messages per second in a development environment.
/// <summary>
/// Example producer that pushes sequentially numbered messages to the topic
/// "topic" through a bounded TPL Dataflow <c>ActionBlock</c>, then prints the
/// average delivery throughput.
/// </summary>
internal class Program
{
    // Number of messages handed to ProduceAsync that have not yet completed
    // (either delivered or failed). Updated from multiple threads via Interlocked.
    private static int _inFlight;

    // Number of messages for which a delivery result was received.
    private static long _delivered;

    /// <summary>
    /// Records a successful delivery: occasionally logs it, then updates the
    /// in-flight and delivered counters.
    /// </summary>
    /// <param name="msgid">Sequence number of the message that was produced.</param>
    /// <param name="deliveryResult">Delivery result returned by the producer for that message.</param>
    private static void HandleDeliveryResult(int msgid, DeliveryResult<string, string> deliveryResult)
    {
        if (msgid % 1024 == 0) // writing to console on every message would be a bottleneck
        {
            Console.WriteLine($"Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}', in flight on delivery confirmation: {Volatile.Read(ref _inFlight)}");
        }

        Interlocked.Decrement(ref _inFlight);
        Interlocked.Increment(ref _delivered);
    }

    /// <summary>
    /// Produces messages until the user presses Q, 60 seconds elapse, or the
    /// dataflow block stops accepting input; then waits for all outstanding
    /// sends to finish and prints the average throughput.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    private static async Task Main(string[] args)
    {
        const int inFlightRequests = 16384;
        const int bufferedMessages = inFlightRequests * 4;

        var config = new ProducerConfig
        {
            BootstrapServers = "broker:9092",
            LingerMs = 50,
            QueueBufferingMaxMessages = bufferedMessages
        };

        using var p = new ProducerBuilder<string, string>(config).Build();

        var actionBlock = new ActionBlock<int>(
            async msgid =>
            {
                var message = new Message<string, string>
                {
                    Key = msgid.ToString(),
                    Value = $"{msgid}, in flight on send: {Interlocked.Increment(ref _inFlight)}"
                };

                // Await the send inside the delegate so that a failed delivery is
                // observed here. Chaining an async lambda with ContinueWith yields a
                // Task<Task> whose inner failure would never be seen by the block.
                try
                {
                    var deliveryResult = await p.ProduceAsync("topic", message);
                    HandleDeliveryResult(msgid, deliveryResult);
                }
                catch (ProduceException<string, string> e)
                {
                    // The message is no longer in flight even though it was not delivered.
                    Interlocked.Decrement(ref _inFlight);
                    Console.WriteLine($"Produce failed: {e.Error.Reason}");
                }
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = inFlightRequests,
                BoundedCapacity = inFlightRequests
            });

        int msgCounter = 0;
        bool accepted = true;

        // Stopwatch is monotonic; wall-clock time can jump (clock sync, DST) and skew the result.
        var stopwatch = System.Diagnostics.Stopwatch.StartNew();

        while (accepted
               && !(Console.KeyAvailable && Console.ReadKey().Key == ConsoleKey.Q
                    || stopwatch.Elapsed.TotalSeconds >= 60))
        {
            // SendAsync waits while the block is at BoundedCapacity and returns
            // false once the block declines further input.
            accepted = await actionBlock.SendAsync(msgCounter++);
        }

        actionBlock.Complete();
        await actionBlock.Completion;

        Console.WriteLine("average throughput: {0:N3} msg/sec", Interlocked.Read(ref _delivered) / stopwatch.Elapsed.TotalSeconds);
    }
}