Configure and Use Apache Kafka with Spring Boot
Introduction
Apache Kafka is an open-source distributed streaming platform that handles large-scale, high-throughput, fault-tolerant, real-time data streaming. It is based on a publish-subscribe model in which producers publish messages to topics and consumers subscribe to those topics to consume the messages. Typical Apache Kafka use cases include real-time analytics, event-driven architectures, log aggregation, messaging systems, and building scalable data pipelines.
Usage of this doc:
You can use this doc to configure and use Apache Kafka to produce and consume messages (JSON content) in your Spring Boot application.
Prerequisites
You need a good understanding of Java, Spring Boot, Apache Kafka, and Maven or Gradle before going through this doc; otherwise, check their official documentation and guides first.
Installation
Before producing messages to Kafka topics, you must add the required dependencies to your pom.xml (Maven) or build.gradle (Gradle) file.
For Maven:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
For Gradle:
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.kafka:spring-kafka'
Now What?
Configuration
Add the Kafka properties to your application.properties file; you can define your own custom properties there as well. Specify the Kafka bootstrap servers and any additional configuration properties you need, such as the consumer group ID.
application.properties
spring.kafka.bootstrap-servers=<kafka-bootstrap-servers>
spring.kafka.consumer.group-id=<consumer-group-id>
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.app.topic=<topic-name>
For example:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group_category
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.app.topic=my-topic
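The listener will read these messages back as plain strings. Spring Boot typically defaults the consumer to StringDeserializer, so the following two properties are optional, but you can set them explicitly if you prefer:
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer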
Kafka Producer
A Kafka producer is a component that sends messages to Kafka topics. It publishes data to Kafka, which one or more Kafka consumers can then consume.
To create a Kafka producer, you need to perform the following steps:
- Configure the Kafka producer properties: Set up the necessary configuration properties for the Kafka producer, such as the bootstrap servers (addresses of Kafka brokers) and serialization settings. This we have already performed in the last step.
- Create a Kafka producer: To send messages to Kafka, you can use the KafkaTemplate provided by Spring Kafka. Here's an example of a simple producer:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component
public class KafkaEventProducer {

    private static final Logger logger = LoggerFactory.getLogger(KafkaEventProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        logger.info("Producing message [{}]", message);
        kafkaTemplate.send(topic, message);
    }
}
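Since the goal of this doc is to produce JSON content, a common approach with the String serializers configured above is to turn an object into a JSON string before handing it to the producer. The sketch below is only an illustration: it assumes Jackson's ObjectMapper is on the classpath and uses a hypothetical OrderEvent record (Java 17+), neither of which is part of the original setup.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Service;

@Service
public class OrderEventPublisher {

    // Hypothetical payload type, used only for this example.
    public record OrderEvent(String orderId, double amount) {}

    private final ObjectMapper objectMapper = new ObjectMapper();
    private final KafkaEventProducer kafkaEventProducer;

    public OrderEventPublisher(KafkaEventProducer kafkaEventProducer) {
        this.kafkaEventProducer = kafkaEventProducer;
    }

    public void publish(OrderEvent event) throws JsonProcessingException {
        // Serialize the object to a JSON string and reuse the producer from the previous step.
        String json = objectMapper.writeValueAsString(event);
        kafkaEventProducer.sendMessage("my-topic", json);
    }
}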
Kafka Consumer
- Configure the Kafka consumer properties: Configure Kafka properties in your application.properties file. Specify the Kafka server's bootstrap servers and any additional configuration properties you need for the consumer.
- Create a Kafka consumer listener: Implement a method in your application that will be called whenever a new message is received from Kafka. Use the @KafkaListener annotation provided by Spring Kafka.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component
public class KafkaEventConsumer {

    private static final Logger logger = LoggerFactory.getLogger(KafkaEventConsumer.class);

    @KafkaListener(topics = "<topic-name>", groupId = "<consumer-group-id>")
    public void consumeMessage(String message) {
        // Log the consumed message
        logger.info("Consumed message [{}]", message);
    }
}
Make sure to replace <topic-name> and <consumer-group-id> with your actual topic and consumer group details. Alternatively, you can pull these values straight from application.properties:
@KafkaListener(topics = "${spring.kafka.app.topic}", groupId = "${spring.kafka.consumer.group-id}")
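Because the messages in this guide carry JSON content, the listener can parse the String it receives with Jackson's ObjectMapper. This is again only a sketch, reusing the hypothetical OrderEvent record from the producer example above:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class OrderEventConsumer {

    private static final Logger logger = LoggerFactory.getLogger(OrderEventConsumer.class);
    private final ObjectMapper objectMapper = new ObjectMapper();

    @KafkaListener(topics = "${spring.kafka.app.topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void consume(String message) {
        try {
            // Parse the JSON string back into the example OrderEvent type.
            OrderEventPublisher.OrderEvent event =
                    objectMapper.readValue(message, OrderEventPublisher.OrderEvent.class);
            logger.info("Consumed order [{}] with amount [{}]", event.orderId(), event.amount());
        } catch (JsonProcessingException e) {
            logger.error("Could not parse message [{}]", message, e);
        }
    }
}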
Send Messages with the Kafka Producer
Send the message to Kafka: use the sendMessage() method of the Kafka producer to publish the message to a specific topic. The producer handles the message internally, including serialization, partitioning, and sending it to the appropriate Kafka broker.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MainApplication {

    @Autowired
    private KafkaEventProducer kafkaEventProducer;

    public static void main(String[] args) {
        SpringApplication.run(MainApplication.class, args);
    }

    public void yourMethod() {
        // Send a message using the Kafka producer
        kafkaEventProducer.sendMessage("<topic-name>", "Oh Kafka Boy How'z Everything?");
    }
}
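Note that yourMethod() above is only an example and is never called automatically. One simple way to trigger the producer once the application has started (a sketch, not the only option, using the example topic name from the configuration above) is a CommandLineRunner bean:

import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaDemoRunner {

    @Bean
    public CommandLineRunner sendSampleMessage(KafkaEventProducer kafkaEventProducer) {
        // Runs once after the Spring application context has started.
        return args -> kafkaEventProducer.sendMessage("my-topic", "Oh Kafka Boy How'z Everything?");
    }
}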
When a new message is available in the specified Kafka topic, the corresponding listener method will be invoked to process the message.
Conclusion
In conclusion, Spring Boot provides excellent support for integrating Apache Kafka into your applications. With Spring Boot’s Kafka support, you can easily create Kafka producers and consumers, configure Kafka properties, and handle message processing.
Spring Boot’s integration with Kafka simplifies the development of Kafka-based applications, providing a higher level of abstraction and reducing the amount of boilerplate code required.
And before running the code, please ensure that the Kafka server is running and the topics have been created.
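If you would rather let the application create the topic for you, Spring Kafka supports declaring a NewTopic bean, which Spring Boot's auto-configured KafkaAdmin picks up on startup. A minimal sketch (the broker must allow topic creation, and the name matches the example configuration above):

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic appTopic() {
        // Creates the topic on startup if it does not already exist.
        return TopicBuilder.name("my-topic")
                .partitions(1)
                .replicas(1)
                .build();
    }
}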
Note: Remember to refer to the official Spring Kafka documentation and explore the Spring Kafka samples to better understand the various features and options available for Kafka integration in Spring Boot.
Thank you!