22 minute read · June 2, 2020
Using Kafka as a Temporary Data Store in The Data Lake
· Solutions Architect, Dremio
Introduction
Apache Kafka is a streaming platform that allows for the creation of real-time data processing pipelines and streaming applications. Kafka is an excellent tool for a range of use cases. If you are interested in how Kafka can be used for a web application’s metrics collection, read our article Using Kafka for web application metrics collection. We also have a tutorial demonstrating how Kafka can be used to create a real-time prediction system from machine learning models: Forecasting air quality with Dremio, Python and Kafka.
Kafka is a powerful tool in a data engineer’s toolkit. When you know how and where Kafka can be used, you can improve the quality of data pipelines and process data more efficiently. In this article, we will look at an example of how Kafka can be applied to less common use cases, such as storing data and preventing data loss. As you may know, fault tolerance and durability of stored data are among the crucial requirements for most data engineering projects, so it is important to know how to use Kafka in a way that meets those needs.
Here we will use Python as the programming language. To interact with Kafka from Python, you need a client library. We will use the kafka-python package. To install it, run the following command:
pip install kafka-python
Kafka as a datastore
Kafka can be used for storing data. You may be wondering whether Kafka is a relational or NoSQL database. The answer is that it is neither one nor the other.
Kafka, as an event streaming platform, works with streaming data. At the same time, Kafka can store data for some time before removing it. This means that Kafka is different from traditional message queues, which drop messages as soon as they are read by the consumer. The period during which Kafka keeps the data is called the retention period. Theoretically, you can set this period to forever. Kafka also stores data on persistent storage and replicates it over the brokers inside a cluster. This is just another trait that makes Kafka look like a database.
Why then isn’t Kafka widely used as a database, and why aren’t we promoting it as a data storage solution? The simplest reason is that Kafka has some peculiarities that are not typical for general-purpose databases. For example, Kafka doesn’t provide arbitrary access lookup to the data. This means that there is no query API that can be used to fetch columns, filter them, join them with other tables, and so on. Actually, there is the Kafka Streams API and even ksqlDB. They support queries and strongly resemble traditional databases, but they are scaffolds around Kafka: they act as consumers that process data for you after it’s consumed. So, when we talk about Kafka itself rather than its extensions, there is no query language like SQL available within Kafka to help you access data. By the way, modern data lake engines like Dremio can solve this issue. Dremio supports SQL interaction with data sources that don’t support SQL natively. So, for example, you can persist data from Kafka streams in AWS S3, and then access it using Dremio AWS Edition.
Kafka is also focused on the streaming paradigm. It is designed to act as the core of applications and solutions that are built on streams of data. In short, it can be seen as a brain that processes signals from different parts of the body and lets each organ work by interpreting those signals. The aim of Kafka is not to replace more traditional databases; Kafka lives in a different domain. It can interact with databases, but it is not a replacement for them. Kafka can be easily integrated with databases and cloud data lake storage such as Amazon S3 and Microsoft ADLS with the help of Dremio.
Keep in mind that Kafka has the ability to store data, and the data storage mechanism is quite easy to understand. Kafka stores the log of records (messages) from the first message up to the present. Consumers fetch data starting from a specified offset in this log. This is a simplified picture of what it looks like:
Message 1 → | Message 2 → | Message 3 → | Message 4 → | Message 5 → (offset here) | ... |
The offset can be moved back in history which will force the consumer to read past data again.
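To make this concrete, here is a minimal sketch of moving the offset back with kafka-python, assuming a broker on localhost:9092 and a topic named weather with a single partition (both placeholders for illustration):

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
partition = TopicPartition('weather', 0)  # partition 0 of a hypothetical 'weather' topic

# Assign the partition manually so we control the position ourselves
consumer.assign([partition])

# Jump back to an earlier offset and re-read records that were already consumed
consumer.seek(partition, 0)
for message in consumer:
    print(message.offset, message.value)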
Because Kafka is different from traditional databases, the situations where it can be used as a data store are also somewhat specific. Here are some of them:
- To repeat the processing of the data from the beginning when the logic of processing changes;
- When a new system is included in the processing pipeline and needs to process all previous records from the very beginning or from some point in time. This feature helps avoid copying a full dump of one database to another;
- When consumers transform data and save the results somewhere, but for some reason, you need to store the log of data changes over time.
Later in this article, we will look at an example of how Kafka can be used as a data store in a use case similar to the first one described above.
Kafka as a data loss prevention tool
A lot of developers choose Kafka for their projects because it provides a high level of durability and fault tolerance. These properties are achieved by saving records on disk and replicating data. Replication means that copies of your data are located on several servers (Kafka brokers) within the cluster. Because the data is saved on disk, it is still there even if the Kafka cluster becomes inactive for some period of time. Thanks to replication, the data stays protected even when one or several brokers inside the cluster are damaged.
After data is consumed, it is often transformed and/or saved somewhere. Sometimes data can become corrupt or lost during data processing. In such cases, Kafka can help restore the data. If needed, Kafka can provide a way to execute operations from the beginning of the data stream.
You should be aware that the two main parameters used to control the data loss prevention policy are the replication factor and the retention period. The replication factor defines how many copies of the data for a given topic are kept in the Kafka cluster. To support fault tolerance, you should set the replication factor to a value greater than one; in general, the recommended value is three. The greater the replication factor, the more resilient the Kafka cluster. You can also use this feature to place Kafka brokers closer to the data consumers while keeping replicas on geographically remote brokers at the same time.
The retention period is the time during which Kafka saves the data. It is obvious that the longer the period, the more data you will save, and the more data you will be able to restore in case something bad happens (for example, the consumer goes down due to power failure, or the database loses all data as the result of an accidental wrong database query or hacker attack, etc.).
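As a rough sketch of how both parameters could be set when creating a topic with kafka-python’s admin client (the topic name, the replication factor of three, and retention.ms=-1, which disables time-based deletion, are assumptions for this illustration, not values from the article’s setup):

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Hypothetical topic: each partition replicated to 3 brokers,
# and time-based deletion disabled (retention.ms = -1)
topic = NewTopic(name='weather',
                 num_partitions=1,
                 replication_factor=3,  # requires at least 3 brokers in the cluster
                 topic_configs={'retention.ms': '-1'})
admin.create_topics([topic])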
Example
Here’s an example of how Kafka’s storing capabilities can be very helpful when the business logic of data processing changes suddenly.
Suppose we have an IoT device that collects weather metrics such as temperature, humidity, and the concentration of carbon monoxide (CO). The device has limited computation power, so all it can do with the metrics is send them somewhere. In our setup, the device will send data to the Kafka cluster. We will send one data point per second and treat each second as one day (in other words, the IoT device reports on a per-day basis). The diagram below visualizes this flow:
A consumer subscribes to the topic to which the producer sends its messages. The consumer then aggregates the data in a specified way. It accumulates data during the month, then calculates the average metrics (average temperature, humidity, and the concentration of CO). Information about each month should be written into the file. The file contains only one line and the values are separated by commas. Here is an example of what the file might look like:
Let’s start from the creation of the Kafka producer. The producer will be located in the producer.py file. At the beginning of the file, we should import all the libraries we’ll need and create the KafkaProducer instance, which will work with the Kafka cluster located on the localhost:9092:
import random
import json
import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
Below you can see the code that generates data and sends it to the Kafka cluster. At the top, we define the initial values for temperature, humidity, and CO concentration (prev_temp, prev_humidity, prev_co_concentration). The counter i is used for record indexing. For this simulation, we want generated values to be random, but we also want to avoid wildly inconsistent results (like the temperature changing by 40 degrees in just one day). So we don’t just generate random values; we also take into account the values from the previous day when generating values for the next day. On each iteration, we also check that the generated numbers fall within acceptable intervals.
topic_name = 'weather'
i = 0
prev_temp = round(random.uniform(-10, 35), 1)
prev_humidity = random.randint(1, 100)
prev_co_concentration = random.randint(50, 1500)

while True:
    i += 1
    lower_temp_bound = -10 if (prev_temp-5) < -10 else (prev_temp-5)
    upper_temp_bound = 35 if (prev_temp+5) > 35 else (prev_temp+5)
    temperature = round(random.uniform(lower_temp_bound, upper_temp_bound), 1)

    lower_humid_bound = 1 if (prev_humidity-20) < 1 else (prev_humidity-20)
    upper_humid_bound = 100 if (prev_humidity+20) > 100 else (prev_humidity+20)
    humidity = random.randint(lower_humid_bound, upper_humid_bound)

    lower_co_bound = 50 if (prev_co_concentration-100) < 50 else (prev_co_concentration-100)
    upper_co_bound = 1500 if (prev_co_concentration+100) > 1500 else (prev_co_concentration+100)
    co_concentration = random.randint(lower_co_bound, upper_co_bound)

    weather_dict = {
        "record_id": i,
        "temperature": temperature,
        "CO_concentration": co_concentration,
        "humidity": humidity
    }
    producer.send(topic_name, value=json.dumps(weather_dict).encode())
    producer.flush()

    prev_temp = temperature
    prev_humidity = humidity
    prev_co_concentration = co_concentration
    time.sleep(1)
After generating all of the required data for the current timestamp, the script creates the weather_dict variable from the dictionary with data. After that, the producer sends the JSON-encoded weather_dict to the specified topic of the Kafka cluster. We assign the current values to the corresponding variables that represent the data from the previous timestamp. In the end, we wait for one second before executing the next iteration of the loop.
Now let’s explore the consumer.py file. At the top of the file, we define the consumer with several parameters. The first two parameters are the name of the topic to subscribe to and the URL where the Kafka server is located. The auto_offset_reset argument defines the behaviour when the OffsetOutOfRange error occurs; the ‘earliest’ value means that the offset should be moved to the oldest available record, so all messages (records) will be consumed again after the error. The consumer_timeout_ms parameter is responsible for shutting down the consumer when there are no new messages during the specified period of time; the -1 value means that we never want to shut down the consumer.
You can read more about these and other parameters of KafkaConsumer in the documentation.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer('weather',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=-1)
Let’s move on to the most important part of the consumer.py file. In the beginning, we define the counter i and the lists where we will store data during the month. On each iteration, we decode the message (extract weather_dict from it) and append the values for the current day to the corresponding lists. When the counter i is equal to 30, we calculate the average values of the metrics for the month.
i = 0
month_temperatures = []
month_humidities = []
month_co = []
month_id = 1

for message in consumer:
    i += 1
    value = message.value.decode()
    weather_dict = json.loads(value)
    month_temperatures.append(weather_dict['temperature'])
    month_humidities.append(weather_dict['humidity'])
    month_co.append(weather_dict['CO_concentration'])
    if i == 30:
        month_aggregation = {'month_id': month_id,
                             'avg_temp': round(sum(month_temperatures)/len(month_temperatures), 1),
                             'avg_co': round(sum(month_co)/len(month_co)),
                             'avg_humidity': round(sum(month_humidities)/len(month_humidities))
                             }
        with open('weather_aggregation.txt', 'a') as file:
            data = [str(month_aggregation['month_id']), str(month_aggregation['avg_temp']),
                    str(month_aggregation['avg_co']), str(month_aggregation['avg_humidity'])]
            file.write(",".join(data))
            file.write(",")
        i = 0
        month_id += 1
        month_temperatures = []
        month_humidities = []
        month_co = []
Next, we open the weather_aggregation.txt file and write the data into it. The data is written in a single line, without line breaks, so any program that reads the file needs to know that each data point consists of four consecutive comma-separated values.
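As a small hedged illustration of that reading logic (assuming the file was produced by the consumer above), parsing the single line back into records could look like this:

# Read the one-line, comma-separated file back into per-month records
with open('weather_aggregation.txt') as file:
    values = file.read().rstrip(',').split(',')

# Each record occupies four consecutive values:
# month_id, avg_temp, avg_co, avg_humidity
records = [values[i:i+4] for i in range(0, len(values), 4)]
for month_id, avg_temp, avg_co, avg_humidity in records:
    print(month_id, avg_temp, avg_co, avg_humidity)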
Before running the producer and consumer, you should run the Kafka cluster. You can do this using the following command (assuming that you have already installed Kafka):
sudo kafka-server-start.sh /etc/kafka.properties
Here is how the output file looks:
Suppose now that some time has passed and the business requirements have changed. Now we need to process the weather data in a different way.
First, the aggregation metrics (averages) should be calculated on a per-week basis rather than on a per-month basis, as in the previous requirements. Second, we need to convert temperatures from Celsius to Fahrenheit. Finally, we want to change the storage logic: instead of writing all data into a single .txt file, we need to create a CSV file with columns and rows, where each row represents one data point (one week’s data). In addition, we want to save the same data into an AWS S3 bucket.
Changing the code to implement these changes is not a problem. But we collected a lot of data earlier, and we don’t want to lose it. Ideally, we want to recalculate all the metrics from the very beginning. As a result, we need the data in the new format, including the time periods that were processed with the old approach. Kafka’s storing capabilities will help us here.
Let’s explore the changes we need to make in the code (file consumer.py). First, we need to import the boto3 library, specify the AWS credentials, and instantiate the S3 resource. We also changed the names of the list variables to reflect the fact that they accumulate weekly rather than monthly data. The next change is that we now aggregate on every 7th record instead of every 30th. Finally, we implemented the Celsius-to-Fahrenheit conversion formula ((c * 1.8) + 32).
import boto3

ACCESS_KEY = "<AWS_ACCESS_KEY>"
SECRET_KEY = "<AWS_SECRET_KEY>"
s3 = boto3.resource('s3', aws_access_key_id=ACCESS_KEY,
                    aws_secret_access_key=SECRET_KEY)

i = 0
week_temperatures = []
week_humidities = []
week_co = []
week_id = 1

for message in consumer:
    i += 1
    value = message.value.decode()
    weather_dict = json.loads(value)
    week_temperatures.append(weather_dict['temperature'])
    week_humidities.append(weather_dict['humidity'])
    week_co.append(weather_dict['CO_concentration'])
    if i == 7:
        week_aggregation = {'week_id': week_id,
                            'avg_temp': round((sum(week_temperatures)/len(week_temperatures) * 1.8), 1)+32,
                            'avg_co': round(sum(week_co)/len(week_co)),
                            'avg_humidity': round(sum(week_humidities)/len(week_humidities))
                            }
        if week_id == 1:
            with open('weather_aggregation.csv', 'a') as file:
                data = ['week_id', 'avg_temp', 'avg_co', 'avg_humidity']
                file.write(",".join(data))
                file.write("\n")
        with open('weather_aggregation.csv', 'a') as file:
            data = [str(week_aggregation['week_id']), str(week_aggregation['avg_temp']),
                    str(week_aggregation['avg_co']), str(week_aggregation['avg_humidity'])]
            file.write(",".join(data))
            file.write("\n")
        s3.Object('s3-bucket-name', 'weather_aggregation.csv').put(Body=open('weather_aggregation.csv', 'rb'))
        i = 0
        week_id += 1
        week_temperatures = []
        week_humidities = []
        week_co = []
Other changes are related to the saving of processed data. Now we work with CSV files, and if it is the first week, we write the column names to the file. Also, the script adds a new line character after each data point to write information about each week on a new row. Finally, we insert the weather_aggregation.csv file into AWS S3 storage as well. The bucket s3-bucket-name was created earlier.
To recalculate the aggregations from the start of our IoT device’s life (when it started sending data to Kafka), we need to move the offset to the beginning of the message queue. In kafka-python this is as simple as calling the consumer’s seek_to_beginning() method:
consumer.seek_to_beginning()
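One caveat: in kafka-python, seek_to_beginning() only affects partitions that are already assigned to the consumer. Since our consumer subscribes by topic name, it may need to poll once before seeking, or assign the partition explicitly. A sketch of the explicit-assignment variant (the single partition 0 is an assumption for this illustration):

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
partition = TopicPartition('weather', 0)

# With manual assignment the partition is known immediately,
# so seek_to_beginning() can be called right away
consumer.assign([partition])
consumer.seek_to_beginning(partition)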
If we place this method call before the processing loop described above, we move the offset to the beginning of the queue for this consumer. This forces it to read again the messages it has already read and processed, which demonstrates that Kafka doesn’t remove data after the consumer reads it once. Here is the weather_aggregation.csv file generated by the updated consumer:
This example shows that Kafka is useful as a data storing system. The benefits Kafka provides in terms of data loss prevention are also easy to see. Suppose that the server where our consumer is located goes down for some time. If Kafka didn’t have data storing capabilities, all messages sent by the producer in the meantime would be lost. But we know that when the consumer comes back up, it will be able to fetch all messages that were accumulated by the Kafka cluster during the downtime. No additional actions are required to use this feature; you don’t need to move the offset. It will point at the message that was consumed last before the consumer went down, so consumption resumes right from the place where it stopped.
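Note that resuming from the last consumed position in this way relies on committed offsets, which requires the consumer to belong to a consumer group. A hedged sketch of such a configuration (the group_id value is just an illustrative name, not something used elsewhere in this article):

from kafka import KafkaConsumer

# With a group_id and auto-commit enabled, the consumer's position is stored
# on the Kafka cluster, so after a restart it continues from the last
# committed offset instead of starting over.
consumer = KafkaConsumer('weather',
                         bootstrap_servers=['localhost:9092'],
                         group_id='weather-aggregator',  # hypothetical group name
                         enable_auto_commit=True,
                         auto_offset_reset='earliest')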
The example we demonstrated is simple, but it shows how useful Kafka can be. At the moment, we have a CSV file with the weekly-aggregated weather data. We can use it for data analysis (for example, see the Integrating Tableau with Amazon S3 tutorial), machine learning model creation (Creating a Regression machine learning model using ADLS Gen2 data), or for an application’s internal purposes. Dremio also allows us to join data sources in the data lake and work with them using SQL, even if the original data sources have no native SQL support - see the Combining Data From Multiple Datasets or A Simple Way to Analyze Student Performance Data with Dremio and Python tutorials. Dremio is a useful instrument in the data engineering toolkit.
Conclusion
In this article, we explored how Kafka can be used for storing data and as a data loss prevention tool for streaming applications. We provided an overview of these features, listed the use cases where they are useful, and explained why Kafka isn’t a replacement for traditional databases. We then demonstrated a case where the approach to data processing and transformation changed, while stakeholders wanted the results computed according to the new approach for all data processed from the very beginning. With Kafka, this issue was solved easily.
To learn more about Dremio, visit our tutorials and resources as well as Dremio University, our free online learning platform, where you can deploy your own Dremio virtual lab. Also, check out Dremio AWS Edition to launch Dremio in your AWS account and start querying your data in minutes. If you have any questions, visit our community forums, where we are all eager to help.