Kafka, Protobuf and Spring Boot gRPC
Introduction
In this blog I will provide an example of how to use Protobuf as a serialisation-deserialisation framework in Kafka in a Spring Boot project. As a bonus you’ll also use Spring Boot’s gRPC starter to expose a gRPC API to publish a message to a Kafka topic.
The full application code can be found here.
What is Protobuf and why should you use it?
Protobuf (or Protocol Buffers) is a serialisation-deserialisation (SerDer) framework developed with the purpose of being a
'language-neutral, platform-neutral, extensible mechanism for serializing structured data'.
The idea was to improve on XML in almost every way by providing a framework that is simpler, faster and smaller than XML.
By defining schemas for messages and services in a .proto file that is shared between services, each service can generate native code for those messages and services from the same common schema.
Protobuf’s efficiency can be seen in its workflow as well as its encoding. Protobuf serialises native objects in the programming language directly into an efficient binary encoding, which can be transmitted over the wire and deserialised by the receiver directly back into a native object. This is in stark contrast with XML, which requires converting a native object into an XML string, encoding that string into binary for transmission, decoding it back into an XML string on the other side, and only then deserialising it into a native object. XML is therefore grossly inefficient and transmits a large amount of redundant information over the wire. JSON follows a similar workflow to XML, even though it is slightly less verbose and has a smaller payload.
What is Kafka?
Kafka is an open-source distributed event streaming platform aimed at handling a high throughput of messages, being massively scalable by allowing horizontal scaling of message brokers through partitions, providing permanent storage of messages, and on top of that being highly available. In short, Kafka is an extremely powerful message brokering service which has been in the software development industry for many years and is trusted by a large number of businesses.
What is gRPC?
gRPC is a high performance framework for Remote Procedure Calls which allows programmers to call functions provided by remote applications as if these functions were local methods. gRPC uses HTTP/2 under the hood for communication with remote servers and encodes the data payload with Protobuf. gRPC further improves on REST services by supporting richer communication patterns than a single request and response.
gRPC services can be defined with the following communication methods:
- Unary: single request from client and single result from server
- Server streaming: single request from client, stream of data from server as a response
- Client streaming: stream of data from client, single result from server
- Bidirectional streaming: stream of data from client, stream of data from server as a response
Compared to REST’s single-request-single-response protocol, gRPC is more feature rich and is especially well suited towards a microservices architecture. In our example we will be keeping it simple and building a Unary gRPC service which in turn publishes a single message onto a Kafka topic.
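For reference, here is how these four method types look in .proto syntax. The ExampleService and its messages below are purely illustrative and not part of our application:
service ExampleService {
// Unary: single request, single response
rpc GetItem(ItemRequest) returns (ItemResponse);
// Server streaming: single request, stream of responses
rpc ListItems(ItemRequest) returns (stream ItemResponse);
// Client streaming: stream of requests, single response
rpc UploadItems(stream ItemRequest) returns (ItemResponse);
// Bidirectional streaming: stream of requests, stream of responses
rpc SyncItems(stream ItemRequest) returns (stream ItemResponse);
}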
Why combine Protobuf and Kafka?
Protobuf provides an easier and more performant approach to serialising and deserialising data. These performance capabilities and the ability to provide a common schema for data transfer objects, coupled with a performant message brokering service such as Kafka, seem like a match made in heaven.
What will we be building today?
In this blog we will be building a small application in Spring Boot that exposes a gRPC service for checking the current condition of Franz Kafka’s favourite protagonist: Gregor Samsa. In the famous novella The Metamorphosis written by Franz Kafka the protagonist wakes up one morning to find himself transformed into an insect. Throughout the novella different characters come to his bed chamber to check on his condition. We will emulate this in a crude way by publishing details of Gregor’s condition on a Kafka topic to be logged by the Spring Boot application whenever someone checks on Gregor’s condition by making a call to the gRPC service.
Implementation
Using Protobuf for Serialisation-Deserialisation
Maven Dependencies:
In your pom.xml
add the following dependencies:
<dependency>
<groupId>org.springframework.grpc</groupId>
<artifactId>spring-grpc-spring-boot-starter</artifactId>
<version>0.8.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-services</artifactId>
<version>1.72.0</version>
</dependency>
<!-- Not required by protobuf or kafka -->
<!-- Added for making blog code more concise -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
The spring-grpc-spring-boot-starter dependency is required to implement the gRPC service defined in the .proto file. The grpc-services dependency is required to generate and use the Java classes produced by the protobuf-maven-plugin.
Configure protobuf-maven-plugin
Normally the class files for a specific programming language are generated by running the protoc compiler on the command line. But because we are building a Spring Boot application it is easier and more convenient to use the protobuf-maven-plugin to read our .proto files and generate the appropriate Java class files. To configure the protobuf-maven-plugin, first add the following version properties to your pom.xml:
<properties>
...
<grpc.version>1.72.0</grpc.version>
<protobuf-java.version>4.30.2</protobuf-java.version>
<spring-grpc.version>0.8.0</spring-grpc.version>
</properties>
Add the dependency management for importing the preferred versions of other dependencies:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.grpc</groupId>
<artifactId>spring-grpc-dependencies</artifactId>
<version>${spring-grpc.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
And then finally configure the protobuf-maven-plugin itself:
<plugin>
<groupId>io.github.ascopes</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>3.4.2</version>
<configuration>
<protocVersion>${protobuf-java.version}</protocVersion>
<binaryMavenPlugins>
<binaryMavenPlugin>
<groupId>io.grpc</groupId>
<artifactId>protoc-gen-grpc-java</artifactId>
<version>${grpc.version}</version>
<options>jakarta_omit,@generated=omit</options>
</binaryMavenPlugin>
</binaryMavenPlugins>
</configuration>
<executions>
<execution>
<id>generate</id>
<goals>
<goal>generate</goal>
</goals>
</execution>
</executions>
</plugin>
Define Protobuf message
To define Protobuf messages we need to create a schema in a .proto file. Create a new directory called src/main/protobuf in your project and then define your Protobuf message. In this case the .proto file is called gregor_samsa.proto:
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.hugo.main.v1";
option java_outer_classname = "GregorSamsaProto";
enum Form {
FORM_UNSPECIFIED = 0;
HUMAN = 1;
INSECT = 2;
}
message GregorSamsa {
Form current_form = 1;
string disposition = 2; // e.g., "Anxious", "Confused", "Resigned"
bool is_room_locked = 3;
}
At the start of our protobuf file we define some options:
- java_multiple_files: if set to true the Protobuf compiler will split the generated code into separate files.
- java_package: defines the target package/directory of the generated Java code.
- java_outer_classname: specifies the name of the wrapper class in the generated Java code.
The GregorSamsa message holds information about the condition of our protagonist Gregor Samsa. In The Metamorphosis Gregor woke up to find himself transformed into an insect. To capture his bodily form we define a Protobuf enum Form with the possible values FORM_UNSPECIFIED (it is good practice in Protobuf to provide an 'unspecified' enum value), HUMAN and INSECT. Next we define his disposition, which is a string, and finally a boolean to specify whether his room is locked or not.
To generate the necessary Java classes you can run the following commands:
mvn clean install
# OR
mvn clean protobuf:generate
You should now be able to see the GregorSamsa class and other Protobuf helper classes in your target directory under generated-sources/protobuf/com/hugo/main/v1, as defined by the java_package option in gregor_samsa.proto:
ls -l target/generated-sources/protobuf/com/hugo/main/v1
total 72
-rw-r--r--@ 1 hugogreyvenstein staff 3491 Aug 25 16:53 Form.java
-rw-r--r--@ 1 hugogreyvenstein staff 21942 Aug 25 16:53 GregorSamsa.java
-rw-r--r--@ 1 hugogreyvenstein staff 1154 Aug 25 16:53 GregorSamsaOrBuilder.java
-rw-r--r--@ 1 hugogreyvenstein staff 2524 Aug 25 16:53 GregorSamsaProto.java
By running the following test you can validate that serialisation and deserialisation of our GregorSamsa message works correctly:
@Test
public void testGregorSamsaSerDer() throws IOException {
GregorSamsa gregor = GregorSamsa.newBuilder()
.setCurrentForm(Form.HUMAN)
.setDisposition("Anxious")
.setIsRoomLocked(false)
.build();
ByteArrayOutputStream output = new ByteArrayOutputStream();
gregor.writeTo(output);
GregorSamsa deserialized = GregorSamsa.parseFrom(output.toByteArray());
assertEquals(gregor.getCurrentForm(), deserialized.getCurrentForm());
assertEquals(gregor.getDisposition(), deserialized.getDisposition());
assertEquals(gregor.getIsRoomLocked(), deserialized.getIsRoomLocked());
}
The above test creates a GregorSamsa object from the generated Java classes and verifies that the SerDer functionality works correctly.
Spring gRPC Service
Next we’ll create a gRPC service to check on Gregor’s current condition.
This service will initially accept a request to check on Gregor and then return a response (later we will change this
behaviour to publish a message on a Kafka topic).
The request will contain a string field named visitor_name specifying which visitor is checking on Gregor's condition. The response will contain a GregorSamsa object showing Gregor's current condition.
Our workflow will be to define the service in the .proto
file, generate the Java sources from our .proto
file, and
then implement the service using Spring Boot gRPC.
Add a gRPC service to the gregor_samsa.proto file:
//...
service SamsaFamilyService {
rpc CheckOnGregor(CheckOnGregorRequest) returns (CheckOnGregorResponse);
}
message CheckOnGregorRequest {
string visitor_name = 1; // e.g., "Mother", "Father", "The Chief Clerk"
}
message CheckOnGregorResponse {
GregorSamsa gregor_state = 1;
}
Run mvn clean protobuf:generate to generate the Java classes for our new service.
Create a gRPC service implementation:
@GrpcService
@Slf4j
public class SamsaFamilyServiceImpl extends
SamsaFamilyServiceGrpc.SamsaFamilyServiceImplBase {
@Override
public void checkOnGregor(CheckOnGregorRequest request,
StreamObserver<CheckOnGregorResponse> responseObserver) {
log.info("{} is checking on Gregor", request.getVisitorName());
GregorSamsa gregorState = GregorSamsa.newBuilder()
.setCurrentForm(Form.HUMAN)
.setDisposition("Confused")
.setIsRoomLocked(true)
.build();
CheckOnGregorResponse response = CheckOnGregorResponse.newBuilder()
.setGregorState(gregorState)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
Protobuf generates a class called SamsaFamilyServiceGrpc for our Protobuf service. This class contains an abstract class called SamsaFamilyServiceImplBase with all the rpc methods that need to be implemented. In our case we only defined one: CheckOnGregor.
The @GrpcService
annotation registers your class as a gRPC service.
Here I am using Lombok’s @Slf4j annotation to generate a logger.
In the implementation of the method you see two parameters: CheckOnGregorRequest request and StreamObserver<CheckOnGregorResponse> responseObserver. gRPC in Java uses stream observers to return responses to the client by calling the onNext() method on the response observer.
The number of times the onNext() method can be called is determined by the definition of the rpc method in your .proto file.
In our case we defined our rpc method to return only one value, but if we want to return more than one value, i.e. a stream of objects, we can add the stream keyword to the rpc definition as follows:
rpc CheckOnGregor(CheckOnGregorRequest) returns (stream CheckOnGregorResponse);
When we have successfully returned our response object we can call the onCompleted() method to signal to the observer/client that the request has been completed successfully without errors. If something goes wrong, however, gRPC provides the onError(Throwable t) method on the response observer to return an exception to the client.
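As a minimal sketch (not part of the application code), a failure in checkOnGregor could be reported back to the client like this; the status code and message are illustrative assumptions:
import io.grpc.Status;
import io.grpc.stub.StreamObserver;

// Reports a failure to the caller instead of a normal response.
private void failCheckOnGregor(StreamObserver<CheckOnGregorResponse> responseObserver, Throwable cause) {
    responseObserver.onError(Status.INTERNAL
            .withDescription("Failed to check on Gregor")
            .withCause(cause)
            .asRuntimeException());
}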
To start your Spring Boot service you can run the following command:
mvn spring-boot:run
With the default gRPC starter configuration your server should be started on port 9090.
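If you want to run the server on a different port, the starter exposes a configuration property for it; the property name below is based on the Spring gRPC starter's configuration and 9090 is its default value:
spring.grpc.server.port=9090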
To call your new gRPC service grpcurl
can be used:
grpcurl --plaintext -d '{ "visitor_name": "Mother"}' localhost:9090 SamsaFamilyService/CheckOnGregor
Output:
{
"gregor_state": {
"current_form": "HUMAN",
"disposition": "Confused",
"is_room_locked": true
}
}
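Because the grpc-services dependency brings in server reflection (which the Spring gRPC starter exposes and which grpcurl relies on above), you should also be able to list the available services:
grpcurl --plaintext localhost:9090 list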
Now that we have a gRPC server running, let’s configure Kafka!
Kafka
Run Kafka using docker-compose
The easiest way to run Kafka is by using docker compose.
Note that we’re using a newer version of Kafka that does not require a separate Zookeeper instance to be running.
Create the following docker-compose.yml
file in your project:
services:
kafka_broker:
image: apache/kafka:latest
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 2
Start your Kafka service with the docker compose command:
docker-compose up
Your Kafka broker service should now be running in a docker container which exposes the port 9092.
Spring Boot Dependency
Add the Kafka Spring dependency to your pom.xml
:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Kafka Topic Configuration
The Kafka server URL will be used across your application to configure Kafka producers and consumers, so we will add it to our application.properties file. In your application.properties file add the following property:
spring.kafka.bootstrap-servers=localhost:9092
Then add the following configuration class to your Spring Boot application to configure your Kafka connection and create a topic:
@Configuration
public class KafkaTopicConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic gregorConditionTopic() {
return new NewTopic("gregorCondition", 1, (short) 1);
}
}
For simplicity, we only define the topic gregorCondition with partition and replication values of 1. Feel free to experiment with these values depending on the needs of your application.
If you restart your Spring Boot application, the topic gregorCondition should be created.
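To double-check that the topic exists, you can list the topics from inside the container; the path to the Kafka CLI scripts below assumes the layout of the apache/kafka image:
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list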
Protobuf Serializer and Deserializer
In order for Kafka to serialize and deserialize our GregorSamsa
objects we need to provide a custom implementation of
the SerDer functionality.
Luckily this is extremely convenient and can be done in a few easy steps.
Serializer
import com.hugo.main.v1.GregorSamsa;
import org.apache.kafka.common.serialization.Serializer;
public class GregorSamsaSerializer implements Serializer<GregorSamsa> {
@Override
public byte[] serialize(String topic, GregorSamsa data) {
return data.toByteArray();
}
}
The GregorSamsaSerializer class implements a Serializer interface provided by Kafka, which receives the topic and our GregorSamsa object as parameters. Here you can see how we use the Protobuf-generated toByteArray() method to serialise the GregorSamsa object.
Deserializer
Implementing the deserializer is similar to implementing the serializer:
import com.hugo.main.v1.GregorSamsa;
import lombok.SneakyThrows;
import org.apache.kafka.common.serialization.Deserializer;
public class GregorSamsaDeserializer implements Deserializer<GregorSamsa> {
@SneakyThrows
@Override
public GregorSamsa deserialize(String topic, byte[] data) {
return GregorSamsa.parseFrom(data);
}
}
Here I am using Lombok’s @SneakyThrows
for convenience’s sake, but this should always be used with caution in production
code.
The above deserialize method receives the topic and an encoded byte array. Here you can see that we use the static GregorSamsa.parseFrom() method to deserialize the byte array into a GregorSamsa object.
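If you would rather avoid @SneakyThrows, a minimal alternative (just a sketch, not the code from the repository) is to catch the checked exception and rethrow it as Kafka's SerializationException:
import com.google.protobuf.InvalidProtocolBufferException;
import com.hugo.main.v1.GregorSamsa;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class GregorSamsaDeserializer implements Deserializer<GregorSamsa> {
    @Override
    public GregorSamsa deserialize(String topic, byte[] data) {
        try {
            // parseFrom throws the checked InvalidProtocolBufferException
            return GregorSamsa.parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            throw new SerializationException("Could not deserialize GregorSamsa from topic " + topic, e);
        }
    }
}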
Consumer
In order to read messages published on a topic we need to configure a consumer.
We’ll create a configuration class telling Spring how to create consumers that are annotated with @KafkaListener and connect them to a Kafka topic.
Configure Consumer
Add the following configuration class to your project:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public ConsumerFactory<String, GregorSamsa> consumerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "gregor-group");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
GregorSamsaDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configs);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GregorSamsa>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, GregorSamsa> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
An important step in the configuration is to pass in our Protobuf deserializer by using the ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
configuration key.
This configuration property tells Spring how to deserialize byte array values received from Kafka.
Spring intercepts the value received from Kafka, passes it to the GregorSamsaDeserializer, and then passes the deserialized object to our listener method, which is defined in the next section using the @KafkaListener annotation.
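As an aside, the same consumer wiring can also be done purely through Spring Boot properties instead of a @Configuration class; the fully qualified name of the custom deserializer below is a hypothetical package, so adjust it to wherever the class lives in your project:
spring.kafka.consumer.group-id=gregor-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.hugo.main.kafka.GregorSamsaDeserializer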
Create Consumer Service
To consume a message a listener needs to be defined with the @KafkaListener
annotation.
The following Java class processes a message from the gregorCondition
topic and logs the condition of Gregor to the log
output of our Spring Boot application:
@Component
@Slf4j
public class GregorConditionLogger {
@KafkaListener(topics = "gregorCondition", groupId = "gregor-log")
public void listenOnGregorCondition(GregorSamsa gregorSamsa) {
log.info("Gregor's current condition is: \n{}", gregorSamsa);
}
}
Producer
Similar to a consumer we need to define a configuration for the producer factory to create Kafka producers.
Configure Producer
Add the following Java class to your application:
@Configuration
public class KafkaProducerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, GregorSamsa> producerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GregorSamsaSerializer.class);
return new DefaultKafkaProducerFactory<>(configs);
}
@Bean
public KafkaTemplate<String, GregorSamsa> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Similarly to the consumer we need to configure our custom serializer, but this time using the
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
configuration property.
Create Producer Service
Now that we have defined the configuration for the producer it’s time to publish a message to the topic.
Let’s create a service that uses the KafkaTemplate
to publish a message on the gregorCondition
topic:
@Component
@AllArgsConstructor
public class GregorConditionProducer {
private final KafkaTemplate<String, GregorSamsa> kafkaTemplate;
public CompletableFuture<SendResult<String, GregorSamsa>> sendToGregorCondition(GregorSamsa gregorSamsa) {
return kafkaTemplate.send("gregorCondition", gregorSamsa);
}
}
Here we use the @AllArgsConstructor annotation provided by Lombok to generate a constructor, which is in turn used by Spring Boot to inject the kafkaTemplate object. In our sendToGregorCondition method we use this kafkaTemplate object to publish a message on the gregorCondition topic.
Spring’s Kafka framework returns a CompletableFuture wrapping a SendResult object. The SendResult object contains metadata about the specific message publication as well as the ProducerRecord, which contains, among other things, the key-value pair that was sent to the topic.
Now let’s adapt our gRPC service to also publish Gregor Samsa’s condition on the gregorCondition topic, which will in turn be read by the GregorConditionLogger consumer (the GregorConditionProducer is injected into the service as the producer field used below):
public void checkOnGregor(CheckOnGregorRequest request, StreamObserver<CheckOnGregorResponse> responseObserver) {
log.info("{} is checking on Gregor", request.getVisitorName());
GregorSamsa gregorState = GregorSamsa.newBuilder()
.setCurrentForm(Form.HUMAN)
.setDisposition("Confused")
.setIsRoomLocked(true)
.build();
CompletableFuture<SendResult<String, GregorSamsa>> sendResultCompletableFuture = producer.sendToGregorCondition(gregorState);
sendResultCompletableFuture.whenComplete((r, e) -> {
if (e != null) {
log.error("Error when sending GregorCondition result", e);
return; // r is null when the send failed, so don't try to log it
}
log.info("{} sending GregorCondition result:\n{}", request.getVisitorName(), r.getProducerRecord().value());
});
CheckOnGregorResponse response = CheckOnGregorResponse.newBuilder()
.setGregorState(gregorState)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
Our sendToGregorCondition method returns a CompletableFuture which can be used to do error checking or perform some action after publishing. In our case we merely log the returned result.
When you make the gRPC call using:
grpcurl --plaintext -d '{ "visitor_name": "Father"}' localhost:9090 SamsaFamilyService/CheckOnGregor
You should be able to see two different logs in your Spring Boot logs:
- The consumer that listens on the gregorCondition topic and logs the published object:
c.e.k.kafka.GregorConditionLogger : Gregor's current condition is:
current_form: HUMAN
disposition: "Confused"
is_room_locked: true
- The log entry written when publishing the GregorSamsa object to the topic has completed:
c.e.k.grpc.SamsaFamilyServiceImpl : Father sending GregorCondition result:
current_form: HUMAN
disposition: "Confused"
is_room_locked: true
Conclusion
In this blog we saw how we can use Protobuf for serialisation and deserialisation of objects when publishing a message on
a Kafka topic.
We defined a Protobuf message in a .proto file and generated the Java classes by making use of the protobuf-maven-plugin.
By using Spring Boot’s gRPC starter we were able to create a gRPC service by defining it in the .proto file and then implementing it in SamsaFamilyServiceImpl.
For ease of use we also defined a docker-compose.yml
file to start up the Kafka service.
The full application code can be found here.