Simple .NET consumer using Schema Registry
Review this example code to learn how to create a simple .NET consumer that uses Schema Registry.
/// <summary>
/// Sample console application that consumes records from a Kafka topic and
/// deserializes their values with an Avro deserializer backed by Schema Registry.
/// </summary>
internal class Program
{
    /// <summary>
    /// Subscribes to "topic" and polls until the user presses Q or a partition
    /// end-of-file event is reported. Every 1024th message is printed, and a
    /// throughput summary is written once the consumer has been closed.
    /// </summary>
    /// <param name="args">Command-line arguments (not used).</param>
    private static void Main(string[] args)
    {
        var schemaRegistryConfig = new SchemaRegistryConfig {
            Url = "http://schemareg:7788/api/v1"
        };

        var consumerConfig = new ConsumerConfig {
            BootstrapServers = "broker:9092",
            GroupId = "simple-dotnet-consumer",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            // Needed so that Consume() surfaces IsPartitionEOF results, which end the loop below.
            EnablePartitionEof = true
        };

        using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
        using (var consumer = new ConsumerBuilder<string, GenericRecord>(consumerConfig)
            .SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
            .Build())
        {
            consumer.Subscribe("topic");

            // Monotonic timer: unlike DateTime.Now it is unaffected by wall-clock adjustments.
            var stopwatch = System.Diagnostics.Stopwatch.StartNew();
            long messageCounter = 0;

            try
            {
                while (!(Console.KeyAvailable && Console.ReadKey().Key == ConsoleKey.Q))
                {
                    // Short poll timeout so the key check above stays responsive.
                    var result = consumer.Consume(TimeSpan.FromMilliseconds(100));
                    if (result == null)
                    {
                        // Nothing arrived within the timeout.
                        continue;
                    }

                    // NOTE(review): this stops at the first partition that reports EOF;
                    // confirm that is intended if the topic has more than one partition.
                    if (result.IsPartitionEOF)
                    {
                        break;
                    }

                    ++messageCounter;

                    // Print only a sample of the messages to keep console output manageable.
                    if (messageCounter % 1024 == 0)
                    {
                        Console.WriteLine(
                            $"Received message key: \"{result.Message.Key}\" " +
                            $"value: < {result.Message.Value["message_id"]}, in flight on send: {result.Message.Value["in_flight_on_send"]} >");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Cancellation is treated as a normal way to leave the loop.
            }
            finally
            {
                // Always close, even when Consume() throws, so offsets are committed
                // and the consumer unsubscribes before it is disposed.
                consumer.Close();
            }

            stopwatch.Stop();
            var elapsed = stopwatch.Elapsed;
            Console.WriteLine("average throughput: {0:N3} msg/sec, {1} messages over {2:N3} sec", messageCounter / elapsed.TotalSeconds, messageCounter, elapsed.TotalSeconds);
        }
    }
}