Generic Jackson Serde (Serializer - Deserializer) with Lombok for Kafka Streams

Motivation

While Avro is the de facto standard for data serialization in Kafka, sometimes we don't want to use it: we may not want to set up a schema registry, generate the classes, or write the Avro schema. This is often the case in a proof of concept or a small project where JSON meets our needs.

It is also true that writing your own Serdes is not fun at all.

I came up with a solution that satisfies me: a generic Jackson Serde that works for any data type:

 new JacksonSerdes<>(Transaction.class);

It would be nice to make the invocation even shorter by inferring the generic type, but type erasure makes that impossible: the type argument is not available at runtime, so the class has to be passed in explicitly.
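To illustrate why the class has to be passed explicitly, here is a minimal, self-contained sketch of type erasure using only the JDK. The names `TypeErasureDemo` and `TypeToken` are hypothetical and are not part of the Serde; they only show that a generic type argument disappears at runtime, while a `Class<T>` token keeps it around.

```java
import java.util.ArrayList;
import java.util.List;

public class TypeErasureDemo {

    // Hypothetical holder (not part of the Serde): it stores a Class<T> token
    // because T itself is erased and "T.class" does not compile.
    static final class TypeToken<T> {
        private final Class<T> type;

        TypeToken(Class<T> type) {
            this.type = type;
        }

        // Runtime-checked cast made possible by the token.
        T cast(Object value) {
            return type.cast(value);
        }

        String typeName() {
            return type.getSimpleName();
        }
    }

    // Both lists share one runtime class: the type argument is erased.
    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> numbers = new ArrayList<>();
        return strings.getClass() == numbers.getClass();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());                       // prints true
        System.out.println(new TypeToken<>(String.class).typeName()); // prints String
    }
}
```

The Serde takes the same approach: Jackson needs a concrete `Class<T>` to know which type to build, so the constructor receives it as an argument.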

If we use the Lombok - Jackson integration (@Jacksonized) for our data classes, we reduce boilerplate code even more.

Data Class

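import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Builder;
import lombok.Data;
import lombok.extern.jackson.Jacksonized;

import java.util.Date; // assumption: the original does not say which Date class is used

// @Jacksonized makes Jackson deserialize through the Lombok-generated builder.
@Builder
@Jacksonized
@Data
public class Transaction {

    // Maps the JSON field "transaction_code" to this property.
    @JsonProperty("transaction_code")
    private final String transactionCode;

    private final int accountId;
    private final Date date;
    private final int amount;
}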

Kafka Serdes

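import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class JacksonSerdes<T> implements Serde<T> {

    // Class token needed by Jackson, since T is erased at runtime.
    private final Class<T> type;

    public JacksonSerdes(Class<T> type) {
        this.type = type;
    }

    @Override
    public Serializer<T> serializer() {
        return new JacksonSerializer<>();
    }

    @Override
    public Deserializer<T> deserializer() {
        return new JacksonDeserializer<>(type);
    }
}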

Deserializer

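import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.IOException;

public class JacksonDeserializer<T> implements Deserializer<T> {

    private final ObjectMapper mapper = new ObjectMapper();

    private final Class<T> type;

    public JacksonDeserializer(Class<T> type) {
        this.type = type;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        // Kafka passes null for tombstone records; keep them as null.
        if (data == null) {
            return null;
        }
        try {
            return mapper.readValue(data, type);
        } catch (IOException e) {
            // Fail loudly instead of returning null, which would look like a tombstone.
            throw new SerializationException("Error deserializing JSON from topic " + topic, e);
        }
    }
}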

Serializer

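import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class JacksonSerializer<T> implements Serializer<T> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, T data) {
        // Keep null values as null so tombstones are not written as the JSON literal "null".
        if (data == null) {
            return null;
        }
        try {
            return mapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            // Fail loudly instead of silently producing a null value.
            throw new SerializationException("Error serializing JSON for topic " + topic, e);
        }
    }
}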