How to clean a Kafka topic
How to delete records from a Kafka topic
Every now and then I get a request from my colleagues who would like to delete some or all the records from a Kafka topic. The request usually comes after someone has produced the wrong data in a test topic while playing around or due to a bug in the producer code. Or simply because they want a clean slate.
Whatever the reason, today I’ll show you a few ways to delete some or all the records from a Kafka topic.
It should go without saying that you should use your best judgment and check (at least) twice before using the methods described below in a production environment.
kafka-delete-records
This command is available as part of the Kafka CLI tools. It requires two parameters: a bootstrap server and a JSON file describing which records should be deleted.
The command allows you to delete all the records from the beginning of a partition, until the specified offset.
NOTE: It is not possible to delete records in the middle of the topic.
The JSON file specifies one or more partitions from which we want to remove the records. Let’s create delete-records.json file as below:
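The original listing is not reproduced here, so what follows is a minimal sketch of how such a file could be generated. It assumes the usual layout of the offsets file (a "partitions" list plus a "version" field); the helper name build_offset_json is hypothetical.

```python
import json
from pathlib import Path


def build_offset_json(topic: str, offsets_by_partition: dict[int, int]) -> dict:
    """Build the payload that kafka-delete-records reads from its JSON file.

    For each partition, records before the given offset are deleted.
    """
    return {
        "partitions": [
            {"topic": topic, "partition": partition, "offset": offset}
            for partition, offset in sorted(offsets_by_partition.items())
        ],
        "version": 1,
    }


# Partition 0 of "my-topic", delete everything before offset 3.
payload = build_offset_json("my-topic", {0: 3})
Path("delete-records.json").write_text(json.dumps(payload, indent=2))
```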
Here we’ve specified that for the partition 0 of the topic “my-topic” we want to delete all the records from the beginning until offset 3.
Now we’re ready to delete records. Execute:
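As a rough sketch, the invocation can be assembled like this. The broker address localhost:9092 is an assumption, and the script is assumed to be on the PATH; the helper name is hypothetical.

```python
import shlex


def delete_records_command(bootstrap_server: str, json_file: str) -> list[str]:
    """Return the argv for kafka-delete-records.sh."""
    return [
        "kafka-delete-records.sh",
        "--bootstrap-server", bootstrap_server,
        "--offset-json-file", json_file,
    ]


# localhost:9092 is an assumed broker address; adjust for your cluster.
cmd = delete_records_command("localhost:9092", "delete-records.json")
print(shlex.join(cmd))
```

Passing cmd to subprocess.run(cmd, check=True) would execute it against a live cluster, so print and review the command first.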
After the command finishes the start offset for the partition 0 will be 3.
Deleting all the records in a topic
NOTE: This will not work for compacted topics
If you want to prune all the messages, another option is to reduce the retention of the topic to a small value (e.g. 100ms), wait for the brokers to remove all the records from the topic and then set the topic retention to its original value. Here’s how to do it.
First, set the retention.ms to 100 milliseconds.
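A sketch of the kafka-configs.sh call for this step, assuming a broker at localhost:9092 and a topic named my-topic (both placeholders). Older Kafka releases take --zookeeper instead of --bootstrap-server for topic configs, so check which one your version expects.

```python
import shlex


def set_retention_command(bootstrap_server: str, topic: str, retention_ms: int) -> list[str]:
    """Return the argv that overrides retention.ms for a single topic."""
    return [
        "kafka-configs.sh",
        "--bootstrap-server", bootstrap_server,
        "--alter",
        "--entity-type", "topics",
        "--entity-name", topic,
        "--add-config", f"retention.ms={retention_ms}",
    ]


# Assumed broker address and topic name.
cmd = set_retention_command("localhost:9092", "my-topic", 100)
print(shlex.join(cmd))
```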
Then, wait for the brokers to remove the messages with expired retention (that is, all of them). To know whether the process has finished, check whether the start offset and end offset are the same. If they are, there are no more records available on the topic. Depending on your setup, it might take a few minutes for Kafka to clean up the topic, so keep checking the start offset.
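One way to automate that check is to compare the output of the GetOffsetShell tool for the earliest (--time -2) and latest (--time -1) offsets. The sketch below only parses such output; it assumes each line has the form topic:partition:offset, and the sample strings are made-up values for illustration.

```python
def parse_offsets(output: str) -> dict[int, int]:
    """Map partition -> offset from lines shaped like 'topic:partition:offset'."""
    offsets = {}
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue
        _topic, partition, offset = line.rsplit(":", 2)
        offsets[int(partition)] = int(offset)
    return offsets


def is_purged(earliest_output: str, latest_output: str) -> bool:
    """A topic is empty when every partition's start offset equals its end offset."""
    return parse_offsets(earliest_output) == parse_offsets(latest_output)


# Hypothetical sample output, not taken from a real cluster.
earliest = "my-topic:0:42\nmy-topic:1:17\n"
latest_before = "my-topic:0:42\nmy-topic:1:20\n"
latest_after = "my-topic:0:42\nmy-topic:1:17\n"

print(is_purged(earliest, latest_before))
print(is_purged(earliest, latest_after))
```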
Once the topic has been purged, return the retention.ms to its original value:
Delete a topic and create it again
Not as elegant as the previous two approaches, yet it might be an easier solution in some cases (e.g. if topic creation is scripted).
Then create it again:
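Both steps, the delete and the re-create, can be sketched together as follows. The broker address, topic name, partition count, and replication factor are placeholders; reuse the values your topic had before deletion.

```python
import shlex


def delete_topic_command(bootstrap_server: str, topic: str) -> list[str]:
    """Return the argv that deletes a topic."""
    return [
        "kafka-topics.sh",
        "--bootstrap-server", bootstrap_server,
        "--delete",
        "--topic", topic,
    ]


def create_topic_command(bootstrap_server: str, topic: str,
                         partitions: int, replication_factor: int) -> list[str]:
    """Return the argv that creates a topic with an explicit layout."""
    return [
        "kafka-topics.sh",
        "--bootstrap-server", bootstrap_server,
        "--create",
        "--topic", topic,
        "--partitions", str(partitions),
        "--replication-factor", str(replication_factor),
    ]


# Assumed values for illustration only.
delete_cmd = delete_topic_command("localhost:9092", "my-topic")
create_cmd = create_topic_command("localhost:9092", "my-topic", 3, 1)
print(shlex.join(delete_cmd))
print(shlex.join(create_cmd))
```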
A few things to be aware of when using this approach
Make sure the deletion of topics is enabled in your cluster. Set delete.topic.enable=true. From Kafka 1.0.0 this property is true by default.
Make sure all consumers have stopped consuming data from the topic you want to delete. Otherwise, they will start throwing errors, typically about an unknown topic or partition.
One more thing that might happen if you have consumers up and running is that the topic will get auto-created if the cluster-wide property auto.create.topics.enable is true (and by default it is). Not bad per se, but it will use a default number of partitions (1) and a replication factor (1), which might not be what you wanted.
Moral of the story is – make sure to stop your consumers before using this approach 🙂
Kafka Delete Topic and its messages
Kafka Delete Topic – Apache Kafka stores every message it receives in a log and, by default, keeps messages for 168 hours (7 days). There are several ways to delete a topic or all of its messages, and the rest of this article explains them.
Before you try the below examples, make sure you have at least one Kafka topic created and have some messages in it. And when you run these commands on production be extra cautious as these are permanent and you can’t retrieve the messages back.
1. Delete all messages from the topic
The Apache Kafka distribution comes with the bin/kafka-configs.sh script, which provides many useful options for modifying Kafka configuration. Here we use the “--alter” and “--add-config retention.ms” options to temporarily change the retention policy to 1 second, which causes the brokers to delete all messages from the topic.
First, run the kafka-configs.sh command to get the current retention value, then set retention.ms to 1000 (1 second).
Verify that the retention policy value has changed by describing the topic configuration again.
Wait for a few seconds; by then Kafka should have deleted all the old messages from the topic. Now remove the retention override using the “--delete-config” option, which sets retention back to the default.
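A sketch of the verification and reset steps, assuming a broker at localhost:9092 and the topic text_topic used elsewhere in this article. The helper names are hypothetical; the flags are the standard kafka-configs.sh ones.

```python
import shlex


def describe_topic_config_command(bootstrap_server: str, topic: str) -> list[str]:
    """Return the argv that lists the config overrides set on a topic."""
    return [
        "kafka-configs.sh",
        "--bootstrap-server", bootstrap_server,
        "--describe",
        "--entity-type", "topics",
        "--entity-name", topic,
    ]


def reset_retention_command(bootstrap_server: str, topic: str) -> list[str]:
    """Return the argv that drops the retention.ms override (back to the default)."""
    return [
        "kafka-configs.sh",
        "--bootstrap-server", bootstrap_server,
        "--alter",
        "--entity-type", "topics",
        "--entity-name", topic,
        "--delete-config", "retention.ms",
    ]


# Assumed broker address.
describe_cmd = describe_topic_config_command("localhost:9092", "text_topic")
reset_cmd = reset_retention_command("localhost:9092", "text_topic")
print(shlex.join(describe_cmd))
print(shlex.join(reset_cmd))
```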
Note: This step is very important, so make sure you run it without fail. Otherwise, the topic keeps the 1-second retention and all future messages will be deleted shortly after they arrive.
2. Delete Kafka topic and re-create
In the recent versions of Apache’s Kafka, deleting a topic is easy. You just need to set one property in the configuration to ‘true’, and issue a command to delete a topic.
Before removing an existing topic, first note its partition count and replication factor, as you will need them to re-create the topic with the same configuration. You can get this information by running the “kafka-topics.sh” script with the “--describe” option on the topic “text_topic”.
Topic “text_topic” has a replication factor of 1 and 1 partition.
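If you script this step, the two numbers can be pulled out of the describe output. The sketch below assumes the summary line contains PartitionCount and ReplicationFactor fields, with or without a space after the colon; the sample line only illustrates that shape and was not captured from a real cluster.

```python
import re


def parse_topic_summary(describe_output: str) -> tuple[int, int]:
    """Return (partition_count, replication_factor) from 'kafka-topics.sh --describe' output."""
    partitions = re.search(r"PartitionCount:\s*(\d+)", describe_output)
    replication = re.search(r"ReplicationFactor:\s*(\d+)", describe_output)
    if partitions is None or replication is None:
        raise ValueError("summary line not found in describe output")
    return int(partitions.group(1)), int(replication.group(1))


# Illustrative sample only.
sample = "Topic: text_topic\tPartitionCount: 1\tReplicationFactor: 1\tConfigs:\n"
partition_count, replication_factor = parse_topic_summary(sample)
print(partition_count, replication_factor)
```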
a. Remove Kafka topic
First, log in to the server where Kafka runs and go to the config directory under your Kafka installation, “$KAFKA_HOME/config/”. There you’ll find a server.properties file. Open it in your favorite text editor and add the line “delete.topic.enable=true”, or change the value of the property to true if it is already present:
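The line in question is delete.topic.enable=true. If you would rather script the edit than open an editor, a minimal sketch might look like this; set_property is a hypothetical helper that works on the file's text, and the broker still has to be restarted for the change to take effect.

```python
def set_property(properties_text: str, key: str, value: str) -> str:
    """Set key=value in Java-properties-style text, appending it if absent."""
    lines = properties_text.splitlines()
    replaced = False
    for i, line in enumerate(lines):
        stripped = line.strip()
        if stripped.startswith("#"):
            continue  # leave commented-out settings untouched
        name, sep, _old = stripped.partition("=")
        if sep and name.strip() == key:
            lines[i] = f"{key}={value}"
            replaced = True
    if not replaced:
        lines.append(f"{key}={value}")
    return "\n".join(lines) + "\n"


# Hypothetical file content for illustration.
original = "broker.id=0\ndelete.topic.enable=false\n"
updated = set_property(original, "delete.topic.enable", "true")
print(updated, end="")
```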
Now go to the bin directory and run the kafka-topics.sh command with the “--delete” option to remove the topic “text_topic”.
This removes text_topic and its partitions from all Kafka brokers.
b. Re-create Kafka topic
Run the “bin/kafka-topics.sh” command to re-create the topic with the replication and partition details you got from the earlier command. Refer to the Creating Kafka Topic article for more detailed descriptions with examples.
This creates the Kafka topic text_topic with a replication factor of 1 and 1 partition.
Is there a way to delete all the data from a topic or delete the topic before every run?
Can I modify the KafkaConfig.scala file to change the logRetentionHours property? Is there a way for messages to get deleted as soon as the consumer reads them?
I am using producers to fetch the data from somewhere and sending the data to a particular topic where a consumer consumes, can I delete all the data from that topic on every run? I want only new data every time in the topic. Is there a way to reinitialize the topic somehow?
Tested in Kafka 0.8.2, for the quick-start example: First, add one line to the server.properties file under the config folder:
then, you can run this command:
I don’t think it is supported yet. Take a look at this JIRA issue: “Add delete topic support”.
To delete manually:
For any given topic what you can do is
This is NOT a good or recommended approach, but it should work. In the Kafka broker config file, the log.retention.hours.per.topic attribute defines the number of hours to keep a log file before deleting it for a specific topic.
Also, is there a way the messages get deleted as soon as the consumer reads them?
The Kafka cluster retains all published messages—whether or not they have been consumed—for a configurable period of time. For example if the log retention is set to two days, then for the two days after a message is published it is available for consumption, after which it will be discarded to free up space. Kafka’s performance is effectively constant with respect to data size so retaining lots of data is not a problem.
In fact, the only metadata retained on a per-consumer basis is the position of the consumer in the log, called the “offset”. This offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads messages, but because the position is under the consumer’s control, it can consume messages in any order it likes. For example, a consumer can reset to an older offset to reprocess.
For finding the start offset to read in Kafka 0.8 Simple Consumer example they say
Kafka includes two constants to help, kafka.api.OffsetRequest.EarliestTime() finds the beginning of the data in the logs and starts streaming from there, kafka.api.OffsetRequest.LatestTime() will only stream new messages.
You can also find the example code there for managing the offset at your consumer end.
A Short Guide to Removing Messages from a Kafka Topic
Apache Kafka is a powerful data streaming platform that has become an important part of the data fabric in many companies. While it has many features that make it an important component in reactive architectures, one of the most powerful is its ability to keep a persistent log of messages even after they have been processed by downstream consumers.
In this article, we’ll look at two strategies:
Option 1: Message Expiry
The intended way of removing data from Kafka is to use one of the several configurable options for message expiry. Expiry conditions are controlled by configuration parameters and can be based on how old messages are (sometimes called time to live, or TTL) or on the size of the topic. Kafka’s performance is effectively constant with respect to data size, so retaining lots of data is not a problem.
Here’s how the lifecycle of a message works when expiry conditions have been enabled: a producer sends a message to the Kafka cluster, where it is appended to the end of a topic; consumers process the topic and read the message; the message stays in the topic until the expiration conditions are met, after which it is removed.
Expiry conditions apply to all messages within a given topic and can be set when the topic is first created or modified later for topics that already exist. There are three time-based broker configuration parameters that can be used: log.retention.hours, log.retention.minutes, and log.retention.ms. Higher-precision values such as ms take precedence over lower-precision values such as hours.
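The precedence rule can be illustrated with a small sketch. The function below is hypothetical and only mirrors the documented order (ms, then minutes, then hours, with hours defaulting to 168); it does not handle the special value -1, which disables time-based expiry.

```python
from typing import Optional


def effective_retention_ms(ms: Optional[int] = None,
                           minutes: Optional[int] = None,
                           hours: int = 168) -> int:
    """Resolve retention the way the broker does: ms beats minutes, minutes beat hours."""
    if ms is not None:
        return ms
    if minutes is not None:
        return minutes * 60 * 1000
    return hours * 60 * 60 * 1000


print(effective_retention_ms())                      # only the 168-hour default applies
print(effective_retention_ms(minutes=10, hours=1))   # minutes override hours
print(effective_retention_ms(ms=100, minutes=10))    # ms overrides everything
```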
Viewing and Modifying Expiry Configuration Values
Example: Create a Topic and Set Expiration Options
In this example, we’ll show how to create a new topic, view the default options, and modify the retention configuration.
Manually delete Apache Kafka topics
Using Zookeeper CLI
In the last few versions of Apache’s Kafka, deleting a topic is fairly easy. You just need to set one property in the configuration to ‘true’, and just issue a command to delete a topic. It’ll be deleted in no time. But sometimes, for several reasons unknown to mere mortals such as ourselves, the deletion of a topic doesn’t happen automatically. If this is happening to you, don’t sweat just yet; there’s another easy way to delete a topic.
First, let’s see how to configure Kafka to delete a topic with just a command. ‘cd’ into your Kafka installation directory, then into the ‘config’ directory. Here, you’ll find a server.properties file (the file name could be different if you’ve renamed your copy). Open the properties file in your favorite text editor, for me it’s Vim. Add the following line, or change the value of the property to true:
Now go to the ‘bin’ directory, where you’ll find a file named ‘kafka-topics.sh.’ This is the file we’ll be using to delete a topic. The command to delete a topic is this:
Manually delete a topic with Zookeeper
After you issue the delete command, the topic will be “marked for deletion,” and you’ll have to wait until it gets deleted. Sometimes that doesn’t happen. When you face this problem, you can use Zookeeper to delete the topic. First, log into the Zookeeper CLI console using the ‘zkCli.sh’ script, which you’ll find in the ‘bin’ directory of your Zookeeper installation.
Once you’re in, make sure the topic is not deleted by issuing the following command:
If you don’t get an error, it means that topic is not yet deleted. Now run the following two commands to delete the topic completely from the system:
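The exact commands are not preserved here, so the following is a sketch under assumptions: that the topic's metadata lives under /brokers/topics and its pending-deletion marker under /admin/delete_topics (the usual Kafka znode layout), and that your zkCli version supports rmr. Newer Zookeeper releases replace rmr with deleteall.

```python
def zk_check_command(topic: str) -> str:
    """zkCli command that errors out if the topic znode no longer exists."""
    return f"get /brokers/topics/{topic}"


def zk_delete_commands(topic: str, recursive_delete: str = "rmr") -> list[str]:
    """zkCli commands that remove the topic znode and its deletion marker.

    Pass recursive_delete="deleteall" on Zookeeper versions that dropped rmr.
    """
    return [
        f"{recursive_delete} /brokers/topics/{topic}",
        f"{recursive_delete} /admin/delete_topics/{topic}",
    ]


# "my-topic" is a placeholder topic name.
print(zk_check_command("my-topic"))
for command in zk_delete_commands("my-topic"):
    print(command)
```

Removing znodes by hand bypasses Kafka's own bookkeeping, so treat this as a last resort on a cluster you can afford to repair.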
Now if you try to ‘get’ the topic using the previous command, it’ll throw an error. And that’s it, you’re done. The topic is now deleted.