This tutorial assumes you have a working environment set up to access the DAPLAB cluster.

This page is a "copy-paste"-style tutorial for publishing and consuming your first Kafka messages.


Introduction

Apache Kafka is a distributed, partitioned, replicated commit log service. It is designed to be elastic and easy to extend: machines, called brokers, can be added to a Kafka cluster without interruption.

Within a cluster (a group of machines/brokers), Kafka maintains feeds of messages in categories called topics. Each topic is divided into one or more partitions, where messages are stored. Multiple partitions are useful when a topic expects a large volume of messages, since each partition is handled by one broker. More brokers thus means more storage space and, more importantly, parallel processing.

Kafka - cluster, partitions and brokers

In the above figure, topic A has one partition replicated twice. Topic B has two partitions, also replicated twice. Each partition thus has a leader, responsible for all read/write operations initiated by consumers/producers, and two followers, which hold a passive replica of the data. In case the leader fails, one of the two followers is ready to take its place. The same broker can be leader and follower for different partitions.

Producers publish data into a topic of their choice. They are also responsible for choosing which message is assigned to which partition, and can thus choose their own load-balancing strategy. By default, Kafka distributes data at random between partitions. When a new message is added to a partition, it receives a unique, sequential numeric identifier called an offset. Ordering is thus guaranteed within a partition, but not across partitions.

Consumers retrieve messages by pulling them from the brokers. They can choose one of two messaging models, queuing or publish-subscribe, through the notion of consumer groups. In short, each consumer labels itself with a consumer group name. If multiple consumers share the same consumer group, you get a traditional queuing system: each message is processed by one consumer only (load-balancing; for this to work well, you should have as many consumers as there are partitions in the topic). If multiple consumer groups exist, you get a publish-subscribe system: every message is broadcast to all consumer groups. You can of course mix the two approaches.

Kafka - consumer groups

The figure above represents a Kafka cluster of two servers (brokers) hosting four partitions (P0-P3), with two consumer groups. Consumer group A has two consumer instances and group B has four. Groups A and B use the publish-subscribe feature, while the consumers inside each group use the queuing feature.
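As a concrete sketch of these two models, you could run the command below in two separate terminals once you have created the test topic described later in this page, using the $ZK variable defined in the "Before Starting" section. This assumes your version of the console consumer supports the --consumer.config flag; the group id and file path are arbitrary examples.

# Put an arbitrary, shared group id into a properties file (example values).
echo "group.id=demo-$(whoami)" > /tmp/group.properties

# Run this same command in two terminals: both consumers share the same
# group id, so each message is delivered to only one of them (queuing).
# A third consumer started with a different group id would additionally
# receive every message (publish-subscribe).
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh \
  --zookeeper $ZK \
  --topic test-$(whoami) \
  --consumer.config /tmp/group.properties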

For more information about how Kafka works, have a look at the documentation.


Resources

Terminology

Here is the basic terminology we'll use throughout the tutorial:

Broker: a machine of the Kafka cluster.
Topic: a named feed of messages to which producers publish.
Partition: a subdivision of a topic where messages are stored; each partition is handled by one broker.
Offset: the unique, sequential numeric identifier a message receives within its partition.
Producer: a process publishing messages into a topic.
Consumer: a process retrieving messages from a topic.
Consumer group: a label shared by consumers to select between the queuing and publish-subscribe models.

Consumers

Kafka has two types of consumers: the Group Consumer and the Simple Consumer. The Group Consumer is the high-level API: it tracks offsets and balances partitions between the consumers of a group for you, at the cost of less control. The Simple Consumer is the low-level API: you explicitly choose the partition and the offset to read from, but you must keep track of offsets and handle broker failures yourself.

It's thus always a trade-off to decide which one to pick, depending on what you're trying to build. Many pieces of software use both the Simple and the Group consumer, depending on the task to achieve.

Before Starting: ZooKeeper config

Side note: it is important to have some way of coordinating tasks, managing state, configuration, etc. across a distributed system. While some projects build their own mechanisms, Kafka relies on ZooKeeper for coordination.

All the following examples assume you're running on the DAPLAB infrastructure. We're storing the server lists in variables for the sake of clarity and portability in the examples below.

ZK="daplab-wn-22.fri.lan,daplab-wn-25.fri.lan,daplab-wn-33.fri.lan"
BROKERS="daplab-rt-11.fri.lan:6667,daplab-rt-12.fri.lan:6667,daplab-rt-13.fri.lan:6667,daplab-rt-14.fri.lan:6667"

Listing existing topics

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $ZK --list

Create a new topic

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create \
  --zookeeper $ZK \
  --replication-factor 2 \
  --partitions 1 \
  --topic test-$(whoami)

Mind that this example creates a topic with a single partition. You can of course create more partitions within your topic, but the consuming examples below must then be slightly adapted.
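To verify the result, you can describe the topic: this prints the number of partitions, the replication factor, and which broker is the leader for each partition.

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --describe \
  --zookeeper $ZK \
  --topic test-$(whoami)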

Publishing messages

echo -e "message1\nmessage2\nmessage3" | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh \
  --broker-list $BROKERS \
  --topic test-$(whoami)
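Without input piped into it, the console producer runs interactively: it reads from stdin and publishes one message per line. Exit with Ctrl+C (or Ctrl+D) when you're done.

/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh \
  --broker-list $BROKERS \
  --topic test-$(whoami)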

Read topic offset

This command line call helps you validate that your messages have been properly published:

/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list $BROKERS \
  --time -1 \
  --topic test-$(whoami)
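Here --time -1 requests the latest offset of each partition, which in this freshly created single-partition topic equals the number of messages published so far (3). Passing --time -2 requests the earliest offset still available instead, i.e. 0 as long as no log segments have been deleted:

/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list $BROKERS \
  --time -2 \
  --topic test-$(whoami)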

Reading messages

Read the first 3 messages from partition 0 of the topic:

/usr/hdp/current/kafka-broker/bin/kafka-simple-consumer-shell.sh \
  --broker-list $BROKERS \
  --topic test-$(whoami) \
  --offset -2 \
  --max-messages 3 \
  --partition 0
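Here --offset -2 means "start from the earliest available offset". Passing --offset -1 instead starts from the latest offset, which is handy to tail only messages published after the consumer started:

/usr/hdp/current/kafka-broker/bin/kafka-simple-consumer-shell.sh \
  --broker-list $BROKERS \
  --topic test-$(whoami) \
  --offset -1 \
  --partition 0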

Read the first 3 messages of the topic, even if it has more than one partition:

/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh \
  --zookeeper $ZK \
  --topic test-$(whoami) \
  --from-beginning \
  --max-messages 3
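Once you're done experimenting, you can delete your test topic. Mind that the deletion only takes effect if the brokers are configured with delete.topic.enable=true; otherwise the topic is merely marked for deletion.

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete \
  --zookeeper $ZK \
  --topic test-$(whoami)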

Java Code

Please check out the Kafka Starter project to get a complete Java, Maven-based project to import into your IDE, including an embedded Kafka server for testing.

## Clone the project
git clone https://github.com/daplab/kafka-starter.git

## Build it
mvn clean install