Easy JSON Serialization for Kafka in DotNet

How to Serialize JSON Without a Schema Registry

If you have used Confluent's .NET Kafka client, you may have noticed that its JsonSerializer requires a schema registry client.

var producer = new ProducerBuilder<Null, Person>(producerConfig)
    .SetValueSerializer(new JsonSerializer<Person>(schemaRegistry, jsonSerializerConfig))
    .Build();

It’s always good practice to use the schema registry, but sometimes we have JSON in a topic that we don’t control, or we don’t have a schema registry for some other reason. Fear not, we can still make this work. We just need to implement our own JsonSerializer and JsonDeserializer that don’t need a schema registry.

For the serializer, we serialize our object to a JSON string, then encode that string as bytes. The deserializer does the reverse: it decodes the bytes into a JSON string and deserializes that JSON back into our object.
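Before wiring this into Kafka, the round trip itself can be tried in isolation. The following is a minimal sketch, not part of the client library: the User type and the ToBytes and FromBytes helpers are hypothetical names made up for this illustration, and it assumes UTF-8 as the byte encoding, which is the usual encoding for JSON.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical message type, used only for this illustration.
public class User
{
    public string Name { get; set; }
    public int Age { get; set; }
}

public static class RoundTripDemo
{
    // Object -> JSON string -> bytes (UTF-8 is an assumption of this sketch).
    public static byte[] ToBytes<T>(T value)
    {
        string json = JsonSerializer.Serialize(value);
        return Encoding.UTF8.GetBytes(json);
    }

    // Bytes -> JSON string -> object, the exact reverse of ToBytes.
    public static T FromBytes<T>(byte[] bytes)
    {
        string json = Encoding.UTF8.GetString(bytes);
        return JsonSerializer.Deserialize<T>(json);
    }

    public static void Main()
    {
        var original = new User { Name = "Zoë", Age = 42 };

        byte[] payload = ToBytes(original);
        User copy = FromBytes<User>(payload);

        // Compare the fields to confirm nothing was lost on the way.
        Console.WriteLine(copy.Name == original.Name && copy.Age == original.Age);
    }
}
```

The name deliberately contains a non-ASCII character, since that is where a mismatch between the encoding used to write and the encoding used to read tends to show up.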

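using System;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;

namespace JsonSerDes
{
    public class JsonSerializer<T> : IAsyncSerializer<T> where T : class
    {
        public Task<byte[]> SerializeAsync(T data, SerializationContext context)
        {
            // Fully qualified because this class shares its name with System.Text.Json.JsonSerializer.
            string jsonString = System.Text.Json.JsonSerializer.Serialize(data);

            // Encode as UTF-8 so non-ASCII characters are never replaced with '?'.
            return Task.FromResult(Encoding.UTF8.GetBytes(jsonString));
        }
    }

    public class JsonDeserializer<T> : IAsyncDeserializer<T> where T : class
    {
        public Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool isNull, SerializationContext context)
        {
            // A null message value has no JSON to parse, so pass the null through.
            if (isNull)
            {
                return Task.FromResult<T>(null);
            }

            // Decode with the same encoding the serializer used, then parse the JSON.
            string json = Encoding.UTF8.GetString(data.Span);
            return Task.FromResult(System.Text.Json.JsonSerializer.Deserialize<T>(json));
        }
    }
}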

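Now we can simply use SetValueSerializer and SetValueDeserializer with our new implementations. The consumer builder expects a synchronous deserializer, so we wrap ours with AsSyncOverAsync(), an extension method from the Confluent.Kafka.SyncOverAsync namespace.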

var producer = new ProducerBuilder<Null, User>(producerConfig)
    .SetValueSerializer(new JsonSerializer<User>())
    .Build();

var consumer = new ConsumerBuilder<Null, User>(consumerConfig)
    .SetValueDeserializer(new JsonDeserializer<User>().AsSyncOverAsync())
    .Build();
