INTRODUCTION
Apache Kafka is a distributed event streaming platform that has revolutionized how modern applications handle data. Imagine you are running a large online store. Every second, customers are browsing products, adding items to their carts, making purchases, and leaving reviews. Traditional databases struggle when you need to process these events in real time and share them across multiple systems simultaneously. This is where Kafka shines.
Think of Kafka as a highly efficient postal service for your application's data. Instead of sending letters directly from one person to another, you drop your letters at a central post office. The post office sorts them, stores them temporarily, and delivers them to the right recipients. Similarly, Kafka sits between your data producers and consumers, accepting messages from various sources and delivering them to interested parties.
The beauty of Kafka lies in its ability to handle massive volumes of data with minimal latency. Companies like LinkedIn, Netflix, and Uber rely on Kafka to process trillions of events daily. But you do not need to be a tech giant to benefit from Kafka. Even small applications can leverage its power for tasks like logging, metrics collection, user activity tracking, and microservice communication.
THE FUNDAMENTAL BUILDING BLOCKS OF KAFKA
Before we dive into code, we need to understand the core concepts that make Kafka work. These concepts form the foundation of everything you will do with Kafka.
A topic is the fundamental unit of organization in Kafka. Think of a topic as a category or feed name to which records are published. If you are building an e-commerce system, you might have topics named "customer-orders", "inventory-updates", and "payment-transactions". Each topic is a log of events that grows continuously as new events arrive.
Producers are applications that publish data to Kafka topics. In our e-commerce example, your order processing service would be a producer that sends order details to the "customer-orders" topic whenever a customer completes a purchase.
Consumers are applications that read data from Kafka topics. Your warehouse management system might be a consumer that reads from the "customer-orders" topic to prepare shipments. Your analytics system might also consume the same topic to generate sales reports. Multiple consumers can read from the same topic independently without interfering with each other.
Partitions are the way Kafka achieves scalability and parallelism. Each topic is divided into one or more partitions. Think of partitions as separate lanes on a highway. Just as multiple lanes allow more cars to travel simultaneously, multiple partitions allow Kafka to handle more data in parallel. Each partition is an ordered, immutable sequence of records that is continually appended to.
Brokers are the Kafka servers that store data and serve clients. A Kafka cluster typically consists of multiple brokers working together, and each broker hosts a subset of each topic's partitions. When partitions are replicated across brokers, this distribution provides fault tolerance: if one broker fails, replicas on the remaining brokers can take over its partitions.
Consumer groups enable parallel consumption of data. When multiple consumers belong to the same consumer group, Kafka automatically distributes the partitions among them. This means each partition is consumed by exactly one consumer in the group, allowing you to scale your data processing horizontally.
Offsets are sequential, monotonically increasing identifiers assigned to each record within a partition. Consumers commit offsets to record how far they have processed, and Kafka stores these committed offsets so that a consumer can resume from where it left off after a crash or restart.
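To make partitions and offsets concrete, here is a toy in-memory model. It is not Kafka's API, and the class and method names are hypothetical. Each partition is an append-only list and, as a simplifying assumption, a record's offset is just its index in that list, so offsets are counted separately in every partition and a consumer resumes by remembering the next offset to read.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of a topic (hypothetical, NOT the Kafka client API).
// Each partition is an append-only list; a record's offset is its index in that list.
public class ToyTopic {
    private final List<List<String>> partitions = new ArrayList<>();

    public ToyTopic(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    // Appends a record to one partition and returns the offset it was assigned.
    public long append(int partition, String value) {
        List<String> log = partitions.get(partition);
        log.add(value);
        return log.size() - 1;
    }

    // Returns every record in a partition starting at the given offset,
    // which is how a consumer picks up where it left off.
    public List<String> readFrom(int partition, long offset) {
        List<String> log = partitions.get(partition);
        return new ArrayList<>(log.subList((int) offset, log.size()));
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic(3);
        topic.append(0, "login");     // offset 0 in partition 0
        topic.append(0, "view");      // offset 1 in partition 0
        topic.append(1, "purchase");  // offset 0 in partition 1: offsets are per partition
        // A consumer that already processed offset 0 of partition 0 resumes at offset 1.
        System.out.println(topic.readFrom(0, 1)); // prints [view]
    }
}
```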
SETTING UP YOUR KAFKA ENVIRONMENT
To start working with Kafka, you need to set up a Kafka cluster. For learning purposes, we will set up a single-node cluster on your local machine. In production, you would run multiple brokers across different servers for high availability.
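First, download Apache Kafka from the official website. Kafka requires Java to run. Kafka 3.x runs on Java 8 or higher, but Java 8 support has been deprecated since Kafka 3.0, so Java 11 or 17 is a safer choice. Extract the downloaded archive to a directory of your choice. The 3.x distributions include ZooKeeper, which Kafka uses for cluster coordination and metadata management. Kafka 4.0 and later removed ZooKeeper in favor of the built-in KRaft mode, so the ZooKeeper steps below apply only to a 3.x release, such as the 3.6 line used for the client library in this tutorial.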
Navigate to your Kafka directory and start ZooKeeper first. ZooKeeper maintains configuration information and provides distributed synchronization for Kafka. Open a terminal and run the following command:
bin/zookeeper-server-start.sh config/zookeeper.properties
On Windows, use the batch file instead:
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
You should see log messages indicating that ZooKeeper has started successfully. Keep this terminal window open because ZooKeeper must remain running.
Now open a second terminal window and start the Kafka broker:
bin/kafka-server-start.sh config/server.properties
On Windows:
bin\windows\kafka-server-start.bat config\server.properties
The broker will start and connect to ZooKeeper. You will see numerous log messages as the broker initializes. Once you see messages indicating the broker is ready, your Kafka cluster is running and ready to accept connections.
CREATING YOUR FIRST TOPIC
With Kafka running, let us create a topic. We will create a topic called "user-events" that will store events related to user activities in our application. Open a third terminal window and run:
bin/kafka-topics.sh --create --topic user-events --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
On Windows:
bin\windows\kafka-topics.bat --create --topic user-events --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
Let us break down what each parameter means. The --topic parameter specifies the name of our topic. The --bootstrap-server parameter tells the tool which broker to connect to in order to reach the cluster. The --partitions parameter sets how many partitions the topic has; we chose three so that up to three consumers in a group can process it in parallel. The --replication-factor parameter determines how many copies of each partition Kafka maintains. We set it to one because we only have one broker, but in production you would typically use three or more for fault tolerance.
You can verify that your topic was created successfully by listing all topics:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
You should see "user-events" in the output. To get detailed information about your topic, run:
bin/kafka-topics.sh --describe --topic user-events --bootstrap-server localhost:9092
This command shows you the partition count, replication factor, and which broker is leading each partition.
PRODUCING YOUR FIRST MESSAGES
Now that we have a topic, let us send some messages to it. Kafka provides a command-line producer for testing, but in real applications, you will use a programming language client library. Let us start with the command-line tool to understand the basics:
bin/kafka-console-producer.sh --topic user-events --bootstrap-server localhost:9092
On Windows:
bin\windows\kafka-console-producer.bat --topic user-events --bootstrap-server localhost:9092
This opens an interactive prompt where you can type messages. Each line you type becomes a separate message sent to the topic.
Try typing a few messages:
User john logged in
User sarah viewed product 12345
User john added product 67890 to cart
Press Ctrl+C to exit the producer when you are done.
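Each message you typed was sent to one of the three partitions in the user-events topic. The producer's partitioner decides which partition receives each message. The console producer sends messages without a key by default, and for keyless messages the Java client does not use strict round-robin: since Kafka 2.4 the default behavior is "sticky" partitioning, which fills a batch for one partition before moving on to another. Over time keyless messages spread across all partitions, but a few messages typed in quick succession may well land in the same partition.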
CONSUMING YOUR FIRST MESSAGES
Now let us read the messages we just produced. Kafka provides a command-line consumer for this purpose:
bin/kafka-console-consumer.sh --topic user-events --from-beginning --bootstrap-server localhost:9092
On Windows:
bin\windows\kafka-console-consumer.bat --topic user-events --from-beginning --bootstrap-server localhost:9092
The from-beginning flag tells the consumer to read all messages from the start of the topic. Without this flag, the consumer would only read new messages that arrive after it starts.
You should see the three messages you produced earlier. Notice that they might not appear in the exact order you sent them. This is because we have three partitions, and the consumer reads from all partitions concurrently. Within each partition, messages maintain their order, but across partitions, there is no global ordering guarantee.
Press Ctrl+C to stop the consumer.
BUILDING A JAVA PRODUCER APPLICATION
Now let us build a real producer application using Java. We will create a producer that sends user activity events to our Kafka topic. First, you need to add the Kafka client library to your project. If you are using Maven, add this dependency to your pom.xml:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.0</version>
</dependency>
If you are using Gradle, add this to your build.gradle:
implementation 'org.apache.kafka:kafka-clients:3.6.0'
Now let us create a simple producer. Here is a basic example:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class SimpleProducer {
public static void main(String[] args) {
// Configure the producer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Create the producer
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Send a message
ProducerRecord<String, String> record = new ProducerRecord<>("user-events", "user123", "User logged in");
producer.send(record);
// Close the producer
producer.close();
}
}
Let us examine this code carefully. The Properties object holds configuration settings for the producer. The BOOTSTRAP_SERVERS_CONFIG tells the producer where to find the Kafka cluster. You only need to specify one or two brokers here because the producer will automatically discover the other brokers in the cluster.
The KEY_SERIALIZER_CLASS_CONFIG and VALUE_SERIALIZER_CLASS_CONFIG specify how to convert your keys and values into bytes. Kafka stores everything as byte arrays, so you need serializers to convert your Java objects into bytes. We are using StringSerializer because we are sending strings, but Kafka provides serializers for other types, and you can create custom serializers for complex objects.
The KafkaProducer is the main class you use to send messages. It is thread-safe, so you can share a single producer instance across multiple threads in your application. Creating a producer is relatively expensive, so you should create one instance and reuse it rather than creating a new producer for each message.
The ProducerRecord represents a message you want to send. It contains the topic name, an optional key, and the value. The key serves two important purposes. First, it determines which partition the message goes to. Messages with the same key go to the same partition (as long as the topic's partition count does not change), which guarantees ordering for messages with the same key. Second, the key can carry business meaning, like a user ID or order ID.
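The send method is asynchronous. It returns a Future<RecordMetadata> without waiting for the broker to acknowledge the message, although it can block briefly (up to max.block.ms) while fetching cluster metadata or when the producer's buffer is full. This allows high throughput because the producer can batch many records together and keep several requests in flight. However, it also means you do not know whether the send succeeded unless you check the result.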
The close method flushes any pending messages and releases resources. Always call close when you are done with the producer, preferably in a try-with-resources block or a finally clause.
HANDLING PRODUCER CALLBACKS AND ERRORS
The simple producer we just created does not check whether messages were successfully sent. In production code, you should always handle potential errors.
Let us improve our producer to handle callbacks:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ImprovedProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
ProducerRecord<String, String> record = new ProducerRecord<>("user-events", "user123", "User logged in");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("Error sending message: " + exception.getMessage());
} else {
System.out.println("Message sent successfully to partition " + metadata.partition() + " at offset " + metadata.offset());
}
}
});
// Alternatively, you can use a lambda expression
ProducerRecord<String, String> record2 = new ProducerRecord<>("user-events", "user456", "User viewed product");
producer.send(record2, (metadata, exception) -> {
if (exception != null) {
System.err.println("Error sending message: " + exception.getMessage());
} else {
System.out.println("Message sent to partition " + metadata.partition() + " at offset " + metadata.offset());
}
});
}
}
}
We have added several important configuration properties. The ACKS_CONFIG setting controls how many broker acknowledgments the producer requires before considering a send successful. Setting it to "all" means the partition leader acknowledges the message only after all in-sync replicas have it. This provides the strongest durability guarantee at the cost of somewhat higher latency. Since Kafka 3.0, "all" is also the producer's default, so setting it explicitly mainly documents your intent.
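The RETRIES_CONFIG setting caps how many times the producer retries a failed send. Transient errors like network glitches often resolve themselves, so retrying improves reliability. Be aware that recent clients already default to effectively unlimited retries (Integer.MAX_VALUE), bounded in time by delivery.timeout.ms (two minutes by default), so setting it to 3 actually lowers the retry limit. Many applications leave retries at its default and tune delivery.timeout.ms instead.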
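The MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION setting limits how many unacknowledged requests the producer can have outstanding on each connection. Without idempotence, a value greater than one allows reordering: if one batch fails and is retried while a later batch succeeds, the later batch is written first. Setting it to one prevents this at some cost in throughput. Recent clients (including 3.6.0) enable idempotence by default, which preserves ordering with up to five in-flight requests, so a value of one mainly matters when idempotence is disabled.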
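A toy simulation makes the reordering risk concrete. No Kafka code is involved and the names are hypothetical. It assumes a simplified broker that appends each batch the moment it arrives successfully, and a producer that sends up to maxInFlight batches before waiting; any batch listed in failOnce fails its first attempt and is retried after the other batches in its window. Idempotence, which Kafka uses to prevent exactly this, is deliberately not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class InFlightReorderDemo {
    // Returns the order in which batches end up in the partition log
    // under the simplified send/retry model described above.
    public static List<String> logOrder(List<String> batches, Set<String> failOnce, int maxInFlight) {
        List<String> log = new ArrayList<>();
        for (int start = 0; start < batches.size(); start += maxInFlight) {
            // Send a window of up to maxInFlight batches without waiting for acks.
            List<String> window = batches.subList(start, Math.min(start + maxInFlight, batches.size()));
            List<String> retries = new ArrayList<>();
            for (String batch : window) {
                if (failOnce.contains(batch)) {
                    retries.add(batch);   // first attempt fails, e.g. a transient network error
                } else {
                    log.add(batch);       // the broker appends successful batches immediately
                }
            }
            log.addAll(retries);          // retried batches land after their successful peers
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> batches = List.of("A", "B");
        System.out.println(logOrder(batches, Set.of("A"), 2)); // prints [B, A]: reordered
        System.out.println(logOrder(batches, Set.of("A"), 1)); // prints [A, B]: order kept
    }
}
```

With one request in flight, the producer cannot send B until A has been acknowledged, so a retry of A can never be overtaken.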
We are now using a try-with-resources block to ensure the producer is properly closed even if an exception occurs. This is a best practice in Java for managing resources.
The Callback interface allows you to specify code that runs when the send completes. The onCompletion method receives two parameters. The RecordMetadata contains information about where the message was stored, including the partition number and offset. The Exception parameter is null if the send succeeded or contains error information if it failed.
Using callbacks is the recommended approach for production code because it allows you to handle errors appropriately while maintaining high throughput. You can log errors, retry failed messages, or alert monitoring systems.
BUILDING A JAVA CONSUMER APPLICATION
Now let us create a consumer application to read messages from our topic. Consumers are more complex than producers because they need to manage offsets, handle rebalancing, and coordinate with other consumers in the same group.
Here is a basic consumer:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class SimpleConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-events-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user-events"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d%n",
record.key(), record.value(), record.partition(), record.offset());
}
}
} finally {
consumer.close();
}
}
}
Let us examine the consumer configuration. The GROUP_ID_CONFIG is crucial because it determines how Kafka coordinates multiple consumers. All consumers with the same group ID work together to consume the topic's partitions. Kafka ensures that each partition is consumed by only one consumer in the group.
The KEY_DESERIALIZER_CLASS_CONFIG and VALUE_DESERIALIZER_CLASS_CONFIG are the opposite of the serializers we used in the producer. They convert bytes back into Java objects.
The AUTO_OFFSET_RESET_CONFIG determines what happens when the consumer group has no committed offset for a partition. Setting it to "earliest" means the consumer starts reading from the beginning of the partition. Setting it to "latest", which is the default, makes it start reading only new messages that arrive after the consumer starts.
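The per-partition decision can be summarized in a few lines. This is a simplified sketch with hypothetical names: it ignores the case where a committed offset has fallen out of range (for example, after retention deleted old data), which Kafka also resolves using auto.offset.reset. The third documented value, "none", makes the real consumer throw an exception instead of choosing a position.

```java
import java.util.OptionalLong;

public class StartingOffset {
    // committed: the group's committed offset for this partition, if any.
    // logStart: the earliest offset still retained; logEnd: the offset the next new record will get.
    public static long resolve(OptionalLong committed, String autoOffsetReset, long logStart, long logEnd) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // a committed offset always takes precedence
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStart;          // replay everything still retained
            case "latest":
                return logEnd;            // only records produced from now on
            case "none":
                // The real consumer throws NoOffsetForPartitionException in this case.
                throw new IllegalStateException("No committed offset and auto.offset.reset=none");
            default:
                throw new IllegalArgumentException("Unknown policy: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(OptionalLong.empty(), "earliest", 0, 42)); // prints 0
        System.out.println(resolve(OptionalLong.empty(), "latest", 0, 42));   // prints 42
        System.out.println(resolve(OptionalLong.of(17), "latest", 0, 42));    // prints 17
    }
}
```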
The subscribe method tells the consumer which topics to read from. You can subscribe to multiple topics by passing a list of topic names. Kafka automatically assigns partitions to this consumer based on the number of partitions and the number of consumers in the group.
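The poll method is the heart of the consumer. It fetches records from Kafka and returns them as a ConsumerRecords object. The Duration parameter specifies how long to block if no records are available. Heartbeats to the group coordinator are sent from a background thread, but poll still has to be called regularly: if the gap between calls exceeds max.poll.interval.ms (five minutes by default), the consumer is considered failed, leaves the group, and its partitions are reassigned. poll also drives rebalancing and runs any rebalance callbacks you register.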
The ConsumerRecords object is iterable and contains zero or more ConsumerRecord objects. Each ConsumerRecord represents a single message and includes the key, value, partition, offset, and timestamp.
The consumer runs in an infinite loop, continuously polling for new messages. In a real application, you would add logic to gracefully shut down the consumer when needed.
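A common shape for graceful shutdown is a loop guarded by a flag, a JVM shutdown hook that flips the flag, and cleanup in a finally block. The sketch below shows only that control flow, with a BlockingQueue standing in for Kafka so it runs without a broker; the class name is hypothetical. With a real KafkaConsumer, which is not thread-safe, the hook would typically call consumer.wakeup() (the one method safe to call from another thread), which makes a blocked poll throw WakeupException, and then wait for the loop thread to finish closing the consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class GracefulLoop implements Runnable {
    private final BlockingQueue<String> source;   // stands in for the subscribed topic
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final List<String> processed = new ArrayList<>();
    private volatile boolean closed = false;

    public GracefulLoop(BlockingQueue<String> source) {
        this.source = source;
    }

    @Override
    public void run() {
        try {
            while (running.get()) {
                // Like consumer.poll(Duration): wait a bounded time, then recheck the flag.
                String record = source.poll(100, TimeUnit.MILLISECONDS);
                if (record != null) {
                    processed.add(record);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            closed = true;                           // where consumer.close() would go
        }
    }

    // Safe to call from another thread, e.g. a shutdown hook.
    public void shutdown() {
        running.set(false);
    }

    public List<String> processed() { return processed; }
    public boolean isClosed() { return closed; }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("a", "b", "c"));
        GracefulLoop loop = new GracefulLoop(queue);
        Thread worker = new Thread(loop);
        // On Ctrl+C the hook stops the loop and waits for cleanup to finish.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            loop.shutdown();
            try { worker.join(); } catch (InterruptedException ignored) { }
        }));
        worker.start();
        Thread.sleep(500);   // give the loop time to drain the queue
        loop.shutdown();     // stop explicitly for this demo
        worker.join();
        System.out.println(loop.processed() + " closed=" + loop.isClosed());
    }
}
```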
UNDERSTANDING CONSUMER GROUPS AND PARTITION ASSIGNMENT
Consumer groups are one of Kafka's most powerful features. They enable you to scale your message processing horizontally by adding more consumers. Let us explore how this works.
Imagine you have a topic with three partitions and one consumer in a consumer group. That single consumer receives messages from all three partitions. Now suppose you add a second consumer to the same group. Kafka automatically rebalances the partition assignments so that each consumer handles some partitions. With two consumers, one might handle two partitions while the other handles one. If you add a third consumer, each consumer gets exactly one partition.
What happens if you add a fourth consumer to a group consuming a topic with three partitions? The fourth consumer remains idle because there are no more partitions to assign. This illustrates an important principle: the maximum parallelism you can achieve with a consumer group equals the number of partitions in the topic.
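The arithmetic can be sketched for a single topic. The method below (hypothetical name) follows the idea behind Kafka's range assignor: consumers are put in a stable sorted order, each gets partitionCount / consumerCount consecutive partitions, and the first partitionCount % consumerCount consumers get one extra. The real assignor sorts by the member IDs Kafka generates rather than plain names, and other assignors distribute partitions differently, but none of them gives one partition to two consumers in the same group.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RangeAssignmentSketch {
    // Assigns partitions 0..partitionCount-1 of one topic to the given consumers.
    public static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> result = new TreeMap<>();
        if (consumers.isEmpty()) {
            return result;
        }
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        int perConsumer = partitionCount / sorted.size();
        int withExtra = partitionCount % sorted.size(); // these consumers get one more
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = perConsumer + (i < withExtra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.put(sorted.get(i), owned);         // an empty list means an idle consumer
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("c1", "c2"), 3));
        // prints {c1=[0, 1], c2=[2]}
        System.out.println(assign(List.of("c1", "c2", "c3", "c4"), 3));
        // prints {c1=[0], c2=[1], c3=[2], c4=[]}
    }
}
```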
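Rebalancing is the process by which Kafka redistributes partitions among consumers when the group changes. It is triggered when a consumer joins or leaves the group, when a consumer stops sending heartbeats for longer than session.timeout.ms or goes longer than max.poll.interval.ms between polls, and when the partition count of a subscribed topic changes. With the traditional eager protocol, every consumer in the group stops consuming during a rebalance; the cooperative sticky assignor shortens the pause by moving only the partitions that change owner. Either way, frequent rebalancing hurts throughput, so you want to keep it to a minimum.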
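Kafka supports two rebalance styles: the eager protocol, in which every consumer gives up all of its partitions, and the incremental cooperative protocol (used by the CooperativeStickyAssignor), in which only partitions that move to a new owner are revoked and then handed over in a follow-up rebalance. The sketch below uses hypothetical names and no Kafka API; it takes the old and new owner of each partition and lists what each style revokes.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class RevocationSketch {
    // Eager: every currently owned partition is revoked before reassignment.
    public static Set<Integer> eagerRevoked(Map<Integer, String> oldOwners) {
        return new TreeSet<>(oldOwners.keySet());
    }

    // Cooperative: only partitions that move to a different consumer are revoked.
    public static Set<Integer> cooperativeRevoked(Map<Integer, String> oldOwners,
                                                  Map<Integer, String> newOwners) {
        Set<Integer> revoked = new TreeSet<>();
        for (Map.Entry<Integer, String> e : oldOwners.entrySet()) {
            if (!e.getValue().equals(newOwners.get(e.getKey()))) {
                revoked.add(e.getKey());
            }
        }
        return revoked;
    }

    public static void main(String[] args) {
        // c1 owned all three partitions; c2 joins and takes over partition 2.
        Map<Integer, String> before = Map.of(0, "c1", 1, "c1", 2, "c1");
        Map<Integer, String> after = Map.of(0, "c1", 1, "c1", 2, "c2");
        System.out.println(eagerRevoked(before));              // prints [0, 1, 2]
        System.out.println(cooperativeRevoked(before, after)); // prints [2]
    }
}
```

In the cooperative case, c1 keeps processing partitions 0 and 1 throughout the rebalance.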
Let us create a more sophisticated consumer that handles rebalancing gracefully:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class RebalanceAwareConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-events-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user-events"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Partitions revoked: " + partitions);
// Commit offsets before partitions are reassigned
consumer.commitSync();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Partitions assigned: " + partitions);
}
});
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Processing: key=%s, value=%s, partition=%d, offset=%d%n",
record.key(), record.value(), record.partition(), record.offset());
// Process the message here
}
consumer.commitSync();
}
} finally {
consumer.close();
}
}
}
We have disabled automatic offset commits by setting ENABLE_AUTO_COMMIT_CONFIG to false. This gives us precise control over when offsets are committed. Committing offsets tells Kafka which messages we have successfully processed. If our consumer crashes and restarts, it will resume from the last committed offset.
The ConsumerRebalanceListener interface provides two callback methods that are invoked during rebalancing. The onPartitionsRevoked method is called before partitions are taken away from this consumer. This is your opportunity to commit offsets for the partitions you are about to lose. The onPartitionsAssigned method is called after new partitions are assigned to this consumer. You can use this to initialize any state needed for processing the new partitions.
We call commitSync after processing each batch of records. This is a synchronous operation that blocks until Kafka acknowledges the commit. There is also a commitAsync method that does not block, which can provide better throughput but requires more careful error handling.
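Committing after processing gives at-least-once delivery: if the consumer crashes after processing some records but before committing them, those records are processed again after the restart. The small simulation below (hypothetical names, one partition, no Kafka API) makes the duplicate visible. As in Kafka, the committed value is the offset of the next record to read, not the last one processed.

```java
import java.util.ArrayList;
import java.util.List;

public class AtLeastOnceSketch {
    // Processes one partition's records in batches, committing after each full batch.
    // crashAfter: number of records handled before a simulated crash (-1 means no crash).
    // Returns the committed offset, i.e. where a restarted consumer resumes.
    public static int run(List<String> log, int start, int batchSize, int crashAfter, List<String> processed) {
        int committed = start;
        int handled = 0;
        for (int batchStart = start; batchStart < log.size(); batchStart += batchSize) {
            int batchEnd = Math.min(batchStart + batchSize, log.size());
            for (int offset = batchStart; offset < batchEnd; offset++) {
                if (handled == crashAfter) {
                    return committed;   // crash: the current batch was never committed
                }
                processed.add(log.get(offset));
                handled++;
            }
            committed = batchEnd;       // like commitSync() after the batch
        }
        return committed;
    }

    public static void main(String[] args) {
        List<String> log = List.of("r0", "r1", "r2", "r3");
        List<String> processed = new ArrayList<>();
        int resumeAt = run(log, 0, 2, 3, processed); // crashes after processing r0, r1, r2
        run(log, resumeAt, 2, -1, processed);        // restarts from the committed offset
        System.out.println(processed);               // prints [r0, r1, r2, r2, r3]
    }
}
```

Record r2 is processed twice, which is why processing logic in this pattern should tolerate duplicates.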
WORKING WITH MESSAGE KEYS AND PARTITIONING
Message keys play a crucial role in Kafka's partitioning strategy. When you send a message with a key, the producer's default partitioner hashes the serialized key (the Java client uses murmur2) and takes the result modulo the number of partitions. As long as the partition count stays the same, all messages with the same key go to the same partition, which guarantees ordering for those messages.
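The key-to-partition mapping can be sketched in plain Java. The murmur2 method below is intended to mirror the hash the Java client applies to keyed records (the client's own version is Utils.murmur2 in org.apache.kafka.common.utils); it is reproduced here for illustration, so compare it with the client source before relying on exact partition numbers. The class name is hypothetical. Printing each key's partition for three and for four partitions shows why adding partitions to an existing topic can move a key to a different partition.

```java
import java.nio.charset.StandardCharsets;

public class KeyPartitionSketch {
    // 32-bit murmur2, written to match the variant used by the Kafka Java client.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1 to 3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    public static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); // StringSerializer's default encoding
        // Clear the sign bit (as Utils.toPositive does) so the modulo is never negative.
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"user123", "user456"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 3)
                    + " of 3, partition " + partitionFor(key, 4) + " of 4");
        }
    }
}
```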
Let us create a producer that demonstrates key-based partitioning:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KeyedProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
// Send multiple messages for the same user
String userId = "user123";
ProducerRecord<String, String> record1 = new ProducerRecord<>("user-events", userId, "User logged in");
producer.send(record1, (metadata, exception) -> {
if (exception == null) {
System.out.println("Message 1 sent to partition " + metadata.partition());
}
});
ProducerRecord<String, String> record2 = new ProducerRecord<>("user-events", userId, "User viewed product 123");
producer.send(record2, (metadata, exception) -> {
if (exception == null) {
System.out.println("Message 2 sent to partition " + metadata.partition());
}
});
ProducerRecord<String, String> record3 = new ProducerRecord<>("user-events", userId, "User added item to cart");
producer.send(record3, (metadata, exception) -> {
if (exception == null) {
System.out.println("Message 3 sent to partition " + metadata.partition());
}
});
// Send messages for a different user
String userId2 = "user456";
ProducerRecord<String, String> record4 = new ProducerRecord<>("user-events", userId2, "User logged in");
producer.send(record4, (metadata, exception) -> {
if (exception == null) {
System.out.println("Message 4 sent to partition " + metadata.partition());
}
});
producer.flush();
}
}
}
When you run this code, you will notice that all messages for user123 go to the same partition, and all messages for user456 go to the same partition, though user456's messages might go to a different partition than user123's messages. This behavior is crucial for maintaining order within a key while allowing parallelism across different keys.
If you need custom partitioning logic beyond the default hash-based approach, you can implement a custom partitioner:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// Get the number of partitions for this topic
int numPartitions = cluster.partitionCountForTopic(topic);
// Records without a key cannot be hashed, so spread them randomly
if (keyBytes == null) {
return ThreadLocalRandom.current().nextInt(numPartitions);
}
// Custom logic: if the key starts with "premium", send to partition 0
if (key.toString().startsWith("premium")) {
return 0;
}
// Otherwise, hash the serialized key the way the default partitioner does.
// toPositive clears the sign bit; Math.abs(Integer.MIN_VALUE) would still be negative.
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
@Override
public void close() {
// Cleanup resources if needed
}
@Override
public void configure(Map<String, ?> configs) {
// Initialize with configuration if needed
}
}
To use this custom partitioner, add it to your producer configuration:
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
SERIALIZATION AND DESERIALIZATION WITH COMPLEX OBJECTS
So far, we have been working with simple string messages. In real applications, you often need to send complex objects. Kafka supports custom serializers and deserializers for any data type. Let us create a custom object and serialize it using JSON.
First, add the Jackson library to your project for JSON serialization. In Maven:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
Now let us create a UserEvent class:
import com.fasterxml.jackson.annotation.JsonProperty;
public class UserEvent {
@JsonProperty("userId")
private String userId;
@JsonProperty("eventType")
private String eventType;
@JsonProperty("timestamp")
private long timestamp;
@JsonProperty("details")
private String details;
// Default constructor for Jackson
public UserEvent() {
}
public UserEvent(String userId, String eventType, long timestamp, String details) {
this.userId = userId;
this.eventType = eventType;
this.timestamp = timestamp;
this.details = details;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public String getEventType() {
return eventType;
}
public void setEventType(String eventType) {
this.eventType = eventType;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public String getDetails() {
return details;
}
public void setDetails(String details) {
this.details = details;
}
@Override
public String toString() {
return "UserEvent{" +
"userId='" + userId + '\'' +
", eventType='" + eventType + '\'' +
", timestamp=" + timestamp +
", details='" + details + '\'' +
'}';
}
}
Now let us create a custom serializer for this class:
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
public class UserEventSerializer implements Serializer<UserEvent> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// No configuration needed
}
@Override
public byte[] serialize(String topic, UserEvent data) {
if (data == null) {
return null;
}
try {
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error serializing UserEvent", e);
}
}
@Override
public void close() {
// No cleanup needed
}
}
And a corresponding deserializer:
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
public class UserEventDeserializer implements Deserializer<UserEvent> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// No configuration needed
}
@Override
public UserEvent deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}
try {
return objectMapper.readValue(data, UserEvent.class);
} catch (Exception e) {
throw new SerializationException("Error deserializing UserEvent", e);
}
}
@Override
public void close() {
// No cleanup needed
}
}
Now we can create a producer that sends UserEvent objects:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class UserEventProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserEventSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
try (KafkaProducer<String, UserEvent> producer = new KafkaProducer<>(props)) {
UserEvent event = new UserEvent(
"user123",
"LOGIN",
System.currentTimeMillis(),
"User logged in from mobile app"
);
ProducerRecord<String, UserEvent> record = new ProducerRecord<>("user-events", event.getUserId(), event);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("Error sending event: " + exception.getMessage());
} else {
System.out.println("Event sent to partition " + metadata.partition() + " at offset " + metadata.offset());
}
});
}
}
}
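And a consumer that reads UserEvent objects. One caveat: the user-events topic still holds the plain-text messages produced in earlier sections, which are not valid JSON, so this consumer (which starts from the earliest offset) will fail with a deserialization error when it reaches them. For this exercise, either point both UserEventProducer and UserEventConsumer at a fresh topic or delete and recreate user-events first: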
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class UserEventConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-events-processor");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserEventDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
try (KafkaConsumer<String, UserEvent> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList("user-events"));
while (true) {
ConsumerRecords<String, UserEvent> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, UserEvent> record : records) {
UserEvent event = record.value();
System.out.println("Received event: " + event);
System.out.println(" User: " + event.getUserId());
System.out.println(" Type: " + event.getEventType());
System.out.println(" Time: " + event.getTimestamp());
System.out.println(" Details: " + event.getDetails());
}
}
}
}
}
HANDLING ERRORS AND IMPLEMENTING RETRY LOGIC
Error handling is critical in production Kafka applications. Network issues, broker failures, and processing errors can all occur. Let us create a robust consumer with proper error handling and retry logic:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
public class RobustConsumer {
private final AtomicBoolean closed = new AtomicBoolean(false);
private KafkaConsumer<String, String> consumer;
public void run() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "robust-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
consumer = new KafkaConsumer<>(props);
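// Add a shutdown hook that stops the poll loop and then waits for run() to finish,
// because the JVM halts as soon as all shutdown hooks have returned
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
shutdown();
try {
mainThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}));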
try {
consumer.subscribe(Collections.singletonList("user-events"));
while (!closed.get()) {
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
try {
processRecord(record);
} catch (Exception e) {
System.err.println("Error processing record at offset " + record.offset() + ": " + e.getMessage());
// Decide whether to continue or stop based on error type
handleProcessingError(record, e);
}
}
// Commit offsets after successful processing
try {
consumer.commitSync();
} catch (CommitFailedException e) {
System.err.println("Commit failed: " + e.getMessage());
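// The group rebalanced (for example because poll was not called within max.poll.interval.ms);
// these partitions now belong to another consumer, which will reprocess the uncommitted records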
}
} catch (WakeupException e) {
// Ignore for shutdown
if (!closed.get()) {
throw e;
}
} catch (Exception e) {
System.err.println("Unexpected error in consumer loop: " + e.getMessage());
e.printStackTrace();
}
}
} finally {
try {
consumer.commitSync();
} catch (Exception e) {
System.err.println("Error committing final offsets: " + e.getMessage());
}
consumer.close();
}
}
private void processRecord(ConsumerRecord<String, String> record) {
// Simulate processing that might fail
System.out.println("Processing: " + record.value());
// Your business logic here
if (record.value().contains("error")) {
throw new RuntimeException("Simulated processing error");
}
}
private void handleProcessingError(ConsumerRecord<String, String> record, Exception e) {
// Log the error
System.err.println("Failed to process message: " + record.value());
// Send to dead letter queue or retry topic
sendToDeadLetterQueue(record, e);
}
private void sendToDeadLetterQueue(ConsumerRecord<String, String> record, Exception e) {
// In a real application, you would send failed messages to a separate topic
System.err.println("Sending to DLQ: " + record.value() + " - Error: " + e.getMessage());
}
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
public static void main(String[] args) {
RobustConsumer consumer = new RobustConsumer();
consumer.run();
}
}
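This consumer demonstrates several important error handling patterns. An AtomicBoolean tracks whether the consumer should shut down, and the shutdown hook triggers a graceful shutdown when the application terminates. WakeupException gets its own catch block because consumer.wakeup() raises it inside poll to break out of the loop during a controlled shutdown. Offsets are committed only after every record in the batch has either been processed or handed to the error handler. If the application crashes before the commit, the whole batch is delivered again after the restart, so a record may be processed more than once but is never skipped. This is at-least-once delivery semantics.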
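To make the difference between committing after processing and committing before it concrete, here is a toy simulation in plain Java, with no Kafka involved. DeliverySemanticsDemo and its run method are hypothetical names. The model assumes a single partition, a fixed batch size, and exactly one crash right after a given record is processed, after which the consumer restarts from the last committed offset.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (not Kafka code): one partition read in batches by a consumer that crashes
 * once, right after processing the record at crashAfterOffset, then restarts from the
 * last committed offset.
 */
final class DeliverySemanticsDemo {

    static List<String> run(List<String> log, int batchSize, int crashAfterOffset,
                            boolean commitAfterProcessing) {
        List<String> processed = new ArrayList<>();
        int committed = 0;        // next offset to read, as last committed
        boolean crashed = false;  // the simulated crash happens only once
        while (committed < log.size()) {
            int start = committed;
            int end = Math.min(start + batchSize, log.size());
            if (!commitAfterProcessing) {
                committed = end;  // at-most-once: commit before processing
            }
            boolean crashNow = false;
            for (int offset = start; offset < end; offset++) {
                processed.add(log.get(offset));
                if (!crashed && offset == crashAfterOffset) {
                    crashed = true;
                    crashNow = true;
                    break;
                }
            }
            if (crashNow) {
                continue;         // restart: resume from whatever offset was committed
            }
            if (commitAfterProcessing) {
                committed = end;  // at-least-once: commit only after the whole batch
            }
        }
        return processed;
    }
}
```

With the log a, b, c, d, a batch size of 2, and a crash after offset 2, committing after processing yields a, b, c, c, d (record c is processed twice), while committing first yields a, b, c (record d is never processed). The duplicate is the price of at-least-once delivery, which is why processing logic should ideally tolerate seeing the same record twice.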
The handleProcessingError method shows where you would implement retry logic or send failed messages to a dead letter queue. A dead letter queue is a separate Kafka topic where you send messages that cannot be processed after multiple retries.
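One way to fill in handleProcessingError is sketched below: retry a record a bounded number of times with exponential backoff, and only then hand it to a dead-letter sink. RetryingProcessor, its Outcome enum, and the deadLetterSink callback are hypothetical; in a real consumer the sink would typically produce the record to a separate topic. Keep the total backoff well below max.poll.interval.ms, because the consumer does not call poll while it is retrying.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical helper: bounded retries with exponential backoff, then a dead-letter hand-off. */
final class RetryingProcessor<T> {

    enum Outcome { PROCESSED, DEAD_LETTERED, INTERRUPTED }

    private final int maxAttempts;
    private final long baseDelayMs;
    private final BiConsumer<T, RuntimeException> deadLetterSink;

    RetryingProcessor(int maxAttempts, long baseDelayMs, BiConsumer<T, RuntimeException> deadLetterSink) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.maxAttempts = maxAttempts;
        this.baseDelayMs = baseDelayMs;
        this.deadLetterSink = deadLetterSink;
    }

    Outcome process(T record, Consumer<T> step) {
        RuntimeException lastError = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                step.accept(record);
                return Outcome.PROCESSED;
            } catch (RuntimeException e) {
                lastError = e;
            }
            if (attempt < maxAttempts) {
                try {
                    // Delay doubles after each failure: base, 2 x base, 4 x base, ...
                    Thread.sleep(baseDelayMs * (1L << (attempt - 1)));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the shutdown signal
                    return Outcome.INTERRUPTED;         // caller should not commit this offset
                }
            }
        }
        // All attempts failed: route the record to the dead-letter sink
        deadLetterSink.accept(record, lastError);
        return Outcome.DEAD_LETTERED;
    }
}
```

An INTERRUPTED outcome signals that the application is shutting down; in that case it is safer not to commit the record's offset, so that it is delivered again after the restart.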
IMPLEMENTING EXACTLY-ONCE SEMANTICS
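Kafka supports exactly-once semantics: in a read-process-write pipeline that stays inside Kafka, each input message affects the output topics and the committed offsets exactly once, even when producers retry or applications restart. Side effects outside Kafka, such as database writes or sent emails, are not covered by this guarantee. Achieving it requires coordination between producers and consumers. Let us start with a transactional producer: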
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class TransactionalProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
// Use an ID that stays the same when this producer restarts, so the broker can
// fence off an older instance (a "zombie") that is still running with the same ID
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "user-events-producer-1");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
// Register the transactional ID with the broker (once per producer instance)
producer.initTransactions();
try {
producer.beginTransaction();
// Send multiple messages as part of one transaction
producer.send(new ProducerRecord<>("user-events", "user123", "Event 1"));
producer.send(new ProducerRecord<>("user-events", "user123", "Event 2"));
producer.send(new ProducerRecord<>("user-events", "user123", "Event 3"));
// Make all three messages visible to read_committed consumers atomically
producer.commitTransaction();
System.out.println("Transaction committed successfully");
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// Unrecoverable: do not abort, just let the finally block close the producer
System.err.println("Fatal producer error: " + e.getMessage());
} catch (KafkaException e) {
// Recoverable: roll back so none of the messages become visible
producer.abortTransaction();
System.err.println("Transaction aborted: " + e.getMessage());
}
} finally {
producer.close();
}
}
}
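The key configuration here is ENABLE_IDEMPOTENCE_CONFIG, which ensures that producer retries do not write duplicate messages to a partition; setting a transactional ID requires idempotence, so it must not be disabled. The TRANSACTIONAL_ID_CONFIG identifies this producer for transaction management, and in production it should stay the same across restarts of the same logical producer. When a new instance calls initTransactions with that ID, the broker fences off any older instance still using it and completes or aborts any transaction the old instance left in progress.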
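The broker-side half of idempotence can be pictured with a simplified model. The idempotent producer is given a producer ID and attaches increasing sequence numbers to what it sends to each partition, so the broker can recognize a retry of something it has already written. The class below is a hypothetical, much-reduced sketch: it tracks one partition and only the last sequence number per producer ID, whereas the real broker works with record batches and remembers several recent ones.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of duplicate detection for idempotent producers on one partition. */
final class IdempotentPartitionLog {
    private final List<String> log = new ArrayList<>();
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    /** Returns true if the value was appended, false if it was recognized as a retry. */
    boolean append(long producerId, int sequence, String value) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // already written: a retry after a lost acknowledgement
        }
        if (sequence != last + 1) {
            // A gap means something in between was lost, so the write is rejected
            throw new IllegalStateException("Out-of-order sequence " + sequence + ", expected " + (last + 1));
        }
        log.add(value);
        lastSequence.put(producerId, sequence);
        return true;
    }

    List<String> contents() {
        return List.copyOf(log);
    }
}
```

A retried write carrying an already-seen sequence number is dropped instead of being appended twice, and a gap in the sequence is rejected rather than silently accepted.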
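When using transactions, you must call initTransactions once, before sending any messages. Then you wrap your send operations in beginTransaction and commitTransaction calls. If a recoverable error occurs, you call abortTransaction, and none of the messages in the transaction become visible to consumers reading with read_committed. Fatal errors such as ProducerFencedException mean another instance has taken over the transactional ID; the only correct response is to close the producer.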
For consumers to benefit from exactly-once semantics, they must be configured to read only committed messages:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class TransactionalConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "transactional-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList("user-events"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received: " + record.value());
}
consumer.commitSync();
}
}
}
}
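The ISOLATION_LEVEL_CONFIG set to "read_committed" ensures that this consumer only sees messages from committed transactions; messages from aborted transactions are filtered out automatically. The default, "read_uncommitted", returns every message, including those from aborted transactions. The trade-off is latency: a read_committed consumer cannot read past the first message of a transaction that is still open, so a long-running transaction holds back every later message on that partition, even non-transactional ones.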
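The visibility rule can be illustrated with a toy model of a single partition that holds ordinary records, transactional records, and the commit and abort markers that close each transaction. ReadCommittedModel, Kind, and Entry are hypothetical names, and the model ignores offsets, producer IDs, and fetching; it only shows which values a read_committed reader would return at one moment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of what a read_committed consumer sees on one partition (not Kafka's code). */
final class ReadCommittedModel {

    enum Kind { DATA, COMMIT_MARKER, ABORT_MARKER }

    /** txnId is null for records written outside a transaction. */
    record Entry(Kind kind, String txnId, String value) {}

    static List<String> readCommitted(List<Entry> log) {
        // First pass: learn the outcome of every transaction that has finished
        Map<String, Kind> outcome = new HashMap<>();
        for (Entry e : log) {
            if (e.kind() != Kind.DATA) {
                outcome.put(e.txnId(), e.kind());
            }
        }
        List<String> visible = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind() != Kind.DATA) {
                continue;               // markers are never handed to the application
            }
            if (e.txnId() == null) {
                visible.add(e.value()); // non-transactional record
                continue;
            }
            Kind result = outcome.get(e.txnId());
            if (result == null) {
                break;                  // still-open transaction: nothing after it is readable yet
            }
            if (result == Kind.COMMIT_MARKER) {
                visible.add(e.value()); // aborted data is skipped
            }
        }
        return visible;
    }
}
```

For the log a (non-transactional), b (transaction t1), c (t2), commit t1, abort t2, d (t3, still open), e (non-transactional), the reader returns only a and b: c was aborted, and d blocks everything after it, including e, until t3 finishes.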
MONITORING AND METRICS
Production Kafka applications need comprehensive monitoring. Kafka clients and brokers expose numerous metrics through JMX that you can collect and visualize. Let us create a producer that tracks its own delivery metrics and also reads the client's built-in ones:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
public class MonitoredProducer {
private final AtomicLong messagesSent = new AtomicLong(0);
private final AtomicLong messagesFailedToSend = new AtomicLong(0);
private final AtomicLong totalLatency = new AtomicLong(0);
public void run() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
// Start metrics reporting thread
Thread metricsThread = new Thread(() -> reportMetrics(producer));
metricsThread.setDaemon(true);
metricsThread.start();
// Send messages
for (int i = 0; i < 100; i++) {
// nanoTime is monotonic, which makes it the right clock for measuring elapsed time
long startNanos = System.nanoTime();
ProducerRecord<String, String> record = new ProducerRecord<>("user-events", "key" + i, "Message " + i);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
messagesFailedToSend.incrementAndGet();
System.err.println("Failed to send message: " + exception.getMessage());
} else {
// Add latency only for successful sends, because the average divides by messagesSent
totalLatency.addAndGet((System.nanoTime() - startNanos) / 1_000_000);
messagesSent.incrementAndGet();
}
});
// Simulate some delay
Thread.sleep(100);
}
producer.flush();
// Wait a bit for final metrics
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void reportMetrics(KafkaProducer<String, String> producer) {
while (true) {
try {
Thread.sleep(5000);
// Report custom metrics
long sent = messagesSent.get();
long failed = messagesFailedToSend.get();
long avgLatency = sent > 0 ? totalLatency.get() / sent : 0;
System.out.println("\n=== Custom Metrics ===");
System.out.println("Messages sent: " + sent);
System.out.println("Messages failed: " + failed);
System.out.println("Average latency: " + avgLatency + " ms");
// Report Kafka producer metrics
Map<org.apache.kafka.common.MetricName, ? extends org.apache.kafka.common.Metric> metrics = producer.metrics();
System.out.println("\n=== Kafka Producer Metrics ===");
for (Map.Entry<org.apache.kafka.common.MetricName, ? extends org.apache.kafka.common.Metric> entry : metrics.entrySet()) {
org.apache.kafka.common.MetricName metricName = entry.getKey();
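// Keep to the client-wide "producer-metrics" group; per-topic and per-node groups reuse some of these names
if ("producer-metrics".equals(metricName.group()) &&
(metricName.name().equals("record-send-rate") ||
metricName.name().equals("record-error-rate") ||
metricName.name().equals("request-latency-avg"))) {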
System.out.println(metricName.name() + ": " + entry.getValue().metricValue());
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
public static void main(String[] args) {
MonitoredProducer producer = new MonitoredProducer();
producer.run();
}
}
This producer tracks custom metrics such as messages sent, failures, and latency, and it reads Kafka's built-in client metrics through the metrics() method. In production, you would export these metrics to a monitoring system such as Prometheus or Datadog and build dashboards on top of them, for example in Grafana.
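Before exporting, the ad hoc counters above can be gathered in one place. The sketch below is a hypothetical LatencyStats helper built only on java.util.concurrent.atomic classes. Producer callbacks run on the producer's I/O thread while the reporting thread reads the values, so every field must be safe to update and read concurrently; LongAdder suits counters that are updated often and read rarely.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical helper: thread-safe latency statistics that send callbacks can update. */
final class LatencyStats {
    private final LongAdder count = new LongAdder();
    private final LongAdder totalMs = new LongAdder();
    private final AtomicLong maxMs = new AtomicLong(0);

    /** Records one successful send's latency in milliseconds. */
    void record(long latencyMs) {
        count.increment();
        totalMs.add(latencyMs);
        maxMs.accumulateAndGet(latencyMs, Math::max);
    }

    long count() {
        return count.sum();
    }

    /** Average over recorded sends; count and total are read separately, so this is approximate under load. */
    double averageMs() {
        long n = count.sum();
        return n == 0 ? 0.0 : (double) totalMs.sum() / n;
    }

    long maxMs() {
        return maxMs.get();
    }
}
```

Inside the send callback you would call record(latencyMs) for successful sends, and the reporting thread would read count(), averageMs(), and maxMs(). The three reads are not one atomic snapshot, which is usually acceptable for monitoring.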
PERFORMANCE TUNING AND BEST PRACTICES
Kafka performance depends on proper configuration. Let us discuss key tuning parameters for producers and consumers.
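For producers, batch size and linger time are critical. The producer groups records bound for the same partition into batches. The batch size (BATCH_SIZE_CONFIG) caps how many bytes one batch may hold, and a full batch is sent right away. The linger time (LINGER_MS_CONFIG) is how long the producer waits for more records to fill a batch that is not yet full before sending it anyway. Larger batches and longer linger times improve throughput and compression at the cost of extra latency.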
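The interaction between the two settings can be simulated with a simplified model. BatchModel is a hypothetical, single-partition sketch that tracks only byte counts and timestamps supplied by the caller; the real producer also accounts for compression, per-record overhead, and buffer memory.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified single-partition model of batch.size and linger.ms (not Kafka's RecordAccumulator). */
final class BatchModel {
    private final int batchSizeBytes;
    private final long lingerMs;
    private final List<Integer> sentBatchBytes = new ArrayList<>();
    private int openBatchBytes = 0;
    private long openedAtMs = -1;

    BatchModel(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    void append(int recordBytes, long nowMs) {
        // A record that would overflow the open batch closes it first
        if (openBatchBytes > 0 && openBatchBytes + recordBytes > batchSizeBytes) {
            send();
        }
        if (openBatchBytes == 0) {
            openedAtMs = nowMs;
        }
        openBatchBytes += recordBytes;
        // A full batch goes out immediately, without waiting for linger.ms
        if (openBatchBytes >= batchSizeBytes) {
            send();
        }
    }

    /** Called as time passes: a partly filled batch is sent once linger.ms has elapsed. */
    void tick(long nowMs) {
        if (openBatchBytes > 0 && nowMs - openedAtMs >= lingerMs) {
            send();
        }
    }

    List<Integer> sentBatchBytes() {
        return List.copyOf(sentBatchBytes);
    }

    private void send() {
        sentBatchBytes.add(openBatchBytes);
        openBatchBytes = 0;
    }
}
```

With a 100-byte batch size and a 10 ms linger: a 60-byte record at t=0 followed by another at t=1 sends the first as its own batch (the second would not fit), the second leaves once tick reports t=11, and 40 + 60 bytes arriving at t=20 and t=21 fill a batch that is sent immediately.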
Here is an optimized producer configuration:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class OptimizedProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Reliability settings
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
// Performance settings
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // 32 KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64 MB
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
for (int i = 0; i < 10000; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("user-events", "key" + i, "Message " + i);
producer.send(record);
}
}
}
}
The COMPRESSION_TYPE_CONFIG enables message compression, which reduces network bandwidth and storage. Snappy provides a good balance between compression ratio and CPU usage. The BATCH_SIZE_CONFIG of 32 KB allows the producer to batch multiple messages together. The LINGER_MS_CONFIG of 10 milliseconds gives the producer a brief window to accumulate more messages into each batch.
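Kafka compresses whole record batches rather than individual messages, so batching and compression reinforce each other: the more similar records a batch contains, the more redundancy the codec can remove. The sketch below illustrates this with gzip, which is also one of Kafka's supported compression types, because the JDK ships a gzip implementation and not a Snappy one. CompressionDemo and its JSON-like sample payload are assumptions for illustration; exact sizes depend on the data.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

/** Hypothetical demo: compressed size of one record versus a batch of similar records. */
final class CompressionDemo {

    /** Returns the gzip-compressed size of data in bytes. */
    static int gzipSize(byte[] data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Closing the stream above wrote the gzip trailer, so out now holds the full result
        return out.size();
    }

    /** Builds n small JSON-like order records concatenated into one "batch". */
    static byte[] batchOf(int n) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) {
            sb.append("{\"orderId\":\"ORD").append(i).append("\",\"eventType\":\"CREATED\"}");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

A single small record can even grow under compression because of the fixed gzip header and trailer, while a batch of 100 similar records shrinks to a fraction of its size.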
For consumers, the main levers are how much data each fetch request returns and how many records each call to poll hands to your code. Here is an optimized consumer configuration:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class OptimizedConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "optimized-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Performance settings
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // 1 KB
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1 MB
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// Reliability settings
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList("user-events"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Process record
}
consumer.commitSync();
}
}
}
}
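The FETCH_MIN_BYTES_CONFIG asks the broker to hold a fetch request until at least 1 KB of data is available, and FETCH_MAX_WAIT_MS_CONFIG caps that wait at 500 milliseconds, after which the broker responds with whatever it has. Together they reduce the number of small fetch requests. MAX_PARTITION_FETCH_BYTES_CONFIG limits how much data the broker returns per partition in one fetch. MAX_POLL_RECORDS_CONFIG limits how many records a single call to poll returns, which keeps each loop iteration short enough that the consumer calls poll again within max.poll.interval.ms and is not removed from the group.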
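The broker-side rule behind FETCH_MIN_BYTES_CONFIG and FETCH_MAX_WAIT_MS_CONFIG fits in a few lines. FetchWaitModel is a hypothetical toy that assumes a known arrival rate, in bytes per millisecond, on the partitions being fetched; the real broker also applies other limits, such as fetch.max.bytes.

```java
/**
 * Toy model of the rule behind fetch.min.bytes and fetch.max.wait.ms:
 * answer as soon as enough bytes are available, or when the wait budget runs out.
 */
final class FetchWaitModel {

    /**
     * bytesPerMs[t] is the number of bytes that arrive during millisecond t
     * (none after the array ends). Returns how long the fetch request is held, in ms.
     */
    static long responseDelayMs(int[] bytesPerMs, int fetchMinBytes, long fetchMaxWaitMs) {
        long available = 0;
        for (int t = 0; t < fetchMaxWaitMs; t++) {
            available += t < bytesPerMs.length ? bytesPerMs[t] : 0;
            if (available >= fetchMinBytes) {
                return t + 1; // enough data: respond early
            }
        }
        return fetchMaxWaitMs; // timeout: respond with whatever has arrived, possibly nothing
    }
}
```

At 100 bytes per millisecond the request is answered after 11 ms, once 1,100 bytes are available; at 1 byte per millisecond only 500 bytes arrive in time, so the broker answers when the 500 ms wait expires.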
COMPLETE RUNNING EXAMPLE: E-COMMERCE EVENT STREAMING SYSTEM
Now let us put everything together into a complete example that applies the patterns above. We will build an e-commerce event streaming system with producers that generate order events and consumers that process them for different purposes.
First, let us create the domain model for our order events:
import com.fasterxml.jackson.annotation.JsonProperty;
import java.math.BigDecimal;
import java.util.List;
import java.util.Objects;
public class OrderEvent {
@JsonProperty("orderId")
private String orderId;
@JsonProperty("customerId")
private String customerId;
@JsonProperty("eventType")
private OrderEventType eventType;
@JsonProperty("timestamp")
private long timestamp;
@JsonProperty("items")
private List<OrderItem> items;
@JsonProperty("totalAmount")
private BigDecimal totalAmount;
@JsonProperty("shippingAddress")
private Address shippingAddress;
public OrderEvent() {
}
public OrderEvent(String orderId, String customerId, OrderEventType eventType,
long timestamp, List<OrderItem> items, BigDecimal totalAmount,
Address shippingAddress) {
this.orderId = orderId;
this.customerId = customerId;
this.eventType = eventType;
this.timestamp = timestamp;
this.items = items;
this.totalAmount = totalAmount;
this.shippingAddress = shippingAddress;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getCustomerId() {
return customerId;
}
public void setCustomerId(String customerId) {
this.customerId = customerId;
}
public OrderEventType getEventType() {
return eventType;
}
public void setEventType(OrderEventType eventType) {
this.eventType = eventType;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public List<OrderItem> getItems() {
return items;
}
public void setItems(List<OrderItem> items) {
this.items = items;
}
public BigDecimal getTotalAmount() {
return totalAmount;
}
public void setTotalAmount(BigDecimal totalAmount) {
this.totalAmount = totalAmount;
}
public Address getShippingAddress() {
return shippingAddress;
}
public void setShippingAddress(Address shippingAddress) {
this.shippingAddress = shippingAddress;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
OrderEvent that = (OrderEvent) o;
return timestamp == that.timestamp &&
Objects.equals(orderId, that.orderId) &&
Objects.equals(customerId, that.customerId) &&
eventType == that.eventType &&
Objects.equals(items, that.items) &&
Objects.equals(totalAmount, that.totalAmount) &&
Objects.equals(shippingAddress, that.shippingAddress);
}
@Override
public int hashCode() {
return Objects.hash(orderId, customerId, eventType, timestamp, items, totalAmount, shippingAddress);
}
@Override
public String toString() {
return "OrderEvent{" +
"orderId='" + orderId + '\'' +
", customerId='" + customerId + '\'' +
", eventType=" + eventType +
", timestamp=" + timestamp +
", items=" + items +
", totalAmount=" + totalAmount +
", shippingAddress=" + shippingAddress +
'}';
}
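}
Java allows only one public top-level type per source file, so the event type enum goes in its own file, OrderEventType.java:
public enum OrderEventType {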
CREATED,
PAYMENT_RECEIVED,
SHIPPED,
DELIVERED,
CANCELLED
}
Each line item in an order is an OrderItem (OrderItem.java):
import com.fasterxml.jackson.annotation.JsonProperty;
import java.math.BigDecimal;
import java.util.Objects;
public class OrderItem {
@JsonProperty("productId")
private String productId;
@JsonProperty("productName")
private String productName;
@JsonProperty("quantity")
private int quantity;
@JsonProperty("price")
private BigDecimal price;
public OrderItem() {
}
public OrderItem(String productId, String productName, int quantity, BigDecimal price) {
this.productId = productId;
this.productName = productName;
this.quantity = quantity;
this.price = price;
}
public String getProductId() {
return productId;
}
public void setProductId(String productId) {
this.productId = productId;
}
public String getProductName() {
return productName;
}
public void setProductName(String productName) {
this.productName = productName;
}
public int getQuantity() {
return quantity;
}
public void setQuantity(int quantity) {
this.quantity = quantity;
}
public BigDecimal getPrice() {
return price;
}
public void setPrice(BigDecimal price) {
this.price = price;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
OrderItem orderItem = (OrderItem) o;
return quantity == orderItem.quantity &&
Objects.equals(productId, orderItem.productId) &&
Objects.equals(productName, orderItem.productName) &&
Objects.equals(price, orderItem.price);
}
@Override
public int hashCode() {
return Objects.hash(productId, productName, quantity, price);
}
@Override
public String toString() {
return "OrderItem{" +
"productId='" + productId + '\'' +
", productName='" + productName + '\'' +
", quantity=" + quantity +
", price=" + price +
'}';
}
}
The shipping address is modeled by the Address class (Address.java):
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
public class Address {
@JsonProperty("street")
private String street;
@JsonProperty("city")
private String city;
@JsonProperty("state")
private String state;
@JsonProperty("zipCode")
private String zipCode;
@JsonProperty("country")
private String country;
public Address() {
}
public Address(String street, String city, String state, String zipCode, String country) {
this.street = street;
this.city = city;
this.state = state;
this.zipCode = zipCode;
this.country = country;
}
public String getStreet() {
return street;
}
public void setStreet(String street) {
this.street = street;
}
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city;
}
public String getState() {
return state;
}
public void setState(String state) {
this.state = state;
}
public String getZipCode() {
return zipCode;
}
public void setZipCode(String zipCode) {
this.zipCode = zipCode;
}
public String getCountry() {
return country;
}
public void setCountry(String country) {
this.country = country;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Address address = (Address) o;
return Objects.equals(street, address.street) &&
Objects.equals(city, address.city) &&
Objects.equals(state, address.state) &&
Objects.equals(zipCode, address.zipCode) &&
Objects.equals(country, address.country);
}
@Override
public int hashCode() {
return Objects.hash(street, city, state, zipCode, country);
}
@Override
public String toString() {
return "Address{" +
"street='" + street + '\'' +
", city='" + city + '\'' +
", state='" + state + '\'' +
", zipCode='" + zipCode + '\'' +
", country='" + country + '\'' +
'}';
}
}
Now let us create the serializer and deserializer for OrderEvent:
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
public class OrderEventSerializer implements Serializer<OrderEvent> {
private final ObjectMapper objectMapper;
public OrderEventSerializer() {
this.objectMapper = new ObjectMapper();
this.objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// No additional configuration needed
}
@Override
public byte[] serialize(String topic, OrderEvent data) {
if (data == null) {
return null;
}
try {
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error serializing OrderEvent to JSON", e);
}
}
@Override
public void close() {
// No cleanup needed
}
}
And the matching deserializer (OrderEventDeserializer.java):
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
public class OrderEventDeserializer implements Deserializer<OrderEvent> {
private final ObjectMapper objectMapper;
public OrderEventDeserializer() {
this.objectMapper = new ObjectMapper();
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// No additional configuration needed
}
@Override
public OrderEvent deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}
try {
return objectMapper.readValue(data, OrderEvent.class);
} catch (Exception e) {
throw new SerializationException("Error deserializing OrderEvent from JSON", e);
}
}
@Override
public void close() {
// No cleanup needed
}
}
Now let us create a comprehensive order event producer:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
public class OrderEventProducerService {
private static final Logger logger = LoggerFactory.getLogger(OrderEventProducerService.class);
private static final String TOPIC_NAME = "order-events";
private final KafkaProducer<String, OrderEvent> producer;
private final AtomicBoolean running = new AtomicBoolean(true);
private final AtomicLong successCount = new AtomicLong(0);
private final AtomicLong failureCount = new AtomicLong(0);
public OrderEventProducerService(String bootstrapServers) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, OrderEventSerializer.class.getName());
// Reliability settings
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
// Performance settings
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);
// Timeout settings
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
this.producer = new KafkaProducer<>(props);
// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
}
public void sendOrderEvent(OrderEvent event) {
if (!running.get()) {
throw new IllegalStateException("Producer is shutting down");
}
ProducerRecord<String, OrderEvent> record = new ProducerRecord<>(
TOPIC_NAME,
event.getOrderId(),
event
);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
failureCount.incrementAndGet();
logger.error("Failed to send order event for order {}: {}",
event.getOrderId(), exception.getMessage(), exception);
} else {
successCount.incrementAndGet();
logger.info("Successfully sent order event for order {} to partition {} at offset {}",
event.getOrderId(), metadata.partition(), metadata.offset());
}
}
});
}
public RecordMetadata sendOrderEventSync(OrderEvent event) throws InterruptedException, ExecutionException {
if (!running.get()) {
throw new IllegalStateException("Producer is shutting down");
}
ProducerRecord<String, OrderEvent> record = new ProducerRecord<>(
TOPIC_NAME,
event.getOrderId(),
event
);
// Block until the broker acknowledges the write; a failed send surfaces as ExecutionException
return producer.send(record).get();
}
public void flush() {
producer.flush();
}
public long getSuccessCount() {
return successCount.get();
}
public long getFailureCount() {
return failureCount.get();
}
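public void shutdown() {
// Runs from both main() and the JVM shutdown hook, so only the first call does the work
if (!running.compareAndSet(true, false)) {
return;
}
logger.info("Shutting down producer...");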
try {
producer.flush();
producer.close();
logger.info("Producer shut down successfully. Success: {}, Failures: {}",
successCount.get(), failureCount.get());
} catch (Exception e) {
logger.error("Error during producer shutdown", e);
}
}
public static void main(String[] args) throws InterruptedException {
OrderEventProducerService producerService = new OrderEventProducerService("localhost:9092");
// Simulate order creation
Random random = new Random();
String[] customerIds = {"CUST001", "CUST002", "CUST003", "CUST004", "CUST005"};
String[] products = {"Laptop", "Mouse", "Keyboard", "Monitor", "Headphones"};
for (int i = 0; i < 100; i++) {
String orderId = "ORD" + String.format("%06d", i + 1);
String customerId = customerIds[random.nextInt(customerIds.length)];
List<OrderItem> items = new ArrayList<>();
int numItems = random.nextInt(3) + 1;
BigDecimal totalAmount = BigDecimal.ZERO;
for (int j = 0; j < numItems; j++) {
int productIndex = random.nextInt(products.length);
int quantity = random.nextInt(3) + 1;
BigDecimal price = new BigDecimal(random.nextInt(500) + 50);
// Derive the product ID from the chosen product so the same product always has the same ID
items.add(new OrderItem("PROD" + productIndex, products[productIndex], quantity, price));
totalAmount = totalAmount.add(price.multiply(new BigDecimal(quantity)));
}
Address address = new Address(
random.nextInt(9999) + " Main St",
"San Francisco",
"CA",
String.format("%05d", random.nextInt(99999)),
"USA"
);
OrderEvent event = new OrderEvent(
orderId,
customerId,
OrderEventType.CREATED,
System.currentTimeMillis(),
items,
totalAmount,
address
);
producerService.sendOrderEvent(event);
// Simulate some delay between orders
Thread.sleep(100);
}
producerService.flush();
logger.info("All events sent. Success: {}, Failures: {}",
producerService.getSuccessCount(), producerService.getFailureCount());
// Keep running for a bit to ensure all callbacks complete
Thread.sleep(2000);
producerService.shutdown();
}
}
Now let us create multiple consumers for different purposes. First, a warehouse fulfillment consumer:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class WarehouseFulfillmentConsumer {
private static final Logger logger = LoggerFactory.getLogger(WarehouseFulfillmentConsumer.class);
private static final String TOPIC_NAME = "order-events";
private static final String GROUP_ID = "warehouse-fulfillment-group";
private final KafkaConsumer<String, OrderEvent> consumer;
private final AtomicBoolean running = new AtomicBoolean(true);
private final Map<String, OrderEvent> orderCache = new HashMap<>();
public WarehouseFulfillmentConsumer(String bootstrapServers) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, OrderEventDeserializer.class.getName());
// Consumer settings
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
this.consumer = new KafkaConsumer<>(props);
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
}
public void run() {
try {
consumer.subscribe(Collections.singletonList(TOPIC_NAME), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
logger.info("Partitions revoked: {}", partitions);
consumer.commitSync();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
logger.info("Partitions assigned: {}", partitions);
}
});
logger.info("Warehouse fulfillment consumer started");
while (running.get()) {
try {
ConsumerRecords<String, OrderEvent> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, OrderEvent> record : records) {
processOrderEvent(record.value());
}
if (!records.isEmpty()) {
consumer.commitSync();
}
} catch (WakeupException e) {
if (!running.get()) {
break;
}
} catch (Exception e) {
logger.error("Error processing records", e);
}
}
} finally {
try {
consumer.commitSync();
} catch (Exception e) {
logger.error("Error committing final offsets", e);
}
consumer.close();
logger.info("Warehouse fulfillment consumer shut down");
}
}
private void processOrderEvent(OrderEvent event) {
logger.info("Processing order event: Order={}, Type={}, Customer={}",
event.getOrderId(), event.getEventType(), event.getCustomerId());
switch (event.getEventType()) {
case CREATED:
handleOrderCreated(event);
break;
case PAYMENT_RECEIVED:
handlePaymentReceived(event);
break;
case SHIPPED:
handleOrderShipped(event);
break;
case DELIVERED:
handleOrderDelivered(event);
break;
case CANCELLED:
handleOrderCancelled(event);
break;
default:
logger.warn("Unknown event type: {}", event.getEventType());
}
}
private void handleOrderCreated(OrderEvent event) {
orderCache.put(event.getOrderId(), event);
logger.info("Order {} created with {} items, total amount: {}",
event.getOrderId(), event.getItems().size(), event.getTotalAmount());
// In a real system, you would:
// - Check inventory availability
// - Reserve items
// - Create picking list
logger.info("Reserved inventory for order {}", event.getOrderId());
}
private void handlePaymentReceived(OrderEvent event) {
logger.info("Payment received for order {}, preparing for shipment", event.getOrderId());
// In a real system, you would:
// - Generate shipping label
// - Create packing slip
// - Assign to warehouse worker
}
private void handleOrderShipped(OrderEvent event) {
logger.info("Order {} shipped to {}", event.getOrderId(), event.getShippingAddress());
// In a real system, you would:
// - Update inventory
// - Send tracking information
}
private void handleOrderDelivered(OrderEvent event) {
logger.info("Order {} delivered successfully", event.getOrderId());
orderCache.remove(event.getOrderId());
}
private void handleOrderCancelled(OrderEvent event) {
logger.info("Order {} cancelled, releasing inventory", event.getOrderId());
// In a real system, you would:
// - Release reserved inventory
// - Process refund if payment was received
orderCache.remove(event.getOrderId());
}
public void shutdown() {
logger.info("Shutting down warehouse fulfillment consumer...");
running.set(false);
consumer.wakeup();
}
public static void main(String[] args) {
WarehouseFulfillmentConsumer consumer = new WarehouseFulfillmentConsumer("localhost:9092");
consumer.run();
}
}
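The warehouse consumer pairs a 30-second session.timeout.ms with a 10-second heartbeat.interval.ms. The Kafka consumer configuration docs say the heartbeat interval must be lower than the session timeout and should typically be no higher than one third of it, and these values sit exactly on that limit. A small plain-Java check of the guideline; ConsumerTimeoutCheck is a hypothetical helper, not part of the Kafka client:

```java
// Hypothetical helper, not part of the Kafka client: checks the documented rule of thumb that
// heartbeat.interval.ms should be no higher than one third of session.timeout.ms.
final class ConsumerTimeoutCheck {

    static boolean heartbeatWithinGuideline(long sessionTimeoutMs, long heartbeatIntervalMs) {
        // Compare 3 * h <= s instead of h <= s / 3 to avoid integer-division rounding.
        // For a positive h, 3 * h <= s also guarantees h < s.
        return heartbeatIntervalMs > 0 && heartbeatIntervalMs * 3 <= sessionTimeoutMs;
    }

    public static void main(String[] args) {
        // Values used by WarehouseFulfillmentConsumer
        System.out.println(heartbeatWithinGuideline(30_000, 10_000)); // true
        // A 15-second heartbeat leaves room for only two heartbeats per session timeout
        System.out.println(heartbeatWithinGuideline(30_000, 15_000)); // false
    }
}
```

With three heartbeats per session window, one or two delayed heartbeats do not immediately get the consumer evicted from the group.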
Now let us create an analytics consumer that tracks order statistics:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
public class OrderAnalyticsConsumer {
private static final Logger logger = LoggerFactory.getLogger(OrderAnalyticsConsumer.class);
private static final String TOPIC_NAME = "order-events";
private static final String GROUP_ID = "order-analytics-group";
private final KafkaConsumer<String, OrderEvent> consumer;
private final AtomicBoolean running = new AtomicBoolean(true);
// Analytics data structures
private final AtomicLong totalOrders = new AtomicLong(0);
private final Map<String, AtomicLong> ordersByCustomer = new ConcurrentHashMap<>();
private final Map<OrderEventType, AtomicLong> ordersByType = new ConcurrentHashMap<>();
private BigDecimal totalRevenue = BigDecimal.ZERO;
private final Object revenueLock = new Object();
public OrderAnalyticsConsumer(String bootstrapServers) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, OrderEventDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
this.consumer = new KafkaConsumer<>(props);
// Initialize event type counters
for (OrderEventType type : OrderEventType.values()) {
ordersByType.put(type, new AtomicLong(0));
}
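// Wake the poll loop on JVM shutdown, then wait for run() to finish its final commit and close().
// Assumes run() is called on the same thread that constructs this object, as main() does.
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
shutdown();
try {
mainThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}));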
}
public void run() {
try {
consumer.subscribe(Collections.singletonList(TOPIC_NAME), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
logger.info("Partitions revoked: {}", partitions);
consumer.commitSync();
printStatistics();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
logger.info("Partitions assigned: {}", partitions);
}
});
logger.info("Order analytics consumer started");
// Start statistics reporting thread
Thread statsThread = new Thread(this::reportStatisticsPeriodically);
statsThread.setDaemon(true);
statsThread.start();
while (running.get()) {
try {
ConsumerRecords<String, OrderEvent> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, OrderEvent> record : records) {
analyzeOrderEvent(record.value());
}
if (!records.isEmpty()) {
consumer.commitSync();
}
} catch (WakeupException e) {
if (!running.get()) {
break;
}
} catch (Exception e) {
logger.error("Error processing records", e);
}
}
} finally {
try {
consumer.commitSync();
} catch (Exception e) {
logger.error("Error committing final offsets", e);
}
consumer.close();
printStatistics();
logger.info("Order analytics consumer shut down");
}
}
private void analyzeOrderEvent(OrderEvent event) {
// Count every event by type
ordersByType.get(event.getEventType()).incrementAndGet();
// Only CREATED events represent new orders: count them per customer and add their revenue
if (event.getEventType() == OrderEventType.CREATED) {
ordersByCustomer.computeIfAbsent(event.getCustomerId(), k -> new AtomicLong(0))
.incrementAndGet();
// Update the order count and revenue under one lock so printStatistics() sees a consistent pair
synchronized (revenueLock) {
totalOrders.incrementAndGet();
totalRevenue = totalRevenue.add(event.getTotalAmount());
}
}
}
private void reportStatisticsPeriodically() {
while (running.get()) {
try {
Thread.sleep(10000); // Report every 10 seconds
printStatistics();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
private void printStatistics() {
logger.info("\n========== ORDER ANALYTICS ==========");
synchronized (revenueLock) {
long orders = totalOrders.get();
logger.info("Total Orders: {}", orders);
logger.info("Total Revenue: ${}", totalRevenue);
if (orders > 0) {
// Two decimal places, rounding half up (RoundingMode replaces the deprecated BigDecimal.ROUND_HALF_UP)
BigDecimal avgOrderValue = totalRevenue.divide(
BigDecimal.valueOf(orders), 2, java.math.RoundingMode.HALF_UP);
logger.info("Average Order Value: ${}", avgOrderValue);
}
}
logger.info("\nEvents by Type:");
ordersByType.forEach((type, count) ->
logger.info(" {}: {}", type, count.get()));
logger.info("\nTop Customers:");
ordersByCustomer.entrySet().stream()
.sorted((e1, e2) -> Long.compare(e2.getValue().get(), e1.getValue().get()))
.limit(5)
.forEach(entry ->
logger.info(" {}: {} orders", entry.getKey(), entry.getValue().get()));
logger.info("=====================================\n");
}
public void shutdown() {
logger.info("Shutting down order analytics consumer...");
running.set(false);
consumer.wakeup();
}
public static void main(String[] args) {
OrderAnalyticsConsumer consumer = new OrderAnalyticsConsumer("localhost:9092");
consumer.run();
}
}
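The average order value in printStatistics() divides total revenue by the number of CREATED orders, keeping two decimal places and rounding half up. A standalone check of that arithmetic with made-up amounts; AverageOrderValue is a hypothetical helper that mirrors the same divide call:

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical helper mirroring the divide() call in printStatistics()
final class AverageOrderValue {

    static BigDecimal average(BigDecimal totalRevenue, long totalOrders) {
        if (totalOrders <= 0) {
            throw new IllegalArgumentException("totalOrders must be positive");
        }
        // Scale 2 with HALF_UP: a quotient of exactly x.xx5 rounds away from zero
        return totalRevenue.divide(BigDecimal.valueOf(totalOrders), 2, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        // Made-up sample values
        System.out.println(average(new BigDecimal("100.00"), 3)); // 33.33
        System.out.println(average(new BigDecimal("20.01"), 2));  // 10.01 (exact quotient 10.005)
        try {
            // Without a scale, a non-terminating quotient such as 100 / 3 cannot be represented
            new BigDecimal("100.00").divide(BigDecimal.valueOf(3));
        } catch (ArithmeticException e) {
            System.out.println("Unscaled divide failed: " + e.getMessage());
        }
    }
}
```

Passing an explicit scale and rounding mode is what keeps the statistics thread from throwing ArithmeticException whenever revenue does not divide evenly by the order count.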
Finally, let us create a notification consumer that sends alerts:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class NotificationConsumer {
private static final Logger logger = LoggerFactory.getLogger(NotificationConsumer.class);
private static final String TOPIC_NAME = "order-events";
private static final String GROUP_ID = "notification-group";
private final KafkaConsumer<String, OrderEvent> consumer;
private final AtomicBoolean running = new AtomicBoolean(true);
public NotificationConsumer(String bootstrapServers) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, OrderEventDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
this.consumer = new KafkaConsumer<>(props);
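// Wake the poll loop on JVM shutdown, then wait for run() to finish its final commit and close().
// Assumes run() is called on the same thread that constructs this object, as main() does.
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
shutdown();
try {
mainThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}));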
}
public void run() {
try {
consumer.subscribe(Collections.singletonList(TOPIC_NAME), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
logger.info("Partitions revoked: {}", partitions);
consumer.commitSync();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
logger.info("Partitions assigned: {}", partitions);
}
});
logger.info("Notification consumer started");
while (running.get()) {
try {
ConsumerRecords<String, OrderEvent> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, OrderEvent> record : records) {
sendNotification(record.value());
}
if (!records.isEmpty()) {
consumer.commitSync();
}
} catch (WakeupException e) {
if (!running.get()) {
break;
}
} catch (Exception e) {
logger.error("Error processing records", e);
}
}
} finally {
try {
consumer.commitSync();
} catch (Exception e) {
logger.error("Error committing final offsets", e);
}
consumer.close();
logger.info("Notification consumer shut down");
}
}
private void sendNotification(OrderEvent event) {
String message = buildNotificationMessage(event);
// In a real system, you would send email, SMS, or push notification
logger.info("NOTIFICATION to {}: {}", event.getCustomerId(), message);
// Simulate notification delivery
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private String buildNotificationMessage(OrderEvent event) {
switch (event.getEventType()) {
case CREATED:
return String.format("Your order %s has been received. Total: $%s",
event.getOrderId(), event.getTotalAmount());
case PAYMENT_RECEIVED:
return String.format("Payment confirmed for order %s. Preparing for shipment.",
event.getOrderId());
case SHIPPED:
return String.format("Your order %s has been shipped to %s",
event.getOrderId(), event.getShippingAddress().getCity());
case DELIVERED:
return String.format("Your order %s has been delivered. Thank you for your purchase!",
event.getOrderId());
case CANCELLED:
return String.format("Your order %s has been cancelled. Refund will be processed within 3-5 business days.",
event.getOrderId());
default:
return String.format("Order %s status update: %s",
event.getOrderId(), event.getEventType());
}
}
public void shutdown() {
logger.info("Shutting down notification consumer...");
running.set(false);
consumer.wakeup();
}
public static void main(String[] args) {
NotificationConsumer consumer = new NotificationConsumer("localhost:9092");
consumer.run();
}
}
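The notification consumer sleeps 50 ms per record to simulate delivery and fetches up to 50 records per poll, so a full batch takes about 2.5 seconds before the next poll() call. Kafka treats a consumer that does not call poll() within max.poll.interval.ms (300000 ms, five minutes, by default) as failed and rebalances its partitions, so slow per-record work multiplied by max.poll.records matters. The arithmetic as a plain-Java sketch; PollBudget is a hypothetical helper, and the 2-second latency for a real email or SMS call is a made-up figure:

```java
// Back-of-the-envelope check: does one full batch finish before max.poll.interval.ms expires?
final class PollBudget {
    // Documented default of max.poll.interval.ms: 300000 ms (5 minutes)
    static final long DEFAULT_MAX_POLL_INTERVAL_MS = 300_000L;

    // Worst case: every record in a full batch takes perRecordMs to process
    static long worstCaseBatchMs(int maxPollRecords, long perRecordMs) {
        return maxPollRecords * perRecordMs;
    }

    static boolean fitsPollInterval(int maxPollRecords, long perRecordMs, long maxPollIntervalMs) {
        return worstCaseBatchMs(maxPollRecords, perRecordMs) < maxPollIntervalMs;
    }

    // Largest max.poll.records whose worst-case batch uses at most safetyFraction of the interval
    static int maxRecordsFor(long perRecordMs, long maxPollIntervalMs, double safetyFraction) {
        return (int) (maxPollIntervalMs * safetyFraction / perRecordMs);
    }

    public static void main(String[] args) {
        // NotificationConsumer as written: 50 records x 50 ms simulated delivery
        System.out.println(worstCaseBatchMs(50, 50) + " ms per batch"); // 2500 ms per batch
        // Hypothetical real delivery taking 2 s per message
        System.out.println(fitsPollInterval(50, 2_000, DEFAULT_MAX_POLL_INTERVAL_MS));  // true (100000 ms)
        System.out.println(fitsPollInterval(500, 2_000, DEFAULT_MAX_POLL_INTERVAL_MS)); // false (1000000 ms)
        System.out.println(maxRecordsFor(2_000, DEFAULT_MAX_POLL_INTERVAL_MS, 0.5));    // 75
    }
}
```

This is likely why the notification consumer uses a smaller max.poll.records (50) than the analytics consumer (500): when per-record work is slow, lowering max.poll.records or raising max.poll.interval.ms keeps each batch inside the window.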
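This complete example brings together the main pieces of a realistic Kafka application: manual offset commits, rebalance listeners that commit before partitions are revoked, graceful shutdown through wakeup(), and three consumer groups that each receive every event on the same topic. The producer generates realistic order events, and the three consumers each handle the events differently: the warehouse consumer manages fulfillment, the analytics consumer tracks statistics, and the notification consumer sends customer alerts. Treat it as a solid starting point rather than a finished production system. For example, the generic catch block in each poll loop logs a failed batch and keeps polling; because the consumer's position has already moved past that batch, the next commit moves the group past those records too, so they are effectively skipped. A real deployment would add retries or a dead-letter topic at that point.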
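Running three consumers on one topic works because Kafka tracks committed offsets per consumer group: each group reads the whole stream at its own pace, and a group that stops before committing sees its uncommitted events again when it resumes. A toy in-memory model of that bookkeeping in plain Java; ConsumerGroupModel and its methods are hypothetical illustrations, not the Kafka API, and the model ignores partitions and per-consumer positions:

```java
import java.util.*;

// Toy model, not the Kafka API: one append-only log plus one committed offset per consumer group.
// Partitions, brokers, and in-memory consumer positions are deliberately left out.
final class ConsumerGroupModel {
    private final List<String> log = new ArrayList<>();
    // Committed offset per group: the index of the next event that group has not yet processed
    private final Map<String, Integer> committed = new HashMap<>();

    void append(String event) {
        log.add(event);
    }

    // Returns every event after the group's committed offset. A group with no commit starts at
    // offset 0, similar to auto.offset.reset=earliest for a brand-new group.
    List<String> poll(String groupId) {
        int from = committed.getOrDefault(groupId, 0);
        return new ArrayList<>(log.subList(from, log.size()));
    }

    // Advances the group's committed offset once a batch has been fully processed
    void commit(String groupId, int processedCount) {
        committed.merge(groupId, processedCount, Integer::sum);
    }

    public static void main(String[] args) {
        ConsumerGroupModel orderEvents = new ConsumerGroupModel();
        orderEvents.append("order-1 CREATED");
        orderEvents.append("order-1 PAYMENT_RECEIVED");

        // The warehouse group processes both events and commits
        List<String> warehouseBatch = orderEvents.poll("warehouse-fulfillment-group");
        orderEvents.commit("warehouse-fulfillment-group", warehouseBatch.size());

        // The analytics group reads the same two events but stops before committing
        List<String> analyticsBatch = orderEvents.poll("order-analytics-group");
        System.out.println("analytics read " + analyticsBatch.size() + " events, no commit");

        orderEvents.append("order-1 SHIPPED");
        System.out.println(orderEvents.poll("warehouse-fulfillment-group")); // [order-1 SHIPPED]
        System.out.println(orderEvents.poll("order-analytics-group"));       // all three events again
        System.out.println(orderEvents.poll("notification-group"));          // all three events
    }
}
```

Committing only after processing, as the three consumers do with enable.auto.commit set to false, gives this at-least-once behavior: a crash causes redelivery rather than loss, so handlers should tolerate seeing the same event twice.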