Storm & Kafka: serializing complex data types using Kryo

I recently had to work with Storm and Kafka, which are, respectively, a framework for near real-time data processing and a real-time pub/sub messaging system. Both can be heavily distributed across multiple machines in a cluster.

Even though the promise of Storm combined with Kafka sounds great, things got more complicated when I wanted to transfer non-primitive data inside and between the two systems. It turns out the documentation is not very clear on this topic (in addition to being rather sparse) and really lacks examples that would help you understand what's going on under the hood. Here is the information I gathered.

(In the following, I’ll assume you are already familiar with the basics of Storm and Kafka.)

Retrieving primitive data

First things first: you have to get raw data from somewhere. You can get it from any source, including a Kafka topic. Fortunately, Storm ships with a KafkaSpout that works pretty much out of the box.

String topic = "mytopic";
String zkConnectStr = ","; // ZooKeeper connection string (hosts omitted here)
BrokerHosts hosts = new ZkHosts( zkConnectStr );

// Arguments: brokers, topic, ZooKeeper root path for the consumer offsets, consumer id
SpoutConfig spoutConf = new SpoutConfig( hosts, topic, "/kafka/consumers/storm/" + topic, "uniqid" );

// 'scheme' tells the spout how to turn the raw Kafka bytes into tuples
// (we'll come back to it in the KryoScheme section at the end of this post)
spoutConf.scheme = new SchemeAsMultiScheme( scheme );
KafkaSpout kafkaSpout = new KafkaSpout( spoutConf );
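
The spout then plugs into your topology like any other spout. Here is a minimal wiring sketch ("processing-bolt" and MyProcessingBolt are hypothetical placeholders, not part of the Storm API):

TopologyBuilder builder = new TopologyBuilder( );
builder.setSpout( "kafka-spout", kafkaSpout );
builder.setBolt( "processing-bolt", new MyProcessingBolt( ) ) // hypothetical bolt
       .shuffleGrouping( "kafka-spout" );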

Next, the “complex” type we’ll use is a Datapoint.
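
For reference, here is roughly what such a Datapoint could look like. This is only a sketch: the field names simply match the ones used by the serializer shown later in this post.

public class Datapoint implements Serializable {
    public String providerID;
    public String deviceID;
    public String dataType;
    public String rawValue;
    public long timestamp;

    // The custom serializer below calls "new Datapoint( )",
    // so a no-argument constructor must be available
    public Datapoint( ) {
    }
}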

Emitting complex data types with your Bolts

Now that you have some raw data, it is time for some processing.

Depending on what you are trying to do, it might be worth having a class that represents the data you'll be manipulating in the bolts of your Storm topology. Otherwise, you may spend a lot of time parsing and serializing manually between each bolt.

With Storm, you have the ability to transmit complex data between your bolts.
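
For example, a bolt can emit a Datapoint directly as a tuple value. Here is a minimal sketch (parseDatapoint is a hypothetical helper that builds a Datapoint from the raw Kafka string):

public class ParsingBolt extends BaseBasicBolt {

    @Override
    public void execute( Tuple input, BasicOutputCollector collector ) {
        String raw = input.getString( 0 );
        Datapoint dp = parseDatapoint( raw ); // hypothetical parsing helper
        collector.emit( new Values( dp ) );   // the Datapoint object travels as-is to the next bolt
    }

    @Override
    public void declareOutputFields( OutputFieldsDeclarer declarer ) {
        declarer.declare( new Fields( "datapoint" ) );
    }
}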

Unfortunately, you have to explicitly tell Storm what serializer to use for each of your data classes that will be emitted by your bolts. You have to do this on the configuration object used to set up your topology:

Config conf = new Config( );
conf.registerSerialization( Datapoint.class, DatapointSerializer.class);

A hand-written Kryo serializer for the Datapoint looks like this:

public class DatapointSerializer extends com.esotericsoftware.kryo.Serializer<Datapoint> implements Serializable {

    @Override
    public void write( Kryo kryo, Output output, Datapoint datapoint ) {
        // Write down all the fields of the object
        output.writeString( datapoint.providerID );
        output.writeString( datapoint.deviceID );
        output.writeString( datapoint.dataType );
        output.writeString( datapoint.rawValue );
        output.writeLong( datapoint.timestamp );
    }

    @Override
    public Datapoint read( Kryo kryo, Input input, Class<Datapoint> aClass ) {
        // Read back all the fields of the object, in the same order they were written
        Datapoint dp = new Datapoint( );
        dp.providerID = input.readString( );
        dp.deviceID = input.readString( );
        dp.dataType = input.readString( );
        dp.rawValue = input.readString( );
        dp.timestamp = input.readLong( );
        return dp;
    }
}

But fortunately, Kryo ships with a FieldSerializer, which will automatically serialize all the fields of the class that are either primitive or have a declared serializer:

conf.registerSerialization( Datapoint.class, FieldSerializer.class);

Sending complex data types to Kafka

Once your data has been parsed and processed, it’s time to send it back to Kafka (on another topic).

To achieve this, we'll use a Kafka producer, and you will have to create an Encoder for the complex data type you are using. Storm already embeds Kryo, which seems to work pretty well, so we'll wrap our Kryo Serializer in a KafkaKryoEncoder class of our own:

public class KafkaKryoEncoder<T> implements Encoder<T>, Decoder<T> {

    private final Kryo kryo;
    private final Class<T> clazz;

    public KafkaKryoEncoder( Serializer<T> serializer, Class<T> clazz ) {
        // Register the Kryo serializer to use for the target class
        this.kryo = new Kryo( );
        this.kryo.register( clazz, serializer );

        this.clazz = clazz;
    }

    public byte[] toBytes( T datapoint ) {
        ByteArrayOutputStream stream = new ByteArrayOutputStream( );
        Output output = new Output( stream );

        kryo.writeObject( output, datapoint );

        output.close( ); // Also calls output.flush()

        return stream.toByteArray( ); // Serialization done, get the bytes
    }

    public T fromBytes( byte[] bytes ) {
        return kryo.readObject( new Input( new ByteArrayInputStream( bytes ) ), clazz );
    }
}

Side-note. There is no need to implement the Decoder, as the parsing on the consuming side is performed by Storm through the KafkaSpout's Scheme that we'll see later in this post.
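
With this class in place, getting an encoder instance for our Datapoint is a one-liner. This is the instance referred to as this.encoder in Solution 2 below:

KafkaKryoEncoder<Datapoint> encoder = new KafkaKryoEncoder<>( new DatapointSerializer( ), Datapoint.class );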

Now, the producer needs to know how to encode the tuple value into bytes. You have to provide an implementation of Encoder<T> (or use an existing one) and register it with the Kafka producer.

Solution 1: register your encoder class in the Kafka config

Properties props = new Properties( );
props.put( "", "" ); // broker list and other producer properties omitted here
props.put( "serializer.class", "com.tomsdev.MyCustomEncoder" );
ProducerConfig config = new ProducerConfig( props );
Producer<String, T> producer = new Producer<>( config );

KeyedMessage<String, T> data = new KeyedMessage<>( "myTopic", myValue );
producer.send( data );

Pros. The encoding is performed automatically; there is no need for any extra manipulation of your complex type.

Cons. You can’t pass an instance of the encoder; you have no choice but to pass the class name as a string, which means no additional constructor arguments. This is problematic, especially when you use Kryo to serialize, as it needs the target class as an argument in order to work. The only solution is then to create an additional class for your data type, a “constructor class”, in which you call super with the right arguments. Example:

public class DatapointKafkaKryoEncoder extends KafkaKryoEncoder<Datapoint> {
    public DatapointKafkaKryoEncoder( VerifiableProperties props ) {
        super( new DatapointSerializer( ), Datapoint.class );
    }
}

Important note. When registering a custom serializer.class, the class must have a constructor with the following signature: (VerifiableProperties props).

Solution 2: manual pre-processing with an instance of your encoder

With this solution, you stick with Kafka's default (byte[]) encoder and manually encode your data before writing it.

Properties props = new Properties( );
props.put( "", "" ); // broker list and other producer properties omitted here
props.put( "serializer.class", "kafka.serializer.DefaultEncoder" );
ProducerConfig config = new ProducerConfig( props );
Producer<String, byte[]> byteProducer = new Producer<>( config );

// Manually encode the value before handing it to Kafka
byte[] bytes = this.encoder.toBytes( value );
KeyedMessage<String, byte[]> data = new KeyedMessage<>( topic, bytes );
byteProducer.send( data );

Note. The kafka.serializer.DefaultEncoder is not actually an encoder: it just takes a byte[] and returns it. Good thing to know.

Pros. Reduces code redundancy (in particular, the need for “constructor classes”). You have full control over the serialization process.
Cons. You have to explicitly encode the data yourself.

Overall, I prefer this solution, as it allows for more flexibility.

Retrieving complex data types from a KafkaSpout

Finally, we need to be able to retrieve complex data types from Kafka.

To do this, you will have to provide a Scheme to the KafkaSpout, which will parse the byte array into our complex object.

Here is a utility class that will help you use Kryo inside a Scheme:

public class KryoScheme<T> implements Scheme {

    private static final Logger log = LogManager.getLogger( KryoScheme.class );

    private Class<T> clazz;
    private Serializer<T> serializer;

    public KryoScheme( Class<T> clazz, Serializer<T> serializer ) {
        this.clazz = clazz;
        this.serializer = serializer;
    }

    public List<Object> deserialize( byte[] buffer ) {
        // Kryo instances are not thread-safe, so we create one per call
        Kryo kryo = new Kryo( );
        kryo.register( clazz, serializer );

        T dp;
        try {
            dp = kryo.readObject( new Input( new ByteArrayInputStream( buffer ) ), this.clazz );
        } catch ( Exception e ) {
            log.error( "KryoScheme failed to deserialize data from Kafka to {}. Raw: {}",
                    this.clazz.getName( ), new String( buffer ) );
            dp = null;
        }

        return Utils.tuple( dp );
    }

    public Fields getOutputFields( ) {
        return new Fields( "tuple" );
    }
}

End note about terminology

When using a combination of Storm, Kafka and Kryo, we stumble upon different terms designating the same thing, each used by a different library.

That’s why we have a Serializer (from Kryo, used by Storm) and an Encoder (used by Kafka), and why we end up wrapping the Serializer in a Scheme (used by the KafkaSpout).

These libraries/frameworks are really nice, but it’s a shame that there is no clean way of unifying the whole encoding/decoding process (I tried to wrap everything nicely, but I could not come up with an elegant solution…).



It is possible to serialize complex data types both internally (between bolts) and externally (to/from Kafka), but the process is quite cumbersome, as there is a lot of wrapping involved.

In simple cases, it might be simpler to use Storm Fields for internal communication between bolts/spouts and to perform JSON encoding/parsing when sending data to or receiving it from Kafka. But, of course, this will reduce performance, since Kryo serialization is very lightweight. Performance vs. maintainability.
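
To illustrate that alternative: encoding a Datapoint to and from JSON could look like the following sketch, assuming the Jackson library (which is not used elsewhere in this post).

ObjectMapper mapper = new ObjectMapper( );

// Producing: plain JSON bytes, readable by any consumer
byte[] jsonBytes = mapper.writeValueAsBytes( datapoint );

// Consuming: parse the JSON back into a Datapoint
Datapoint dp = mapper.readValue( jsonBytes, Datapoint.class );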


