Apache Kafka is an open-source distributed event-streaming platform designed to handle real-time data feeds. Developed by LinkedIn and later open-sourced, it is now part of the Apache Software Foundation. Kafka is used to build real-time data pipelines and streaming applications that can collect, process, store, and analyse large amounts of data in real time.

Kafka is based on the concept of a distributed commit log, allowing it to publish and subscribe to streams of records, store them reliably in a fault-tolerant way, and process them in real time. With its high throughput, scalability, durability, and fault tolerance, Kafka has become a cornerstone of modern Big Data architectures.

In this blog post, we’ll outline the steps to consume Kafka topic messages that have been compressed and serialised using Protobuf (Protocol Buffers), based on our experience with topics generated by Netcracker’s Telecom Operations and Management Solution software.

Use Case

We’ll start with Kafka messages that have been compressed using the LZ4 algorithm and serialised with Protobuf, produced by an external Kafka producer.

Protobuf is an efficient, language-neutral data serialisation mechanism developed by Google. It is widely used for transmitting data between systems or storing it in a compact format.

Protobuf uses a binary format, which is significantly more space-efficient than text-based formats like JSON or XML. Moreover, it is language-agnostic, offering compatibility with multiple programming languages, including Java, C++, Python, Go, and more.

Our objective is to code a Kafka consumer in Python to deserialise, in real time, a topic message that was previously produced, compressed, and serialised using Protobuf. The deserialised message content will then be converted into JSON for ease of use.

Just follow these steps:

Compile Protobuf Schema

The structure of Protobuf-serialised data is defined in “.proto” files, which contain the schema for the data messages we want to deserialise, including their data types and structure. These “.proto” files are generated on the producer side (in our case, they were provided by Netcracker).

Before deserialisation, these files must be compiled using the Google command-line tool “protoc”. This tool can be downloaded from the official Protobuf GitHub repository (file “protoc-xx.x-win64.zip” for Windows).

When compiling, a parameter must be specified to indicate the target programming language for the output: in our example, we’ll use Python.
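Purely as an illustrative sketch, the compilation can also be scripted from Python; the file name example.proto and the output directory below are placeholders, and the equivalent command can of course be run directly in a shell:

    import subprocess

    # Compile the schema for Python; equivalent to running
    # "protoc --python_out=. example.proto" in a shell (protoc must be on the PATH).
    subprocess.run(["protoc", "--python_out=.", "example.proto"], check=True)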

The compilation process will generate a Python file (“.py”) for every “.proto” file. These Python files will later be imported into the Python consumer script to deserialise the messages.

Message Consumption

This process continuously listens for produced, compressed, and serialised Kafka messages.

The Kafka consumer can be implemented with any data integration tool or a custom script. Either way, it’s important to ensure that the “LZ4” library is installed in your consumer environment, since it is needed to decompress the LZ4-compressed messages.

In our example, we’ll use a custom Python script. Besides “LZ4”, the other libraries that must be installed in the Python environment before proceeding are “kafka-python”, “google” and “protobuf”.

In our script, the first step is to import the necessary modules, including “KafkaConsumer” to consume the messages, the previously generated “example_pb2.py” file (produced during “.proto” compilation), and “MessageToJson” to convert the deserialised object into JSON.
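A minimal sketch of these imports could look like this (the module name example_pb2 matches our example schema and will differ depending on the name of your “.proto” file):

    from kafka import KafkaConsumer                        # Kafka consumer client (kafka-python)
    from google.protobuf.json_format import MessageToJson  # converts Protobuf objects into JSON

    import example_pb2  # module generated by protoc from the ".proto" schema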

Now it’s time to initialise the consumer by defining the necessary parameters (a minimal configuration sketch follows the list below). Some typical Kafka consumer configuration parameters include:

  • Topics: the name of the topic (or topics) to listen to.
  • Bootstrap servers: the hostname(s) of the Kafka broker(s) to connect to.
  • Consumer group: the consumer group name.
  • Auto offset reset: “earliest” will move to the oldest available message, while “latest” (the default) will move to the most recent.
  • Offset management: specifies when to commit offsets, either after each record is read or after a batch is completed.
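As a rough sketch with placeholder values for the topic, broker and group names (the keyword arguments are those of kafka-python’s KafkaConsumer), the initialisation could look like this:

    consumer = KafkaConsumer(
        "example-topic",                          # topic to listen to (placeholder name)
        bootstrap_servers=["kafka-broker:9092"],  # Kafka broker hostname(s) and port
        group_id="example-consumer-group",        # consumer group name
        auto_offset_reset="earliest",             # start from the oldest available message
        enable_auto_commit=True,                  # commit offsets automatically while polling
    )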

As a result of this consumer code, we will have the value of every Kafka message, still serialised in Protobuf format.
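An illustrative consumption loop is sketched below; note that kafka-python decompresses LZ4-compressed batches transparently as long as the “LZ4” library is installed, so message.value already holds the raw Protobuf-serialised bytes:

    for message in consumer:
        # message.value is a bytes object containing the Protobuf-serialised payload
        print(message.topic, message.partition, message.offset, message.value)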

Message Deserialisation

Once the Kafka message is consumed inside the loop, the key point is to deserialise it with Protobuf into a “Person” object.
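Assuming the compiled schema exposes a message type called Person (the actual message name comes from the provided “.proto” file), a sketch of this step looks as follows:

    for message in consumer:
        person = example_pb2.Person()            # empty Person object from the compiled schema
        person.ParseFromString(message.value)    # populate it from the raw Protobuf bytes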

Then the deserialised object can be converted into JSON, making it ready for further processing, persistence in a database table, or transformation into another structured object!
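Sticking with the hypothetical Person example, this last step is a single call to the MessageToJson helper imported earlier, still inside the consumption loop:

        # continuing inside the loop, right after ParseFromString()
        json_payload = MessageToJson(person)     # JSON string whose keys mirror the Person fields
        print(json_payload)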

For our example, this produces a JSON document whose keys mirror the fields defined in the “Person” message, ready for downstream use.

Conclusions

In this post, we have demonstrated how to build a Kafka consumer in Python to handle messages compressed using the LZ4 algorithm and serialised with Protobuf. By deserialising these messages and converting them into JSON format, we make the data easily accessible for further use, whether it involves storing it in a database, analysing it, or integrating it into other systems.

This approach highlights the power and flexibility of Kafka and Protobuf in handling real-time data pipelines, especially when dealing with high-throughput, complex data structures. Using Python allows developers to create tailored solutions that fit specific business requirements, ensuring efficient data processing.

Are you looking to implement similar real-time data solutions in your organisation? At ClearPeaks, we specialise in designing and deploying data architectures tailored to your unique needs. Whether it’s working with Kafka, Protobuf, or other modern tools, our team of experts can help you unlock the full potential of your data. Contact us today!