How to send message to kafka
How to send message to kafka
Kafka Tutorial: Using Kafka from the command line
If you are not sure what Kafka is, start here “What is Kafka?”.
Getting started with Kafka tutorial
Let’s show a simple example using producers and consumers from the Kafka command line.
Download Kafka 0.10.2.x from the Kafka download page. Later versions will likely work, but this was example was done with 0.10.2.x.
We assume that you have Java SDK 1.8.x installed.
We unzipped the Kafka download and put it in
Next, we are going to run ZooKeeper and then run Kafka Server/Broker. We will use some Kafka command line utilities, to create Kafka topics, send messages via a producer and consume messages from the command line.
Run ZooKeeper for Kafka
Kafka relies on ZooKeeper. To keep things simple, we will use a single ZooKeeper node.
Kafka provides a startup script for ZooKeeper called zookeeper-server-start.sh which is located at
The Kafka distribution also provide a ZooKeeper config file which is setup to run single node.
To run ZooKeeper, we create this script in kafka-training and run it.
Run run-zookeeper.sh
Wait about 30 seconds or so for ZooKeeper to startup.
Run Kafka Server
To run Kafka, we create this script in kafka-training and run it in another terminal window.
Run run-kafka.sh
Wait about 30 seconds or so for Kafka to startup.
Now let’s create the topic that we will send records on.
Create Kafka Topic
Kafka also provides a utility to work with topics called kafka-topics.sh which is located at
Run create-topic.sh
List Topics
You can see which topics that Kafka is managing using kafka-topics.sh as follows.
Create the file in
Run list-topics.sh
You can see the topic my-topic in the list of topics.
Run Kafka Producer Console
The Kafka distribution provides a command utility to send messages from the command line. It start up a terminal window where everything you type is sent to the Kafka topic.
Kafka provides the utility kafka-console-producer.sh which is located at
/kafka-training/kafka/bin/kafka-console-producer.sh to send messages to a topic on the command line.
Create the file in
/kafka-training/lab1/start-producer-console.sh and run it.
Run start-producer-console.sh and send at least four messages
In order to see these messages, we will need to run the consumer console.
Run Kafka Consumer Console
The Kafka distribution provides a command utility to see messages from the command line. It displays the messages in various modes.
Kafka provides the utility kafka-console-consumer.sh which is located at
/kafka-training/kafka/bin/kafka-console-producer.sh to receive messages from a topic on the command line.
Create the file in
/kafka-training/lab1/start-consumer-console.sh and run it.
Run start-consumer-console.sh in another terminal
Notice that the messages are not coming in order. This is because we only have one consumer so it is reading the messages from all 13 partitions. Order is only guaranteed within a partition.
How to Publish and Subscribe messages (with key) using Kafka?
Learn how you can publish and subscribe to messages in the easiest ways.
Kafka messages sent from the Producers have two properties:
Note: before we have sent message is without keys.
→ We are going to publish all the records, start with the same character should go to the same partition. So that consumers can read the messages in the order they will publish to the topic.
In the above diagram, we have used the key- A the key can be of any type, in this example, we are using String.
When a message is sent, Kafka Partioner is going to apply some hashing techniques to determine the partition value and if the same key is sent then it is going to resolve to the same partitions. Partition-0 in these cases.
Now in order to message to the different partitions, we are going to another key say key- B so that it will go to Partition-1 and so on…
Without Key (earlier method)
With Key
Note: The commands for without key and with key are the same, the only difference is that with the key we have two additional properties.
Sending and Receiving messages with Key
How to instantiate a Console Producer with Key?
Step 1: let’s start our Kafka producer
→ Now we have sent messages till now, now Kafka consumer is waiting to consume these messages.
How to instantiate a Console Consumer?
Step 2: let’s start our Kafka consumer
Note: The rest of the messages is showing null because they are produced as key-value as null and order is not maintained they are totally in the different partitions.
Kafka Messaging Explored
In this post, we will take a closer look at Apache Kafka Messaging. We will show how you can easily start a Kafka cluster and how messages can be sent and received by means of CLI and from a Java application. At the end, we will explore how partitions work from a practical point of view.
1. Introduction
It is advised to read some documentation if you are not yet acquainted with Apache Kafka Messaging. There is quite a lot of documentation available which is also similar to each other. A good starting point is the official Apache Kafka documentation which can be found here. Another good reference is from Kevin Sookocheff, it contains duplicate information compared to the official documentation but it has a nice section about partitions, which we will cover at the end of this post.
As mentioned before, this post will not be a theoretical exercise, but we are going to try to get more acquainted with Apache Kafka Messaging from a more practical point of view. The sources that are being used can be found at GitHub.
2. Run Kafka
An easy way to run a Kafka cluster on your local machine, is to use the Wurstmeister Kafka Docker Compose file. Download the git repository or clone it to your local machine. We will be using Ubuntu 18.04. Also make sure that you have installed Docker Compose.
Change the line:
We can start the Kafka cluster now, make sure that you execute the following command from the directory where the docker-compose-single-broker.yml file resides:
We now have a running Kafka cluster.
In order to stop the cluster, we can issue the following command (again, make sure that this command is executed from the directory where the docker-compose-single-broker.yml file resides):
3. Send and Receive Messages by Means of CLI
Now that we have a running Kafka cluster, we are already able to send and receive messages. When starting the Kafka cluster with Docker Compose, a topic test was automatically created.
In order to send messages to a topic, we need to create a Producer. We do so by means of the kafka-console-producer.sh script in the bin directory of the Kafka binary download.
Parameter broker-list indicates the Kafka cluster we are connecting to, parameter topic indicates to which topic we want to send messages to.
In order to receive messages from a topic, we need to create a Consumer. We do so by means of the kafka-console-consumer.sh script in the bin directory of the Kafka binary download. Start this command in another terminal window.
Parameter bootstrap-server indicates the Kafka cluster we are connecting to, parameter topic indicates from which topic we want to receive the messages, parameter from-beginning indicates that we want to receive all messages present in the topic, also those that are sent to the topic before we connected to the topic.
At this moment, it is possible to enter messages in the Producer terminal window and to receive them in the Consumer terminal window. When you terminate the Consumer, and then connect again, you will see that all messages are received again because of the from-beginning parameter.
4. Send and Receive Messages by Means of Java
In this section, we will send and receive messages by means of a Java application. We will do so based on the JavaDoc for the KafkaProducer and the JavaDoc for the KafkaConsumer.
We are going to create a Maven multi-module project with Java 11 containing the following modules:
4.1 The Kafka Producer
The Kafka Producer will send 100 messages to the topic when a URL is invoked. We will make use of Spring Web MVC in order to do so. Therefore, we add the dependency spring-boot-starter-web to the pom and also the dependency kafka-clients in order to access the Java classes for sending messages to the topic.
4.2 The Kafka Consumer
4.3 Test the Java Application
In order to test the Java application, we first need to create the topic. Ensure that the Kafka cluster is running and execute the following command from the bin directory from the Kafka binary download. This command will create the my-kafka-topic with 1 partition for us.
Check whether the topic has been created successfully:
Start the Kafka Producer by executing the following command from the mykafkaproducerplanet directory:
At this point, we first check whether we can send messages to the topic by invoking the URL http://localhost:8081/sendMessages/. The response ‘Messages sent’ is received and in the console output we can verify that 100 messages have been sent (the offsets can differ from what you see in your console):
Start the Kafka Consumer by executing the following command from the mykafkaconsumerplanet directory:
After successful startup of the application, the 100 messages present in the topic are printed to the console:
5. Something about Partitions
Up till now, we made use of a topic with one partition, one producer and one consumer. The producer sends data to the partition of the topic, the consumer consumes all data from the one partition. But what happens when we have another consumer? And what happens when we have two partitions?
5.1 One Partition, Two Consumers
In order to see what happens when we have two consumers belonging to the same consumer group (i.e. we have one logical consumer), we just start a second consumer application at another port.
Now invoke the URL again in order to send another 100 messages. What we see now, is that all messages are consumed by the second consumer. Stop the second consumer (the one that consumed the messages) and send another 100 messages. Now the first consumer is consuming the messages. When we start the second consumer again and send another 100 messages, the messages are consumed by the second consumer again. This confirms what is stated in the documentation that a consumer consumes from exactly one partition. In our case, the other consumer, which is not consuming the messages, is taking over when the consuming consumer crashes.
5.2 Two Partitions, Two Consumers
Start the producer and two consumers just like we did before. Invoke the URL in order to send 100 messages to the topic. We now can see that approximately half of the messages are sent to each partition. Each consumer is subscribed to one of the partitions and consumes the messages from that partition.
Snippet of log of first consumer:
Snippet of log of second consumer:
6. Conclusion
We showed how you can easily start a Kafka cluster on a local machine and how messages can be sent and received by means of CLI. Also, a producer and consumer Java application is created for sending and receiving messages. At the end, we took a closer look at how partitions are used in combination with more than one consumers.
Spring Boot Kafka JSON Message:
We can publish the JSON messages to Apache Kafka through spring boot application, in the previous article we have seen how to send simple string messages to Kafka.
Technologies:
Prerequisites:
To run the application, you have to have Apache Kafka installed on your machine. I have provided a couple of articles which helps you to install Apache Kafka on Windows and Ubuntu operating systems.
After successful installation, start the zookeeper, Kafka servers to connect from spring boot application.
Start Zookeeper server:
Start Kafka Server:
Create a Kafka Topic:
Start Kafka consumer console:
1. Sending Spring Boot JSON Message to Kafka:
On the above we have created an items-topic from Kafka cli, now we are going to send some JSON messages to items-topic using KafkaTemplate through Spring Boot application.
2. Dependencies:
3. JSON representing class:
This class represents a JSON message to sending messages to Kafka topic.
4. Kafka JSON Configuration:
Since we are going to send JSON messages to Kafka topic, we have to configure the KafkaProducerFactory with JsonSerializer class. The default configuration for KafkaProducerFactory is StringSerializer, so we don’t have to configure this to send simple messages to Kafka topic.
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG tells Kafka IP address and port “127.0.0.1:9092. Currently, I am going to use my local Kafka so that it 127.0.0.1 and the port is 9092.
Note: You can find this config information at Kafka/config/server.properties file.
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG tells the type of key which we are going to send messages to a Kafka topic StringSerializer.class.
5. Rest Controller:
It a simple rest client having one post method which will send JSON message to Kafka topic (items-topic) using KafkaTemplate.
6. Spring Main:
7. Run it:
8. Sending JSON messages to Kafka Topic:
Access the application from the postman and make a post request.
9. Kafka Consumer Console:
We can see our JSON message on the Kafka consumer console whether it consumed or not.
Documentation
Kafka 0.9.0
1. Getting Started
1.1 Introduction
Kafka® is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.
What does all that mean?
First let’s review some basic messaging terminology:
Topics and Logs
A topic is a category or feed name to which messages are published. For each topic, the Kafka cluster maintains a partitioned log that looks like this:
The Kafka cluster retains all published messages—whether or not they have been consumed—for a configurable period of time. For example if the log retention is set to two days, then for the two days after a message is published it is available for consumption, after which it will be discarded to free up space. Kafka’s performance is effectively constant with respect to data size so retaining lots of data is not a problem.
In fact the only metadata retained on a per-consumer basis is the position of the consumer in the log, called the «offset». This offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads messages, but in fact the position is controlled by the consumer and it can consume messages in any order it likes. For example a consumer can reset to an older offset to reprocess.
This combination of features means that Kafka consumers are very cheap—they can come and go without much impact on the cluster or on other consumers. For example, you can use our command line tools to «tail» the contents of any topic without changing what is consumed by any existing consumers.
The partitions in the log serve several purposes. First, they allow the log to scale beyond a size that will fit on a single server. Each individual partition must fit on the servers that host it, but a topic may have many partitions so it can handle an arbitrary amount of data. Second they act as the unit of parallelism—more on that in a bit.
Distribution
Each partition has one server which acts as the «leader» and zero or more servers which act as «followers». The leader handles all read and write requests for the partition while the followers passively replicate the leader. If the leader fails, one of the followers will automatically become the new leader. Each server acts as a leader for some of its partitions and a follower for others so load is well balanced within the cluster.
Producers
Consumers
Consumers label themselves with a consumer group name, and each message published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.
If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.
If all the consumer instances have different consumer groups, then this works like publish-subscribe and all messages are broadcast to all consumers.
More commonly, however, we have found that topics have a small number of consumer groups, one for each «logical subscriber». Each group is composed of many consumer instances for scalability and fault tolerance. This is nothing more than publish-subscribe semantics where the subscriber is cluster of consumers instead of a single process.
Kafka has stronger ordering guarantees than a traditional messaging system, too.
A traditional queue retains messages in-order on the server, and if multiple consumers consume from the queue then the server hands out messages in the order they are stored. However, although the server hands out messages in order, the messages are delivered asynchronously to consumers, so they may arrive out of order on different consumers. This effectively means the ordering of the messages is lost in the presence of parallel consumption. Messaging systems often work around this by having a notion of «exclusive consumer» that allows only one process to consume from a queue, but of course this means that there is no parallelism in processing.
Kafka does it better. By having a notion of parallelism—the partition—within the topics, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes. This is achieved by assigning the partitions in the topic to the consumers in the consumer group so that each partition is consumed by exactly one consumer in the group. By doing this we ensure that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions this still balances the load over many consumer instances. Note however that there cannot be more consumer instances in a consumer group than partitions.
Kafka only provides a total order over messages within a partition, not between different partitions in a topic. Per-partition ordering combined with the ability to partition data by key is sufficient for most applications. However, if you require a total order over messages this can be achieved with a topic that has only one partition, though this will mean only one consumer process per consumer group.
Guarantees
1.2 Use Cases
Messaging
In our experience messaging uses are often comparatively low-throughput, but may require low end-to-end latency and often depend on the strong durability guarantees Kafka provides.
In this domain Kafka is comparable to traditional messaging systems such as ActiveMQ or RabbitMQ.
Website Activity Tracking
Activity tracking is often very high volume as many activity messages are generated for each user page view.
Metrics
Log Aggregation
Stream Processing
Event Sourcing
Commit Log
1.3 Quick Start
Step 1: Download the code
Step 2: Start the server
Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don’t already have one. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node ZooKeeper instance. Now start the Kafka server:
Step 3: Create a topic
Step 4: Send some messages
Run the producer and then type a few messages into the console to send to the server.
Step 5: Start a consumer
If you have each of the above commands running in a different terminal then you should now be able to type messages into the producer terminal and see them appear in the consumer terminal.
All of the command line tools have additional options; running the command with no arguments will display usage information documenting them in more detail.
Step 6: Setting up a multi-broker cluster
First we make a config file for each of the brokers: Now edit these new files and set the following properties: The broker.id property is the unique and permanent name of each node in the cluster. We have to override the port and log directory only because we are running these all on the same machine and we want to keep the brokers from all trying to register on the same port or overwrite each others data.
We can run the same command on the original topic we created to see where it is: So there is no surprise there—the original topic has no replicas and is on server 0, the only server in our cluster when we created it.
Let’s publish a few messages to our new topic: Now let’s consume these messages: Now let’s test out fault-tolerance. Broker 1 was acting as the leader so let’s kill it: Leadership has switched to one of the slaves and node 1 is no longer in the in-sync replica set: But the messages are still be available for consumption even though the leader that took the writes originally is down:
Step 7: Use Kafka Connect to import/export data
1.4 Ecosystem
1.5 Upgrading From Previous Versions
Upgrading from 0.8.0, 0.8.1.X or 0.8.2.X to 0.9.0.0
For a rolling upgrade:
Note: If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.
Note: Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
Potential breaking changes in 0.9.0.0
Notable changes in 0.9.0.1
Deprecations in 0.9.0.0
Upgrading from 0.8.1 to 0.8.2
Upgrading from 0.8.0 to 0.8.1
Upgrading from 0.7
2. API
2.1 Producer API
For those interested in the legacy Scala producer api, information can be found here.
2.2 Consumer API
2.2.1 Old High Level Consumer API
2.2.2 Old Simple Consumer API
2.2.3 New Consumer API
3. Configuration
3.1 Broker Configs
3.2 Producer Configs
No attempt will be made to batch records larger than this size.
Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent.
For those interested in the legacy Scala producer configs, information can be found here.
3.3 Consumer Configs
3.3.1 Old Consumer Configs
Generated automatically if not set.
What to do when there is no initial offset in ZooKeeper or if an offset is out of range:
* smallest : automatically reset the offset to the smallest offset
* largest : automatically reset the offset to the largest offset
* anything else: throw exception to the consumer
Select between the «range» or «roundrobin» strategy for assigning partitions to consumer streams.
The round-robin partition assignor lays out all the available partitions and all the available consumer threads. It then proceeds to do a round-robin assignment from partition to consumer thread. If the subscriptions of all consumer instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts will be within a delta of exactly one across all consumer threads.) Round-robin assignment is permitted only if: (a) Every topic has the same number of streams within a consumer instance (b) The set of subscribed topics is identical for every consumer instance within the group.
Range partitioning works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumer threads in lexicographic order. We then divide the number of partitions by the total number of consumer streams (threads) to determine the number of partitions to assign to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition.
3.3.2 New Consumer Configs
3.4 Kafka Connect Configs
4. Design
4.1 Motivation
We designed Kafka to be able to act as a unified platform for handling all the real-time data feeds a large company might have. To do this we had to think through a fairly broad set of use cases.
It would have to have high-throughput to support high volume event streams such as real-time log aggregation.
It would need to deal gracefully with large data backlogs to be able to support periodic data loads from offline systems.
It also meant the system would have to handle low-latency delivery to handle more traditional messaging use-cases.
We wanted to support partitioned, distributed, real-time processing of these feeds to create new, derived feeds. This motivated our partitioning and consumer model.
Finally in cases where the stream is fed into other data systems for serving, we knew the system would have to be able to guarantee fault-tolerance in the presence of machine failures.
Supporting these uses led us to a design with a number of unique elements, more akin to a database log than a traditional messaging system. We will outline some elements of the design in the following sections.
4.2 Persistence
Don’t fear the filesystem!
Kafka relies heavily on the filesystem for storing and caching messages. There is a general perception that «disks are slow» which makes people skeptical that a persistent structure can offer competitive performance. In fact disks are both much slower and much faster than people expect depending on how they are used; and a properly designed disk structure can often be as fast as the network.
The key fact about disk performance is that the throughput of hard drives has been diverging from the latency of a disk seek for the last decade. As a result the performance of linear writes on a JBOD configuration with six 7200rpm SATA RAID-5 array is about 600MB/sec but the performance of random writes is only about 100k/sec—a difference of over 6000X. These linear reads and writes are the most predictable of all usage patterns, and are heavily optimized by the operating system. A modern operating system provides read-ahead and write-behind techniques that prefetch data in large block multiples and group smaller logical writes into large physical writes. A further discussion of this issue can be found in this ACM Queue article; they actually find that sequential disk access can in some cases be faster than random memory access!
To compensate for this performance divergence modern operating systems have become increasingly aggressive in their use of main memory for disk caching. A modern OS will happily divert all free memory to disk caching with little performance penalty when the memory is reclaimed. All disk reads and writes will go through this unified cache. This feature cannot easily be turned off without using direct I/O, so even if a process maintains an in-process cache of the data, this data will likely be duplicated in OS pagecache, effectively storing everything twice.
As a result of these factors using the filesystem and relying on pagecache is superior to maintaining an in-memory cache or other structure—we at least double the available cache by having automatic access to all free memory, and likely double again by storing a compact byte structure rather than individual objects. Doing so will result in a cache of up to 28-30GB on a 32GB machine without GC penalties. Furthermore this cache will stay warm even if the service is restarted, whereas the in-process cache will need to be rebuilt in memory (which for a 10GB cache may take 10 minutes) or else it will need to start with a completely cold cache (which likely means terrible initial performance). This also greatly simplifies the code as all logic for maintaining coherency between the cache and filesystem is now in the OS, which tends to do so more efficiently and more correctly than one-off in-process attempts. If your disk usage favors linear reads then read-ahead is effectively pre-populating this cache with useful data on each disk read.
This suggests a design which is very simple: rather than maintain as much as possible in-memory and flush it all out to the filesystem in a panic when we run out of space, we invert that. All data is immediately written to a persistent log on the filesystem without necessarily flushing to disk. In effect this just means that it is transferred into the kernel’s pagecache.
This style of pagecache-centric design is described in an article on the design of Varnish here (along with a healthy dose of arrogance).
Constant Time Suffices
The persistent data structure used in messaging systems are often a per-consumer queue with an associated BTree or other general-purpose random access data structures to maintain metadata about messages. BTrees are the most versatile data structure available, and make it possible to support a wide variety of transactional and non-transactional semantics in the messaging system. They do come with a fairly high cost, though: Btree operations are O(log N). Normally O(log N) is considered essentially equivalent to constant time, but this is not true for disk operations. Disk seeks come at 10 ms a pop, and each disk can do only one seek at a time so parallelism is limited. Hence even a handful of disk seeks leads to very high overhead. Since storage systems mix very fast cached operations with very slow physical disk operations, the observed performance of tree structures is often superlinear as data increases with fixed cache—i.e. doubling your data makes things much worse then twice as slow.
Intuitively a persistent queue could be built on simple reads and appends to files as is commonly the case with logging solutions. This structure has the advantage that all operations are O(1) and reads do not block writes or each other. This has obvious performance advantages since the performance is completely decoupled from the data size—one server can now take full advantage of a number of cheap, low-rotational speed 1+TB SATA drives. Though they have poor seek performance, these drives have acceptable performance for large reads and writes and come at 1/3 the price and 3x the capacity.
Having access to virtually unlimited disk space without any performance penalty means that we can provide some features not usually found in a messaging system. For example, in Kafka, instead of attempting to deleting messages as soon as they are consumed, we can retain messages for a relative long period (say a week). This leads to a great deal of flexibility for consumers, as we will describe.
4.3 Efficiency
We have put significant effort into efficiency. One of our primary use cases is handling web activity data, which is very high volume: each page view may generate dozens of writes. Furthermore we assume each message published is read by at least one consumer (often many), hence we strive to make consumption as cheap as possible.
We have also found, from experience building and running a number of similar systems, that efficiency is a key to effective multi-tenant operations. If the downstream infrastructure service can easily become a bottleneck due to a small bump in usage by the application, such small changes will often create problems. By being very fast we help ensure that the application will tip-over under load before the infrastructure. This is particularly important when trying to run a centralized service that supports dozens or hundreds of applications on a centralized cluster as changes in usage patterns are a near-daily occurrence.
We discussed disk efficiency in the previous section. Once poor disk access patterns have been eliminated, there are two common causes of inefficiency in this type of system: too many small I/O operations, and excessive byte copying.
The small I/O problem happens both between the client and the server and in the server’s own persistent operations.
To avoid this, our protocol is built around a «message set» abstraction that naturally groups messages together. This allows network requests to group messages together and amortize the overhead of the network roundtrip rather than sending a single message at a time. The server in turn appends chunks of messages to its log in one go, and the consumer fetches large linear chunks at a time.
This simple optimization produces orders of magnitude speed up. Batching leads to larger network packets, larger sequential disk operations, contiguous memory blocks, and so on, all of which allows Kafka to turn a bursty stream of random message writes into linear writes that flow to the consumers.
The other inefficiency is in byte copying. At low message rates this is not an issue, but under load the impact is significant. To avoid this we employ a standardized binary message format that is shared by the producer, the broker, and the consumer (so data chunks can be transferred without modification between them).
The message log maintained by the broker is itself just a directory of files, each populated by a sequence of message sets that have been written to disk in the same format used by the producer and consumer. Maintaining this common format allows optimization of the most important operation: network transfer of persistent log chunks. Modern unix operating systems offer a highly optimized code path for transferring data out of pagecache to a socket; in Linux this is done with the sendfile system call.
This is clearly inefficient, there are four copies and two system calls. Using sendfile, this re-copying is avoided by allowing the OS to send the data from pagecache to the network directly. So in this optimized path, only the final copy to the NIC buffer is needed.
We expect a common use case to be multiple consumers on a topic. Using the zero-copy optimization above, data is copied into pagecache exactly once and reused on each consumption instead of being stored in memory and copied out to kernel space every time it is read. This allows messages to be consumed at a rate that approaches the limit of the network connection.
This combination of pagecache and sendfile means that on a Kafka cluster where the consumers are mostly caught up you will see no read activity on the disks whatsoever as they will be serving data entirely from cache.
For more background on the sendfile and zero-copy support in Java, see this article.
End-to-end Batch Compression
In some cases the bottleneck is actually not CPU or disk but network bandwidth. This is particularly true for a data pipeline that needs to send messages between data centers over a wide-area network. Of course the user can always compress its messages one at a time without any support needed from Kafka, but this can lead to very poor compression ratios as much of the redundancy is due to repetition between messages of the same type (e.g. field names in JSON or user agents in web logs or common string values). Efficient compression requires compressing multiple messages together rather than compressing each message individually.
Kafka supports this by allowing recursive message sets. A batch of messages can be clumped together compressed and sent to the server in this form. This batch of messages will be written in compressed form and will remain compressed in the log and will only be decompressed by the consumer.
Kafka supports GZIP and Snappy compression protocols. More details on compression can be found here.
4.4 The Producer
Load balancing
The producer sends data directly to the broker that is the leader for the partition without any intervening routing tier. To help the producer do this all Kafka nodes can answer a request for metadata about which servers are alive and where the leaders for the partitions of a topic are at any given time to allow the producer to appropriate direct its requests.
The client controls which partition it publishes messages to. This can be done at random, implementing a kind of random load balancing, or it can be done by some semantic partitioning function. We expose the interface for semantic partitioning by allowing the user to specify a key to partition by and using this to hash to a partition (there is also an option to override the partition function if need be). For example if the key chosen was a user id then all data for a given user would be sent to the same partition. This in turn will allow consumers to make locality assumptions about their consumption. This style of partitioning is explicitly designed to allow locality-sensitive processing in consumers.
Asynchronous send
Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer will attempt to accumulate data in memory and to send out larger batches in a single request. The batching can be configured to accumulate no more than a fixed number of messages and to wait no longer than some fixed latency bound (say 64k or 10 ms). This allows the accumulation of more bytes to send, and few larger I/O operations on the servers. This buffering is configurable and gives a mechanism to trade off a small amount of additional latency for better throughput.
Details on configuration and api for the producer can be found elsewhere in the documentation.
4.5 The Consumer
Push vs. pull
An initial question we considered is whether consumers should pull data from brokers or brokers should push data to the consumer. In this respect Kafka follows a more traditional design, shared by most messaging systems, where data is pushed to the broker from the producer and pulled from the broker by the consumer. Some logging-centric systems, such as Scribe and Apache Flume follow a very different push based path where data is pushed downstream. There are pros and cons to both approaches. However a push-based system has difficulty dealing with diverse consumers as the broker controls the rate at which data is transferred. The goal is generally for the consumer to be able to consume at the maximum possible rate; unfortunately in a push system this means the consumer tends to be overwhelmed when its rate of consumption falls below the rate of production (a denial of service attack, in essence). A pull-based system has the nicer property that the consumer simply falls behind and catches up when it can. This can be mitigated with some kind of backoff protocol by which the consumer can indicate it is overwhelmed, but getting the rate of transfer to fully utilize (but never over-utilize) the consumer is trickier than it seems. Previous attempts at building systems in this fashion led us to go with a more traditional pull model.
Another advantage of a pull-based system is that it lends itself to aggressive batching of data sent to the consumer. A push-based system must choose to either send a request immediately or accumulate more data and then send it later without knowledge of whether the downstream consumer will be able to immediately process it. If tuned for low latency this will result in sending a single message at a time only for the transfer to end up being buffered anyway, which is wasteful. A pull-based design fixes this as the consumer always pulls all available messages after its current position in the log (or up to some configurable max size). So one gets optimal batching without introducing unnecessary latency.
The deficiency of a naive pull-based system is that if the broker has no data the consumer may end up polling in a tight loop, effectively busy-waiting for data to arrive. To avoid this we have parameters in our pull request that allow the consumer request to block in a «long poll» waiting until data arrives (and optionally waiting until a given number of bytes is available to ensure large transfer sizes).
You could imagine other possible designs which would be only pull, end-to-end. The producer would locally write to a local log, and brokers would pull from that with consumers pulling from them. A similar type of «store-and-forward» producer is often proposed. This is intriguing but we felt not very suitable for our target use cases which have thousands of producers. Our experience running persistent data systems at scale led us to feel that involving thousands of disks in the system across many applications would not actually make things more reliable and would be a nightmare to operate. And in practice we have found that we can run a pipeline with strong SLAs at large scale without a need for producer persistence.
Consumer Position
Most messaging systems keep metadata about what messages have been consumed on the broker. That is, as a message is handed out to a consumer, the broker either records that fact locally immediately or it may wait for acknowledgement from the consumer. This is a fairly intuitive choice, and indeed for a single machine server it is not clear where else this state could go. Since the data structure used for storage in many messaging systems scale poorly, this is also a pragmatic choice—since the broker knows what is consumed it can immediately delete it, keeping the data size small.
What is perhaps not obvious, is that getting the broker and consumer to come into agreement about what has been consumed is not a trivial problem. If the broker records a message as consumed immediately every time it is handed out over the network, then if the consumer fails to process the message (say because it crashes or the request times out or whatever) that message will be lost. To solve this problem, many messaging systems add an acknowledgement feature which means that messages are only marked as sent not consumed when they are sent; the broker waits for a specific acknowledgement from the consumer to record the message as consumed. This strategy fixes the problem of losing messages, but creates new problems. First of all, if the consumer processes the message but fails before it can send an acknowledgement then the message will be consumed twice. The second problem is around performance, now the broker must keep multiple states about every single message (first to lock it so it is not given out a second time, and then to mark it as permanently consumed so that it can be removed). Tricky problems must be dealt with, like what to do with messages that are sent but never acknowledged.
Kafka handles this differently. Our topic is divided into a set of totally ordered partitions, each of which is consumed by one consumer at any given time. This means that the position of consumer in each partition is just a single integer, the offset of the next message to consume. This makes the state about what has been consumed very small, just one number for each partition. This state can be periodically checkpointed. This makes the equivalent of message acknowledgements very cheap.
There is a side benefit of this decision. A consumer can deliberately rewind back to an old offset and re-consume data. This violates the common contract of a queue, but turns out to be an essential feature for many consumers. For example, if the consumer code has a bug and is discovered after some messages are consumed, the consumer can re-consume those messages once the bug is fixed.
Offline Data Load
In the case of Hadoop we parallelize the data load by splitting the load over individual map tasks, one for each node/topic/partition combination, allowing full parallelism in the loading. Hadoop provides the task management, and tasks which fail can restart without danger of duplicate data—they simply restart from their original position.
4.6 Message Delivery Semantics
Many systems claim to provide «exactly once» delivery semantics, but it is important to read the fine print, most of these claims are misleading (i.e. they don’t translate to the case where consumers or producers can fail, or cases where there are multiple consumer processes, or cases where data written to disk can be lost).
Kafka’s semantics are straight-forward. When publishing a message we have a notion of the message being «committed» to the log. Once a published message is committed it will not be lost as long as one broker that replicates the partition to which this message was written remains «alive». The definition of alive as well as a description of which types of failures we attempt to handle will be described in more detail in the next section. For now let’s assume a perfect, lossless broker and try to understand the guarantees to the producer and consumer. If a producer attempts to publish a message and experiences a network error it cannot be sure if this error happened before or after the message was committed. This is similar to the semantics of inserting into a database table with an autogenerated key.
These are not the strongest possible semantics for publishers. Although we cannot be sure of what happened in the case of a network error, it is possible to allow the producer to generate a sort of «primary key» that makes retrying the produce request idempotent. This feature is not trivial for a replicated system because of course it must work even (or especially) in the case of a server failure. With this feature it would suffice for the producer to retry until it receives acknowledgement of a successfully committed message at which point we would guarantee the message had been published exactly once. We hope to add this in a future Kafka version.
Not all use cases require such strong guarantees. For uses which are latency sensitive we allow the producer to specify the durability level it desires. If the producer specifies that it wants to wait on the message being committed this can take on the order of 10 ms. However the producer can also specify that it wants to perform the send completely asynchronously or that it wants to wait only until the leader (but not necessarily the followers) have the message.
So effectively Kafka guarantees at-least-once delivery by default and allows the user to implement at most once delivery by disabling retries on the producer and committing its offset prior to processing a batch of messages. Exactly-once delivery requires co-operation with the destination storage system but Kafka provides the offset which makes implementing this straight-forward.
4.7 Replication
Kafka replicates the log for each topic’s partitions across a configurable number of servers (you can set this replication factor on a topic-by-topic basis). This allows automatic failover to these replicas when a server in the cluster fails so messages remain available in the presence of failures.
Other messaging systems provide some replication-related features, but, in our (totally biased) opinion, this appears to be a tacked-on thing, not heavily used, and with large downsides: slaves are inactive, throughput is heavily impacted, it requires fiddly manual configuration, etc. Kafka is meant to be used with replication by default—in fact we implement un-replicated topics as replicated topics where the replication factor is one.
The unit of replication is the topic partition. Under non-failure conditions, each partition in Kafka has a single leader and zero or more followers. The total number of replicas including the leader constitute the replication factor. All reads and writes go to the leader of the partition. Typically, there are many more partitions than brokers and the leaders are evenly distributed among brokers. The logs on the followers are identical to the leader’s log—all have the same offsets and messages in the same order (though, of course, at any given time the leader may have a few as-yet unreplicated messages at the end of its log).
Followers consume messages from the leader just as a normal Kafka consumer would and apply them to their own log. Having the followers pull from the leader has the nice property of allowing the follower to naturally batch together log entries they are applying to their log.
In distributed systems terminology we only attempt to handle a «fail/recover» model of failures where nodes suddenly cease working and then later recover (perhaps without knowing that they have died). Kafka does not handle so-called «Byzantine» failures in which nodes produce arbitrary or malicious responses (perhaps due to bugs or foul play).
A message is considered «committed» when all in sync replicas for that partition have applied it to their log. Only committed messages are ever given out to the consumer. This means that the consumer need not worry about potentially seeing a message that could be lost if the leader fails. Producers, on the other hand, have the option of either waiting for the message to be committed or not, depending on their preference for tradeoff between latency and durability. This preference is controlled by the request.required.acks setting that the producer uses.
The guarantee that Kafka offers is that a committed message will not be lost, as long as there is at least one in sync replica alive, at all times.
Kafka will remain available in the presence of node failures after a short fail-over period, but may not remain available in the presence of network partitions.
Replicated Logs: Quorums, ISRs, and State Machines (Oh my!)
Of course if leaders didn’t fail we wouldn’t need followers! When the leader does die we need to choose a new leader from among the followers. But followers themselves may fall behind or crash so we must ensure we choose an up-to-date follower. The fundamental guarantee a log replication algorithm must provide is that if we tell the client a message is committed, and the leader fails, the new leader we elect must also have that message. This yields a tradeoff: if the leader waits for more followers to acknowledge a message before declaring it committed then there will be more potentially electable leaders.
If you choose the number of acknowledgements required and the number of logs that must be compared to elect a leader such that there is guaranteed to be an overlap, then this is called a Quorum.
A common approach to this tradeoff is to use a majority vote for both the commit decision and the leader election. This is not what Kafka does, but let’s explore it anyway to understand the tradeoffs. Let’s say we have 2f+1 replicas. If f+1 replicas must receive a message prior to a commit being declared by the leader, and if we elect a new leader by electing the follower with the most complete log from at least f+1 replicas, then, with no more than f failures, the leader is guaranteed to have all committed messages. This is because among any f+1 replicas, there must be at least one replica that contains all committed messages. That replica’s log will be the most complete and therefore will be selected as the new leader. There are many remaining details that each algorithm must handle (such as precisely defined what makes a log more complete, ensuring log consistency during leader failure or changing the set of servers in the replica set) but we will ignore these for now.
This majority vote approach has a very nice property: the latency is dependent on only the fastest servers. That is, if the replication factor is three, the latency is determined by the faster slave not the slower one.
There are a rich variety of algorithms in this family including ZooKeeper’s Zab, Raft, and Viewstamped Replication. The most similar academic publication we are aware of to Kafka’s actual implementation is PacificA from Microsoft.
The downside of majority vote is that it doesn’t take many failures to leave you with no electable leaders. To tolerate one failure requires three copies of the data, and to tolerate two failures requires five copies of the data. In our experience having only enough redundancy to tolerate a single failure is not enough for a practical system, but doing every write five times, with 5x the disk space requirements and 1/5th the throughput, is not very practical for large volume data problems. This is likely why quorum algorithms more commonly appear for shared cluster configuration such as ZooKeeper but are less common for primary data storage. For example in HDFS the namenode’s high-availability feature is built on a majority-vote-based journal, but this more expensive approach is not used for the data itself.
Kafka takes a slightly different approach to choosing its quorum set. Instead of majority vote, Kafka dynamically maintains a set of in-sync replicas (ISR) that are caught-up to the leader. Only members of this set are eligible for election as leader. A write to a Kafka partition is not considered committed until all in-sync replicas have received the write. This ISR set is persisted to ZooKeeper whenever it changes. Because of this, any replica in the ISR is eligible to be elected leader. This is an important factor for Kafka’s usage model where there are many partitions and ensuring leadership balance is important. With this ISR model and f+1 replicas, a Kafka topic can tolerate f failures without losing committed messages.
For most use cases we hope to handle, we think this tradeoff is a reasonable one. In practice, to tolerate f failures, both the majority vote and the ISR approach will wait for the same number of replicas to acknowledge before committing a message (e.g. to survive one failure a majority quorum needs three replicas and one acknowledgement and the ISR approach requires two replicas and one acknowledgement). The ability to commit without the slowest servers is an advantage of the majority vote approach. However, we think it is ameliorated by allowing the client to choose whether they block on the message commit or not, and the additional throughput and disk space due to the lower required replication factor is worth it.
Another important design distinction is that Kafka does not require that crashed nodes recover with all their data intact. It is not uncommon for replication algorithms in this space to depend on the existence of «stable storage» that cannot be lost in any failure-recovery scenario without potential consistency violations. There are two primary problems with this assumption. First, disk errors are the most common problem we observe in real operation of persistent data systems and they often do not leave data intact. Secondly, even if this were not a problem, we do not want to require the use of fsync on every write for our consistency guarantees as this can reduce performance by two to three orders of magnitude. Our protocol for allowing a replica to rejoin the ISR ensures that before rejoining, it must fully re-sync again even if it lost unflushed data in its crash.
Unclean leader election: What if they all die?
This is a simple tradeoff between availability and consistency. If we wait for replicas in the ISR, then we will remain unavailable as long as those replicas are down. If such replicas were destroyed or their data was lost, then we are permanently down. If, on the other hand, a non-in-sync replica comes back to life and we allow it to become leader, then its log becomes the source of truth even though it is not guaranteed to have every committed message. In our current release we choose the second strategy and favor choosing a potentially inconsistent replica when all replicas in the ISR are dead. In the future, we would like to make this configurable to better support use cases where downtime is preferable to inconsistency.
This dilemma is not specific to Kafka. It exists in any quorum-based scheme. For example in a majority voting scheme, if a majority of servers suffer a permanent failure, then you must either choose to lose 100% of your data or violate consistency by taking what remains on an existing server as your new source of truth.
Availability and Durability Guarantees
Replica Management
It is also important to optimize the leadership election process as that is the critical window of unavailability. A naive implementation of leader election would end up running an election per partition for all partitions a node hosted when that node failed. Instead, we elect one of the brokers as the «controller». This controller detects failures at the broker level and is responsible for changing the leader of all affected partitions in a failed broker. The result is that we are able to batch together many of the required leadership change notifications which makes the election process far cheaper and faster for a large number of partitions. If the controller fails, one of the surviving brokers will become the new controller.
4.8 Log Compaction
So far we have described only the simpler approach to data retention where old log data is discarded after a fixed period of time or when the log reaches some predetermined size. This works well for temporal event data such as logging where each record stands alone. However an important class of data streams are the log of changes to keyed, mutable data (for example, the changes to a database table).
Let’s discuss a concrete example of such a stream. Say we have a topic containing user email addresses; every time a user updates their email address we send a message to this topic using their user id as the primary key. Now say we send the following messages over some time period for a user with id 123, each message corresponding to a change in email address (messages for other ids are omitted): Log compaction gives us a more granular retention mechanism so that we are guaranteed to retain at least the last update for each primary key (e.g. bill@gmail.com ). By doing this we guarantee that the log contains a full snapshot of the final value for every key not just keys that changed recently. This means downstream consumers can restore their own state off this topic without us having to retain a complete log of all changes.
The general idea is quite simple. If we had infinite log retention, and we logged each change in the above cases, then we would have captured the state of the system at each time from when it first began. Using this complete log we could restore to any point in time by replaying the first N records in the log. This hypothetical complete log is not very practical for systems that update a single record many times as the log will grow without bound even for a stable dataset. The simple log retention mechanism which throws away old updates will bound space but the log is no longer a way to restore the current state—now restoring from the beginning of the log no longer recreates the current state as old updates may not be captured at all.
Log compaction is a mechanism to give finer-grained per-record retention, rather than the coarser-grained time-based retention. The idea is to selectively remove records where we have a more recent update with the same primary key. This way the log is guaranteed to have at least the last state for each key.
This retention policy can be set per-topic, so a single cluster can have some topics where retention is enforced by size or time and other topics where retention is enforced by compaction.
This functionality is inspired by one of LinkedIn’s oldest and most successful pieces of infrastructure—a database changelog caching service called Databus. Unlike most log-structured storage systems Kafka is built for subscription and organizes data for fast linear reads and writes. Unlike Databus, Kafka acts a source-of-truth store so it is useful even in situations where the upstream data source would not otherwise be replayable.
Log Compaction Basics
The head of the log is identical to a traditional Kafka log. It has dense, sequential offsets and retains all messages. Log compaction adds an option for handling the tail of the log. The picture above shows a log with a compacted tail. Note that the messages in the tail of the log retain the original offset assigned when they were first written—that never changes. Note also that all offsets remain valid positions in the log, even if the message with that offset has been compacted away; in this case this position is indistinguishable from the next highest offset that does appear in the log. For example, in the picture above the offsets 36, 37, and 38 are all equivalent positions and a read beginning at any of these offsets would return a message set beginning with 38.
Compaction also allows for deletes. A message with a key and a null payload will be treated as a delete from the log. This delete marker will cause any prior message with that key to be removed (as would any new message with that key), but delete markers are special in that they will themselves be cleaned out of the log after a period of time to free up space. The point in time at which deletes are no longer retained is marked as the «delete retention point» in the above diagram.
The compaction is done in the background by periodically recopying log segments. Cleaning does not block reads and can be throttled to use no more than a configurable amount of I/O throughput to avoid impacting producers and consumers. The actual process of compacting a log segment looks something like this:
What guarantees does log compaction provide?
Log Compaction Details
Configuring The Log Cleaner
Further cleaner configurations are described here.
Log Compaction Limitations
4.9 Quotas
Starting in 0.9, the Kafka cluster has the ability to enforce quotas on produce and fetch requests. Quotas are basically byte-rate thresholds defined per client-id. A client-id logically identifies an application making a request. Hence a single client-id can span multiple producer and consumer instances and the quota will apply for all of them as a single entity i.e. if client-id=»test-client» has a produce quota of 10MB/sec, this is shared across all instances with that same id.
Why are quotas necessary?
It is possible for producers and consumers to produce/consume very high volumes of data and thus monopolize broker resources, cause network saturation and generally DOS other clients and the brokers themselves. Having quotas protects against these issues and is all tbe more important in large multi-tenant clusters where a small set of badly behaved clients can degrade user experience for the well behaved ones. In fact, when running Kafka as a service this even makes it possible to enforce API limits according to an agreed upon contract.
Enforcement
By default, each unique client-id receives a fixed quota in bytes/sec as configured by the cluster (quota.producer.default, quota.consumer.default). This quota is defined on a per-broker basis. Each client can publish/fetch a maximum of X bytes/sec per broker before it gets throttled. We decided that defining these quotas per broker is much better than having a fixed cluster wide bandwidth per client because that would require a mechanism to share client quota usage among all the brokers. This can be harder to get right than the quota implementation itself!
How does a broker react when it detects a quota violation? In our solution, the broker does not return an error rather it attempts to slow down a client exceeding its quota. It computes the amount of delay needed to bring a guilty client under it’s quota and delays the response for that time. This approach keeps the quota violation transparent to clients (outside of client side metrics). This also keeps them from having to implement any special backoff and retry behavior which can get tricky. In fact, bad client behavior (retry without backoff) can exacerbate the very problem quotas are trying to solve.
Client byte rate is measured over multiple small windows (for e.g. 30 windows of 1 second each) in order to detect and correct quota violations quickly. Typically, having large measurement windows (for e.g. 10 windows of 30 seconds each) leads to large bursts of traffic followed by long delays which is not great in terms of user experience.
Quota overrides
It is possible to override the default quota for client-ids that need a higher (or even lower) quota. The mechanism is similar to the per-topic log config overrides. Client-id overrides are written to ZooKeeper under /config/clients. These overrides are read by all brokers and are effective immediately. This lets us change quotas without having to do a rolling restart of the entire cluster. See here for details.
5. Implementation
5.1 API Design
Producer APIs
kafka.producer.Producer provides the ability to batch multiple produce requests ( producer.type=async ), before serializing and dispatching them to the appropriate kafka broker partition. The size of the batch can be controlled by a few config parameters. As events enter a queue, they are buffered in a queue, until either queue.time or batch.size is reached. A background thread ( kafka.producer.async.ProducerSendThread ) dequeues the batch of data and lets the kafka.producer.EventHandler serialize and send the data to the appropriate kafka broker partition. A custom event handler can be plugged in through the event.handler config parameter. At various stages of this producer queue pipeline, it is helpful to be able to inject callbacks, either for plugging in custom logging/tracing code or custom monitoring logic. This is possible by implementing the kafka.producer.async.CallbackHandler interface and setting callback.handler config parameter to that class.
The default is the no-op kafka.serializer.DefaultEncoder
Consumer APIs
We have 2 levels of consumer APIs. The low-level «simple» API maintains a connection to a single broker and has a close correspondence to the network requests sent to the server. This API is completely stateless, with the offset being passed in on every request, allowing the user to maintain this metadata however they choose.
The high-level API hides the details of brokers from the consumer and allows consuming off the cluster of machines without concern for the underlying topology. It also maintains the state of what has been consumed. The high-level API also provides the ability to subscribe to topics that match a filter expression (i.e., either a whitelist or a blacklist regular expression).
Low-level API
High-level API
This API is centered around iterators, implemented by the KafkaStream class. Each KafkaStream represents the stream of messages from one or more partitions on one or more servers. Each stream is used for single threaded processing, so the client can provide the number of desired streams in the create call. Thus a stream may represent the merging of multiple server partitions (to correspond to the number of processing threads), but each partition only goes to one stream.
The createMessageStreams call registers the consumer for the topic, which results in rebalancing the consumer/broker assignment. The API encourages creating many topic streams in a single call in order to minimize this rebalancing. The createMessageStreamsByFilter call (additionally) registers watchers to discover new topics that match its filter. Note that each stream that createMessageStreamsByFilter returns may iterate over messages from multiple topics (i.e., if multiple topics are allowed by the filter).
5.2 Network Layer
The network layer is a fairly straight-forward NIO server, and will not be described in great detail. The sendfile implementation is done by giving the MessageSet interface a writeTo method. This allows the file-backed message set to use the more efficient transferTo implementation instead of an in-process buffered write. The threading model is a single acceptor thread and N processor threads which handle a fixed number of connections each. This design has been pretty thoroughly tested elsewhere and found to be simple to implement and fast. The protocol is kept quite simple to allow for future implementation of clients in other languages.
5.3 Messages
5.4 Message Format
5.5 Log
A log for a topic named «my_topic» with two partitions consists of two directories (namely my_topic_0 and my_topic_1 ) populated with data files containing the messages for that topic. The format of the log files is a sequence of «log entries»»; each log entry is a 4 byte integer N storing the message length which is followed by the N message bytes. Each message is uniquely identified by a 64-bit integer offset giving the byte position of the start of this message in the stream of all messages ever sent to that topic on that partition. The on-disk format of each message is given below. Each log file is named with the offset of the first message it contains. So the first file created will be 00000000000.kafka, and each additional file will have an integer name roughly S bytes from the previous file where S is the max log file size given in the configuration.
The exact binary format for messages is versioned and maintained as a standard interface so message sets can be transfered between producer, broker, and client without recopying or conversion when desirable. This format is as follows:
The use of the message offset as the message id is unusual. Our original idea was to use a GUID generated by the producer, and maintain a mapping from GUID to offset on each broker. But since a consumer must maintain an ID for each server, the global uniqueness of the GUID provides no value. Furthermore the complexity of maintaining the mapping from a random id to an offset requires a heavy weight index structure which must be synchronized with disk, essentially requiring a full persistent random-access data structure. Thus to simplify the lookup structure we decided to use a simple per-partition atomic counter which could be coupled with the partition id and node id to uniquely identify a message; this makes the lookup structure simpler, though multiple seeks per consumer request are still likely. However once we settled on a counter, the jump to directly using the offset seemed natural—both after all are monotonically increasing integers unique to a partition. Since the offset is hidden from the consumer API this decision is ultimately an implementation detail and we went with the more efficient approach.
Writes
The log allows serial appends which always go to the last file. This file is rolled over to a fresh file when it reaches a configurable size (say 1GB). The log takes two configuration parameter M which gives the number of messages to write before forcing the OS to flush the file to disk, and S which gives a number of seconds after which a flush is forced. This gives a durability guarantee of losing at most M messages or S seconds of data in the event of a system crash.
Reads
Reads are done by giving the 64-bit logical offset of a message and an S-byte max chunk size. This will return an iterator over the messages contained in the S-byte buffer. S is intended to be larger than any single message, but in the event of an abnormally large message, the read can be retried multiple times, each time doubling the buffer size, until the message is read successfully. A maximum message and buffer size can be specified to make the server reject messages larger than some size, and to give a bound to the client on the maximum it need ever read to get a complete message. It is likely that the read buffer ends with a partial message, this is easily detected by the size delimiting.
The actual process of reading from an offset requires first locating the log segment file in which the data is stored, calculating the file-specific offset from the global offset value, and then reading from that file offset. The search is done as a simple binary search variation against an in-memory range maintained for each file.
The log provides the capability of getting the most recently written message to allow clients to start subscribing as of «right now». This is also useful in the case the consumer fails to consume its data within its SLA-specified number of days. In this case when the client attempts to consume a non-existant offset it is given an OutOfRangeException and can either reset itself or fail as appropriate to the use case.
The following is the format of the results sent to the consumer.
Deletes
Data is deleted one log segment at a time. The log manager allows pluggable delete policies to choose which files are eligible for deletion. The current policy deletes any log with a modification time of more than N days ago, though a policy which retained the last N GB could also be useful. To avoid locking reads while still allowing deletes that modify the segment list we use a copy-on-write style segment list implementation that provides consistent views to allow a binary search to proceed on an immutable static snapshot view of the log segments while deletes are progressing.
Guarantees
The log provides a configuration parameter M which controls the maximum number of messages that are written before forcing a flush to disk. On startup a log recovery process is run that iterates over all messages in the newest log segment and verifies that each message entry is valid. A message entry is valid if the sum of its size and offset are less than the length of the file AND the CRC32 of the message payload matches the CRC stored with the message. In the event corruption is detected the log is truncated to the last valid offset.
Note that two kinds of corruption must be handled: truncation in which an unwritten block is lost due to a crash, and corruption in which a nonsense block is ADDED to the file. The reason for this is that in general the OS makes no guarantee of the write order between the file inode and the actual block data so in addition to losing written data the file can gain nonsense data if the inode is updated with a new size but a crash occurs before the block containing that data is not written. The CRC detects this corner case, and prevents it from corrupting the log (though the unwritten messages are, of course, lost).
5.6 Distribution
Consumer Offset Tracking
The high-level consumer tracks the maximum offset it has consumed in each partition and periodically commits its offset vector so that it can resume from those offsets in the event of a restart. Kafka provides the option to store all the offsets for a given consumer group in a designated broker (for that group) called the offset manager. i.e., any consumer instance in that consumer group should send its offset commits and fetches to that offset manager (broker). The high-level consumer handles this automatically. If you use the simple consumer you will need to manage offsets manually. This is currently unsupported in the Java simple consumer which can only commit or fetch offsets in ZooKeeper. If you use the Scala simple consumer you can discover the offset manager and explicitly commit or fetch offsets to the offset manager. A consumer can look up its offset manager by issuing a GroupCoordinatorRequest to any Kafka broker and reading the GroupCoordinatorResponse which will contain the offset manager. The consumer can then proceed to commit or fetch offsets from the offsets manager broker. In case the offset manager moves, the consumer will need to rediscover the offset manager. If you wish to manage your offsets manually, you can take a look at these code samples that explain how to issue OffsetCommitRequest and OffsetFetchRequest.
When the offset manager receives an OffsetCommitRequest, it appends the request to a special compacted Kafka topic named __consumer_offsets. The offset manager sends a successful offset commit response to the consumer only after all the replicas of the offsets topic receive the offsets. In case the offsets fail to replicate within a configurable timeout, the offset commit will fail and the consumer may retry the commit after backing off. (This is done automatically by the high-level consumer.) The brokers periodically compact the offsets topic since it only needs to maintain the most recent offset commit per partition. The offset manager also caches the offsets in an in-memory table in order to serve offset fetches quickly.
When the offset manager receives an offset fetch request, it simply returns the last committed offset vector from the offsets cache. In case the offset manager was just started or if it just became the offset manager for a new set of consumer groups (by becoming a leader for a partition of the offsets topic), it may need to load the offsets topic partition into the cache. In this case, the offset fetch will fail with an OffsetsLoadInProgress exception and the consumer may retry the OffsetFetchRequest after backing off. (This is done automatically by the high-level consumer.)
Migrating offsets from ZooKeeper to Kafka
ZooKeeper Directories
The following gives the ZooKeeper structures and algorithms used for co-ordination between consumers and brokers.
Notation
Broker Node Registry
This is a list of all present broker nodes, each of which provides a unique logical broker id which identifies it to consumers (which must be given as part of its configuration). On startup, a broker node registers itself by creating a znode with the logical broker id under /brokers/ids. The purpose of the logical broker id is to allow a broker to be moved to a different physical machine without affecting consumers. An attempt to register a broker id that is already in use (say because two servers are configured with the same broker id) is an error.
Since the broker registers itself in ZooKeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shutdown or dies (thus notifying consumers it is no longer available).
Broker Topic Registry
Each broker registers itself under the topics it maintains and stores the number of partitions for that topic.
Consumers and Consumer Groups
Multiple consumers can form a group and jointly consume a single topic. Each consumer in the same group is given a shared group_id. For example if one consumer is your foobar process, which is run across three machines, then you might assign this group of consumers the id «foobar». This group id is provided in the configuration of the consumer, and is your way to tell the consumer which group it belongs to.
The consumers in a group divide up the partitions as fairly as possible, each partition is consumed by exactly one consumer in a consumer group.
Consumer Id Registry
Consumer Offsets
Partition Owner registry
Each broker partition is consumed by a single consumer within a given consumer group. The consumer must establish its ownership of a given partition before any consumption can begin. To establish its ownership, a consumer writes its own id in an ephemeral node under the particular broker partition it is claiming.
Broker node registration
The broker nodes are basically independent, so they only publish information about what they have. When a broker joins, it registers itself under the broker node registry directory and writes information about its host name and port. The broker also register the list of existing topics and their logical partitions in the broker topic registry. New topics are registered dynamically when they are created on the broker.
Consumer registration algorithm
Consumer rebalancing algorithm
The consumer rebalancing algorithms allows all the consumers in a group to come into consensus on which consumer is consuming which partitions. Consumer rebalancing is triggered on each addition or removal of both broker nodes and other consumers within the same group. For a given topic and a given consumer group, broker partitions are divided evenly among consumers within the group. A partition is always consumed by a single consumer. This design simplifies the implementation. Had we allowed a partition to be concurrently consumed by multiple consumers, there would be contention on the partition and some kind of locking would be required. If there are more consumers than partitions, some consumers won’t get any data at all. During rebalancing, we try to assign partitions to consumers in such a way that reduces the number of broker nodes each consumer has to connect to.
Each consumer does the following during rebalancing:
When rebalancing is triggered at one consumer, rebalancing should be triggered in other consumers within the same group about the same time.
6. Operations
6.1 Basic Kafka Operations
Adding and removing topics
Topics are added and modified using the topic tool: The replication factor controls how many servers will replicate each message that is written. If you have a replication factor of 3 then up to 2 servers can fail before you will lose access to your data. We recommend you use a replication factor of 2 or 3 so that you can transparently bounce machines without interrupting data consumption.
The partition count controls how many logs the topic will be sharded into. There are several impacts of the partition count. First each partition must fit entirely on a single server. So if you have 20 partitions the full data set (and read and write load) will be handled by no more than 20 servers (no counting replicas). Finally the partition count impacts the maximum parallelism of your consumers. This is discussed in greater detail in the concepts section.
The configurations added on the command line override the default settings the server has for things like the length of time data should be retained. The complete set of per-topic configurations is documented here.
Modifying topics
To add partitions you can do Be aware that one use case for partitions is to semantically partition data, and adding partitions doesn’t change the partitioning of existing data so this may disturb consumers if they rely on that partition. That is if data is partitioned by hash(key) % number_of_partitions then this partitioning will potentially be shuffled by adding partitions but Kafka will not attempt to automatically redistribute data in any way.
To add configs: To remove a config: And finally deleting a topic: Topic deletion option is disabled by default. To enable it set the server config
Kafka does not currently support reducing the number of partitions for a topic.
Instructions for changing the replication factor of a topic can be found here.
Graceful shutdown
Balancing leadership
To avoid this imbalance, Kafka has a notion of preferred replicas. If the list of replicas for a partition is 1,5,9 then node 1 is preferred as the leader to either node 5 or 9 because it is earlier in the replica list. You can have the Kafka cluster try to restore leadership to the restored replicas by running the command: Since running this command can be tedious you can also configure Kafka to do this automatically by setting the following configuration:
Mirroring data between clusters
A common use case for this kind of mirroring is to provide a replica in another datacenter. This scenario will be discussed in more detail in the next section.
You can run many such mirroring processes to increase throughput and for fault-tolerance (if one process dies, the others will take overs the additional load).
Data will be read from topics in the source cluster and written to a topic with the same name in the destination cluster. In fact the mirror maker is little more than a Kafka consumer and producer hooked together.
The source and destination clusters are completely independent entities: they can have different numbers of partitions and the offsets will not be the same. For this reason the mirror cluster is not really intended as a fault-tolerance mechanism (as the consumer position will be different); for that we recommend using normal in-cluster replication. The mirror maker process will, however, retain and use the message key for partitioning so order is preserved on a per-key basis.
Combining mirroring with the configuration auto.create.topics.enable=true makes it possible to have a replica cluster that will automatically create and replicate all data in a source cluster even as new topics are added.
Checking consumer position
Expanding your cluster
The process of migrating data is manually initiated but fully automated. Under the covers what happens is that Kafka will add the new server as a follower of the partition it is migrating and allow it to fully replicate the existing data in that partition. When the new server has fully replicated the contents of this partition and joined the in-sync replica one of the existing replicas will delete their partition’s data.
The partition reassignment tool can be used to move partitions across brokers. An ideal partition distribution would ensure even data load and partition sizes across all brokers. The partition reassignment tool does not have the capability to automatically study the data distribution in a Kafka cluster and move partitions around to attain an even load distribution. As such, the admin has to figure out which topics or partitions should be moved around.
Automatically migrating data to new machines
For instance, the following example will move all partitions for topics foo1,foo2 to the new set of brokers 5,6. At the end of this move, all partitions for topics foo1 and foo2 will only exist on brokers 5,6
Since, the tool accepts the input list of topics as a json file, you first need to identify the topics you want to move and create the json file as follows- Once the json file is ready, use the partition reassignment tool to generate a candidate assignment-
Custom partition assignment and migration
For instance, the following example moves partition 0 of topic foo1 to brokers 5,6 and partition 1 of topic foo2 to brokers 2,3
Decommissioning brokers
Increasing replication factor
For instance, the following example increases the replication factor of partition 0 of topic foo from 1 to 3. Before increasing the replication factor, the partition’s only replica existed on broker 5. As part of increasing the replication factor, we will add more replicas on brokers 6 and 7.
Setting quotas
6.2 Datacenters
This deployment pattern allows datacenters to act as independent entities and allows us to manage and tune inter-datacenter replication centrally. This allows each facility to stand alone and operate even if the inter-datacenter links are unavailable: when this occurs the mirroring falls behind until the link is restored at which time it catches up.
For applications that need a global view of all data you can use mirroring to provide clusters which have aggregate data mirrored from the local clusters in all datacenters. These aggregate clusters are used for reads by applications that require the full data set.
This is not the only possible deployment pattern. It is possible to read from or write to a remote Kafka cluster over the WAN, though obviously this will add whatever latency is required to get the cluster.
Kafka naturally batches data in both the producer and consumer so it can achieve high-throughput even over a high-latency connection. To allow this though it may be necessary to increase the TCP socket buffer sizes for the producer, consumer, and broker using the socket.send.buffer.bytes and socket.receive.buffer.bytes configurations. The appropriate way to set this is documented here.
It is generally not advisable to run a single Kafka cluster that spans multiple datacenters over a high-latency link. This will incur very high replication latency both for Kafka writes and ZooKeeper writes, and neither Kafka nor ZooKeeper will remain available in all locations if the network between locations is unavailable.
6.3 Kafka Configuration
Important Client Configurations
All configurations are documented in the configuration section.
A Production Server Config
Java Version
6.4 Hardware and OS
You need sufficient memory to buffer active readers and writers. You can do a back-of-the-envelope estimate of memory needs by assuming you want to be able to buffer for 30 seconds and compute your memory need as write_throughput*30.
The disk throughput is important. We have 8×7200 rpm SATA drives. In general disk throughput is the performance bottleneck, and more disks is more better. Depending on how you configure flush behavior you may or may not benefit from more expensive disks (if you force flush often then higher RPM SAS drives may be better).
We have seen a few issues running on Windows and Windows is not currently a well supported platform though we would be happy to change that.
You likely don’t need to do much OS-level tuning though there are a few things that will help performance.
Disks and Filesystem
If you configure multiple data directories partitions will be assigned round-robin to data directories. Each partition will be entirely in one of the data directories. If data is not well balanced among partitions this can lead to load imbalance between disks.
RAID can potentially do better at balancing load between disks (although it doesn’t always seem to) because it balances load at a lower level. The primary downside of RAID is that it is usually a big performance hit for write throughput and reduces the available disk space.
Another potential benefit of RAID is the ability to tolerate disk failures. However our experience has been that rebuilding the RAID array is so I/O intensive that it effectively disables the server, so this does not provide much real availability improvement.
Application vs. OS Flush Management
Kafka must eventually call fsync to know that data was flushed. When recovering from a crash for any log segment not known to be fsync’d Kafka will check the integrity of each message by checking its CRC and also rebuild the accompanying offset index file as part of the recovery process executed on startup.
Note that durability in Kafka does not require syncing data to disk, as a failed node will always recover from its replicas.
We recommend using the default flush settings which disable application fsync entirely. This means relying on the background flush done by the OS and Kafka’s own background flush. This provides the best of all worlds for most uses: no knobs to tune, great throughput and latency, and full recovery guarantees. We generally feel that the guarantees provided by replication are stronger than sync to local disk, however the paranoid still may prefer having both and application level fsync policies are still supported.
The drawback of using application level flush settings are that this is less efficient in it’s disk usage pattern (it gives the OS less leeway to re-order writes) and it can introduce latency as fsync in most Linux filesystems blocks writes to the file whereas the background flushing does much more granular page-level locking.
In general you don’t need to do any low-level tuning of the filesystem, but in the next few sections we will go over some of this in case it is useful.
Understanding Linux OS Flush Behavior
Pdflush has a configurable policy that controls how much dirty data can be maintained in cache and for how long before it must be written back to disk. This policy is described here. When Pdflush cannot keep up with the rate of data being written it will eventually cause the writing process to block incurring latency in the writes to slow down the accumulation of data.
You can see the current state of OS memory usage by doing The meaning of these values are described in the link above.
Ext4 Notes
6.6 Monitoring
The easiest way to see the available metrics to fire up jconsole and point it at a running kafka client or server; this will all browsing all metrics with JMX.
We pay particular we do graphing and alerting on the following metrics:
Description | Mbean name | Normal value |
---|---|---|
Message in rate | kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec | |
Byte in rate | kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec | |
Request rate | kafka.network:type=RequestMetrics,name=RequestsPerSec,request= | |
Byte out rate | kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec | |
Log flush rate and time | kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs | |
# of under replicated partitions (|ISR| < |all replicas|) | kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions | 0 |
Is controller active on broker | kafka.controller:type=KafkaController,name=ActiveControllerCount | only one broker in the cluster should have 1 |
Leader election rate | kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs | non-zero when there are broker failures |
Unclean leader election rate | kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec | 0 |
Partition counts | kafka.server:type=ReplicaManager,name=PartitionCount | mostly even across brokers |
Leader replica counts | kafka.server:type=ReplicaManager,name=LeaderCount | mostly even across brokers |
ISR shrink rate | kafka.server:type=ReplicaManager,name=IsrShrinksPerSec | If a broker goes down, ISR for some of the partitions will shrink. When that broker is up again, ISR will be expanded once the replicas are fully caught up. Other than that, the expected value for both ISR shrink rate and expansion rate is 0. |
ISR expansion rate | kafka.server:type=ReplicaManager,name=IsrExpandsPerSec | See above |
Max lag in messages btw follower and leader replicas | kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica | lag should be proportional to the maximum batch size of a produce request. |
Lag in messages per follower replica | kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=(2+) | lag should be proportional to the maximum batch size of a produce request. |
Requests waiting in the producer purgatory | kafka.server:type=ProducerRequestPurgatory,name=PurgatorySize | non-zero if ack=-1 is used |
Requests waiting in the fetch purgatory | kafka.server:type=FetchRequestPurgatory,name=PurgatorySize | size depends on fetch.wait.max.ms in the consumer |
Request total time | kafka.network:type=RequestMetrics,name=TotalTimeMs,request= | broken into queue, local, remote and response send time |
Time the request waiting in the request queue | kafka.network:type=RequestMetrics,name=QueueTimeMs,request= | |
Time the request being processed at the leader | kafka.network:type=RequestMetrics,name=LocalTimeMs,request= | |
Time the request waits for the follower | kafka.network:type=RequestMetrics,name=RemoteTimeMs,request= | non-zero for produce requests when ack=-1 |
Time to send the response | kafka.network:type=RequestMetrics,name=ResponseSendTimeMs,request= | |
Number of messages the consumer lags behind the producer by | kafka.consumer:type=ConsumerFetcherManager,name=MaxLag,clientId=([-.\w]+) | |
The average fraction of time the network processors are idle | kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent | between 0 and 1, ideally > 0.3 |
The average fraction of time the request handler threads are idle | kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent | between 0 and 1, ideally > 0.3 |
Quota metrics per client-id | kafka.server:type= | Two attributes. throttle-time indicates the amount of time in ms the client-id was throttled. Ideally = 0. byte-rate indicates the data produce/consume rate of the client in bytes/sec. |
New producer monitoring
The following metrics are available on new producer instances.
Metric/Attribute name | Description | Mbean name |
---|---|---|
waiting-threads | The number of user threads blocked waiting for buffer memory to enqueue their records | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
buffer-total-bytes | The maximum amount of buffer memory the client can use (whether or not it is currently used). | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
buffer-available-bytes | The total amount of buffer memory that is not being used (either unallocated or in the free list). | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
bufferpool-wait-time | The fraction of time an appender waits for space allocation. | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
batch-size-avg | The average number of bytes sent per partition per-request. | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
batch-size-max | The max number of bytes sent per partition per-request. | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
compression-rate-avg | The average compression rate of record batches. | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
record-queue-time-avg | The average time in ms record batches spent in the record accumulator. | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
record-queue-time-max | The maximum time in ms record batches spent in the record accumulator | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
request-latency-avg | The average request latency in ms | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
request-latency-max | The maximum request latency in ms | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
record-send-rate | The average number of records sent per second. | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
records-per-request-avg | The average number of records per request. | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
record-retry-rate | The average per-second number of retried record sends | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
record-error-rate | The average per-second number of record sends that resulted in errors | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
record-size-max | The maximum record size | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
record-size-avg | The average record size | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
requests-in-flight | The current number of in-flight requests awaiting a response. | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
metadata-age | The age in seconds of the current producer metadata being used. | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
connection-close-rate | Connections closed per second in the window. | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
connection-creation-rate | New connections established per second in the window. | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
network-io-rate | The average number of network operations (reads or writes) on all connections per second. | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
outgoing-byte-rate | The average number of outgoing bytes sent per second to all servers. | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
request-rate | The average number of requests sent per second. | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
request-size-avg | The average size of all requests in the window. | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
request-size-max | The maximum size of any request sent in the window. | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
incoming-byte-rate | Bytes/second read off all sockets | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
response-rate | Responses received sent per second. | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
select-rate | Number of times the I/O layer checked for new I/O to perform per second | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
io-wait-time-ns-avg | The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds. | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
io-wait-ratio | The fraction of time the I/O thread spent waiting. | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
io-time-ns-avg | The average length of time for I/O per select call in nanoseconds. | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
io-ratio | The fraction of time the I/O thread spent doing I/O | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
connection-count | The current number of active connections. | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
outgoing-byte-rate | The average number of outgoing bytes sent per second for a node. | kafka.producer:type=producer-node-metrics,client-id=([-.\w]+),node-id=(5+) |
request-rate | The average number of requests sent per second for a node. | kafka.producer:type=producer-node-metrics,client-id=([-.\w]+),node-id=(7+) |
request-size-avg | The average size of all requests in the window for a node. | kafka.producer:type=producer-node-metrics,client-id=([-.\w]+),node-id=(8+) |
request-size-max | The maximum size of any request sent in the window for a node. | kafka.producer:type=producer-node-metrics,client-id=([-.\w]+),node-id=(4+) |
incoming-byte-rate | The average number of responses received per second for a node. | kafka.producer:type=producer-node-metrics,client-id=([-.\w]+),node-id=(7+) |
request-latency-avg | The average request latency in ms for a node. | kafka.producer:type=producer-node-metrics,client-id=([-.\w]+),node-id=(1+) |
request-latency-max | The maximum request latency in ms for a node. | kafka.producer:type=producer-node-metrics,client-id=([-.\w]+),node-id=(1+) |
response-rate | Responses received sent per second for a node. | kafka.producer:type=producer-node-metrics,client-id=([-.\w]+),node-id=(3+) |
record-send-rate | The average number of records sent per second for a topic. | kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+) |
byte-rate | The average number of bytes sent per second for a topic. | kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+) |
compression-rate | The average compression rate of record batches for a topic. | kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+) |
record-retry-rate | The average per-second number of retried record sends for a topic | kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+) |
record-error-rate | The average per-second number of record sends that resulted in errors for a topic. | kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+) |
produce-throttle-time-max | The maximum time in ms a request was throttled by a broker. | kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+) |
produce-throttle-time-avg | The average time in ms a request was throttled by a broker. | kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+) |
We recommend monitor GC time and other stats and various server stats such as CPU utilization, I/O service time, etc. On the client side, we recommend monitor the message/byte rate (global and per topic), request rate/size/time, and on the consumer side, max lag in messages among all partitions and min fetch request rate. For a consumer to keep up, max lag needs to be less than a threshold and min fetch rate needs to be larger than 0.
Audit
6.7 ZooKeeper
Stable version
Operationalizing ZooKeeper
7. Security
7.1 Security Overview
7.2 Encryption and Authentication using SSL
Generate SSL key and certificate for each Kafka broker
Creating your own CA
Therefore, it is important to prevent forged certificates by signing them for each machine in the cluster. A certificate authority (CA) is responsible for signing certificates. CA works likes a government that issues passports—the government stamps (signs) each passport so that the passport becomes difficult to forge. Other governments verify the stamps to ensure the passport is authentic. Similarly, the CA signs the certificates, and the cryptography guarantees that a signed certificate is computationally difficult to forge. Thus, as long as the CA is a genuine and trusted authority, the clients have high assurance that they are connecting to the authentic machines. The generated CA is simply a public-private key pair and certificate, and it is intended to sign other certificates.
The next step is to add the generated CA to the **clients’ truststore** so that the clients can trust this CA: Note: If you configure the Kafka brokers to require client authentication by setting ssl.client.auth to be «requested» or «required» on the Kafka brokers config then you must provide a truststore for the Kafka brokers as well and it should have all the CA certificates that clients keys were signed by. In contrast to the keystore in step 1 that stores each machine’s own identity, the truststore of a client stores all the certificates that the client should trust. Importing a certificate into one’s truststore also means trusting all certificates that are signed by that certificate. As the analogy above, trusting the government (CA) also means trusting all passports (certificates) that it has issued. This attribute is called the chain of trust, and it is particularly useful when deploying SSL on a large Kafka cluster. You can sign all certificates in the cluster with a single CA, and have all machines share the same truststore that trusts the CA. That way all machines can authenticate all other machines.
Signing the certificate
Configuring Kafka Brokers
Due to import regulations in some countries, the Oracle implementation limits the strength of cryptographic algorithms available by default. If stronger algorithms are needed (for example, AES with 256-bit keys), the JCE Unlimited Strength Jurisdiction Policy Files must be obtained and installed in the JDK/JRE. See the JCA Providers Documentation for more information.
Once you start the broker you should be able to see in the server.log To check quickly if the server keystore and truststore are setup properly you can run the following command (Note: TLSv1 should be listed under ssl.enabled.protocols)
In the output of this command you should see server’s certificate: If the certificate does not show up or if there are any other error messages than your keystore is not setup properly.
Configuring Kafka Clients
7.3 Authentication using SASL
Prerequisites
Configuring Kafka Brokers
Configuring Kafka Clients
Incorporating Security Features in a Running Cluster
The specific steps for configuring SSL and SASL are described in sections 7.2 and 7.3. Follow these steps to enable security for your desired protocol(s).
The security implementation lets you configure different protocols for both broker-client and broker-broker communication. These must be enabled in separate bounces. A PLAINTEXT port must be left open throughout so brokers and/or clients can continue to communicate.
When performing an incremental bounce stop the brokers cleanly via a SIGTERM. It’s also good practice to wait for restarted replicas to return to the ISR list before moving onto the next node.
As an example, say we wish to encrypt both broker-client and broker-broker communication with SSL. In the first incremental bounce, a SSL port is opened on each node: We then restart the clients, changing their config to point at the newly opened, secured port: In the second incremental server bounce we instruct Kafka to use SSL as the broker-broker protocol (which will use the same SSL port): In the final bounce we secure the cluster by closing the PLAINTEXT port: Alternatively we might choose to open multiple ports so that different protocols can be used for broker-broker and broker-client communication. Say we wished to use SSL encryption throughout (i.e. for broker-broker and broker-client communication) but we’d like to add SASL authentication to the broker-client connection also. We would achieve this by opening two additional ports during the first bounce: We would then restart the clients, changing their config to point at the newly opened, SASL & SSL secured port: The second server bounce would switch the cluster to use encrypted broker-broker communication via the SSL port we previously opened on port 9092: The final bounce secures the cluster by closing the PLAINTEXT port. ZooKeeper can be secured independently of the Kafka cluster. The steps for doing this are covered in section 7.5.2.
7.4 Authorization and ACLs
Command Line Interface
Examples
7.5 ZooKeeper Authentication
7.5.1 New clusters
7.5.2 Migrating clusters
It is also possible to turn off authentication in a secure cluster. To do it, follow these steps:
Run this to see the full list of parameters:
Источники информации:
- http://sagarkudu.medium.com/how-to-publish-and-subscribe-messages-with-key-using-kafka-414321e9f241
- http://mydeveloperplanet.com/2019/09/25/kafka-messaging-explored/
- http://www.onlinetutorialspoint.com/spring-boot/sending-spring-boot-kafka-json-message-to-kafka-topic.html
- http://kafka.apache.org/090/documentation.html