Simple .Net consumer using Schema Registry
Review this example code to learn how you can create a simple .NET consumer which is using Schema Registry.
internal class Program { private static void Main(string[] args) { var schemaRegistryConfig = new SchemaRegistryConfig { Url = "http://schemareg:7788/api/v1" }; var consumerConfig = new ConsumerConfig { BootstrapServers = "broker:9092", GroupId = "simple-dotnet-consumer", AutoOffsetReset = AutoOffsetReset.Earliest, EnablePartitionEof = true }; using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig)) using (var consumer = new ConsumerBuilder<string, GenericRecord>(consumerConfig) .SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync()) .Build()) { consumer.Subscribe("topic"); var start = DateTime.Now; long messageCounter = 0; try { while (!(Console.KeyAvailable && Console.ReadKey().Key == ConsoleKey.Q)) { var result = consumer.Consume(TimeSpan.FromMilliseconds(100)); if (result == null) { continue; } if (result.IsPartitionEOF) { break; } ++messageCounter; if (messageCounter % 1024 == 0) { Console.WriteLine( $"Received message key: \"{result.Message.Key}\" " + $"value: < {result.Message.Value["message_id"]}, in flight on send: {result.Message.Value["in_flight_on_send"]} >"); } } } catch (OperationCanceledException) {} consumer.Close(); // commit offset and unsubscribe var elapsed = DateTime.Now - start; Console.WriteLine("average throughput: {0:N3} msg/sec, {1} messages over {2:N3} sec", messageCounter / elapsed.TotalSeconds, messageCounter, elapsed.TotalSeconds); } } }