
Easy JSON Serialization for Kafka in DotNet
How to Serialize JSON Without a Schema Registry
If you have used Confluent's .NET Kafka client, you may have noticed that its JsonSerializer requires a schema registry:
var producer = new ProducerBuilder<Null, Person>(producerConfig)
    .SetValueSerializer(new JsonSerializer<Person>(schemaRegistry, jsonSerializerConfig))
    .Build();
It’s always good practice to use the schema registry, but sometimes we have JSON in a topic that we don’t control, or we don’t have a schema registry for some other reason. Fear not, we can still make this work. We just need to implement our own JsonSerializer and JsonDeserializer that don’t need a schema registry.
For the serializer, we serialize our object to JSON and then convert that JSON string to bytes. The deserializer does the reverse: it converts the bytes back to a JSON string and deserializes that JSON into our object.
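using System;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;

namespace JsonSerDes
{
    // Serializes a value to JSON text and encodes that text as bytes.
    public class JsonSerializer<T> : IAsyncSerializer<T> where T : class
    {
        public Task<byte[]> SerializeAsync(T data, SerializationContext context)
        {
            // The non-generic JsonSerializer here is the static class from System.Text.Json.
            string jsonString = JsonSerializer.Serialize(data);
            // Encode as UTF-8 so non-ASCII characters survive the round trip.
            return Task.FromResult(Encoding.UTF8.GetBytes(jsonString));
        }
    }

    // Decodes the bytes to JSON text and deserializes that text into T.
    public class JsonDeserializer<T> : IAsyncDeserializer<T> where T : class
    {
        public Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool isNull, SerializationContext context)
        {
            // A null payload (for example a tombstone) has no JSON to parse.
            if (isNull) return Task.FromResult<T>(null);
            string json = Encoding.UTF8.GetString(data.Span);
            return Task.FromResult(JsonSerializer.Deserialize<T>(json));
        }
    }
}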
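As a possible variation on the classes above, System.Text.Json can also write and read UTF-8 bytes directly, which skips the intermediate string. The sketch below assumes that approach; the class names Utf8JsonSerializer and Utf8JsonDeserializer are hypothetical, and returning null for a null payload is an assumption about how you want tombstones handled.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

namespace JsonSerDes
{
    // Hypothetical name: writes the value straight to UTF-8 encoded JSON bytes.
    public class Utf8JsonSerializer<T> : IAsyncSerializer<T> where T : class
    {
        public Task<byte[]> SerializeAsync(T data, SerializationContext context)
        {
            // Fully qualified to make clear this is the static System.Text.Json class.
            byte[] bytes = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(data);
            return Task.FromResult(bytes);
        }
    }

    // Hypothetical name: parses UTF-8 encoded JSON bytes without building a string first.
    public class Utf8JsonDeserializer<T> : IAsyncDeserializer<T> where T : class
    {
        public Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool isNull, SerializationContext context)
        {
            // Assumption: a null payload (tombstone) should surface as a null value.
            if (isNull) return Task.FromResult<T>(null);
            T value = System.Text.Json.JsonSerializer.Deserialize<T>(data.Span);
            return Task.FromResult(value);
        }
    }
}
```

These classes implement the same interfaces, so they can be passed to SetValueSerializer and SetValueDeserializer in exactly the same way.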
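Now we can pass our new implementations to SetValueSerializer and SetValueDeserializer. The consumer builder only accepts a synchronous deserializer, so we wrap ours with AsSyncOverAsync(), an extension method from the Confluent.Kafka.SyncOverAsync namespace.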
// producerConfig is a ProducerConfig; consumerConfig is a ConsumerConfig.
var producer = new ProducerBuilder<Null, User>(producerConfig)
    .SetValueSerializer(new JsonSerializer<User>())
    .Build();
var consumer = new ConsumerBuilder<Null, User>(consumerConfig)
    .SetValueDeserializer(new JsonDeserializer<User>().AsSyncOverAsync())
    .Build();
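To see the pieces working together, here is a minimal end-to-end sketch that produces one message and reads it back. It assumes the JsonSerDes namespace from this post is part of the project and that a broker is reachable. The User class, the broker address localhost:9092, the topic name users and the group id are all hypothetical placeholders to replace with your own.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync; // provides AsSyncOverAsync()
using JsonSerDes;                    // the serializer classes from this post

// Assumed values: adjust for your environment.
const string topic = "users";
const string bootstrapServers = "localhost:9092";

var producerConfig = new ProducerConfig { BootstrapServers = bootstrapServers };
var consumerConfig = new ConsumerConfig
{
    BootstrapServers = bootstrapServers,
    GroupId = "json-serdes-demo",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using (var producer = new ProducerBuilder<Null, User>(producerConfig)
    .SetValueSerializer(new JsonSerializer<User>())
    .Build())
{
    var message = new Message<Null, User> { Value = new User { Name = "Ada", Age = 36 } };
    var result = await producer.ProduceAsync(topic, message);
    Console.WriteLine($"Produced to {result.TopicPartitionOffset}");
}

using (var consumer = new ConsumerBuilder<Null, User>(consumerConfig)
    .SetValueDeserializer(new JsonDeserializer<User>().AsSyncOverAsync())
    .Build())
{
    consumer.Subscribe(topic);
    // Consume returns null if nothing arrives within the timeout.
    var consumed = consumer.Consume(TimeSpan.FromSeconds(10));
    if (consumed != null)
    {
        Console.WriteLine($"Consumed {consumed.Message.Value.Name}, age {consumed.Message.Value.Age}");
    }
    consumer.Close();
}

// Hypothetical message type used only for this example.
public class User
{
    public string Name { get; set; } = "";
    public int Age { get; set; }
}
```

If the same file also imports Confluent.SchemaRegistry.Serdes, the name JsonSerializer<T> becomes ambiguous, so it may be worth qualifying it as JsonSerDes.JsonSerializer<User> in that case.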