Simple .NET producer using Schema Registry
Review this example code to learn how to create a simple .NET producer that uses Schema Registry.
using System;
using System.Threading;
using Avro.Generic;
// Also import the namespaces of your Kafka and Schema Registry client packages
// (ProducerBuilder, CachedSchemaRegistryClient, AvroSerializer, and so on).

internal class Program
{
    // Both counters are touched by the produce loop and by delivery callbacks
    // running on other threads, so they are only modified through Interlocked.
    private static int _inFlight;
    private static long _delivered;

    private static void HandleDeliveryResult(int msgid, DeliveryResult<string, GenericRecord> deliveryResult)
    {
        if (msgid % 1024 == 0) // writing to console on every message would be a bottleneck
        {
            Console.WriteLine($"Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}', in flight on delivery confirmation: {Volatile.Read(ref _inFlight)}");
        }
        Interlocked.Decrement(ref _inFlight);
        Interlocked.Increment(ref _delivered);
    }

    private static void Main(string[] args)
    {
        // Avro schema of the records to produce
        var schema = (Avro.RecordSchema) Avro.Schema.Parse(@"{
            ""type"": ""record"",
            ""name"": ""ExampleRecord"",
            ""namespace"": ""ExampleSchema"",
            ""fields"": [
                {
                    ""name"": ""message_id"",
                    ""type"": ""int""
                },
                {
                    ""name"": ""in_flight_on_send"",
                    ""type"": ""int""
                }
            ]
        }");

        var schemaRegistryConfig = new SchemaRegistryConfig { Url = "http://schemareg:7788/api/v1" };
        var producerConfig = new ProducerConfig { BootstrapServers = "broker:9092" };

        using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
        using (var producer = new ProducerBuilder<string, GenericRecord>(producerConfig)
            .SetValueSerializer(new AvroSerializer<GenericRecord>(schemaRegistry, new AvroSerializerConfig
            {
                ProtocolVersion = AvroSerializerConfig.ProtocolVersions.SchemaMetadataIdAsVersionProtocol
            }))
            .Build())
        {
            int msgCounter = 0;
            var start = DateTime.Now;

            // Produce until Q is pressed or 60 seconds have elapsed
            while (!(Console.KeyAvailable && Console.ReadKey().Key == ConsoleKey.Q || (DateTime.Now - start).TotalSeconds >= 60))
            {
                int msgid = ++msgCounter;
                var record = new GenericRecord(schema);
                record.Add("message_id", msgid);
                record.Add("in_flight_on_send", Interlocked.Increment(ref _inFlight));
                try
                {
                    producer.ProduceAsync("topic", new Message<string, GenericRecord>
                        {
                            Key = msgid.ToString(),
                            Value = record
                        })
                        .ContinueWith(task =>
                        {
                            if (task.IsFaulted)
                            {
                                // Asynchronous delivery failures surface on the task,
                                // not in the catch block below.
                                Interlocked.Decrement(ref _inFlight);
                                Console.WriteLine($"Produce failed: {task.Exception.GetBaseException().Message}");
                            }
                            else
                            {
                                HandleDeliveryResult(msgid, task.Result);
                            }
                        });
                }
                catch (ProduceException<string, GenericRecord> e)
                {
                    // The exception type parameters must match the producer's key and value types
                    Interlocked.Decrement(ref _inFlight);
                    Console.WriteLine($"Produce failed: {e.Error.Reason}");
                }
            }

            // Wait for outstanding messages before reporting throughput
            producer.Flush();
            Console.WriteLine("average throughput: {0:N3} msg/sec", Interlocked.Read(ref _delivered) / (DateTime.Now - start).TotalSeconds);
        }
    }
}
Supported values for ProtocolVersion
The Schema Registry client consists of two major components: the REST client and the serializers (SerDes). You define the record schemas and set the respective SerDes on the Kafka client. When the first message with a given schema definition is produced, the REST client stores the schema on the Schema Registry server. The server can offer multiple ways for the client to reference these schemas. Each message produced to Kafka is transformed to contain the ID of its schema stored on the Schema Registry server, so that the consuming side can download the schema by that ID. The format of the ID is denoted by the first byte of the message, which is called the Magic Byte or Protocol ID.
ProtocolVersion | Integer value | Specification |
---|---|---|
AvroSerializerConfig.ProtocolVersions.ConfluentProtocol | 0 | https://registry-project.readthedocs.io/en/latest/serdes.html#confluent-protocol |
AvroSerializerConfig.ProtocolVersions.SchemaMetadataIdAsVersionProtocol | 1 | https://registry-project.readthedocs.io/en/latest/serdes.html#schema-metadata-id-and-version-protocol |
AvroSerializerConfig.ProtocolVersions.SchemaVersionIdAsLongProtocol | 2 | https://registry-project.readthedocs.io/en/latest/serdes.html#schema-version-id-as-long-protocol |
AvroSerializerConfig.ProtocolVersions.SchemaVersionIdAsIntProtocol | 3 | https://registry-project.readthedocs.io/en/latest/serdes.html#schema-version-id-as-int-protocol |
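To make the role of the first byte more concrete, the following is a minimal sketch of how a raw message payload could be inspected to see which protocol it was written with. It relies only on the integer values listed in the table above. The `ProtocolIdInspector` class and its `DescribeProtocol` method are hypothetical helpers written for this illustration and are not part of the client library. The sketch deliberately does not decode the schema ID that follows the first byte, because that layout differs per protocol and is described in the linked specifications.

```csharp
using System;

// Hypothetical helper for illustration only; not part of the Schema Registry client.
public static class ProtocolIdInspector
{
    // Maps the first byte of a serialized message (the Magic Byte / Protocol ID)
    // to the ProtocolVersion name listed in the table above.
    public static string DescribeProtocol(byte[] payload)
    {
        if (payload == null || payload.Length == 0)
        {
            throw new ArgumentException("Payload is empty, so it has no protocol ID byte.", nameof(payload));
        }

        switch (payload[0])
        {
            case 0: return "ConfluentProtocol";
            case 1: return "SchemaMetadataIdAsVersionProtocol";
            case 2: return "SchemaVersionIdAsLongProtocol";
            case 3: return "SchemaVersionIdAsIntProtocol";
            default: return $"Unknown protocol ID {payload[0]}";
        }
    }
}
```

For example, calling `ProtocolIdInspector.DescribeProtocol` on the raw value bytes of a message written by the producer shown earlier would be expected to return `SchemaMetadataIdAsVersionProtocol`, since that is the `ProtocolVersion` the serializer was configured with.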