
25 minute read · May 26, 2020

Collecting App Metrics in your cloud data lake with Kafka

Dremio Authors: Insights and Perspectives

Dremio Team

In this article, we will demonstrate how Kafka can be used to collect metrics from a web application and land them in data lake storage such as Amazon S3. Once collected, these metrics can be used for monitoring, for data analysis with a data lake engine such as Dremio, and for training machine learning models in Python.

Metrics collection

Metrics are indicators (values) that reflect the state of a process or a system. A sequence of such values also lets us draw conclusions about trends or seasonality. In short, metrics show how a process or a system evolves over time. Metrics can be generated by applications, hardware components (CPU, memory, etc.), web servers, search engines, IoT devices, databases and so on. They can reflect the internal state of a system as well as real-world processes. For example, an e-commerce website can report the number of new orders over a given period, and air quality devices can measure the concentration of different chemical substances in the air; both describe real-world processes. CPU load, by contrast, is a metric that describes the internal state of a computer system.

The collected metrics can be analyzed in real time or stored for batch analysis later. Collected metrics can also be used to train machine learning models.

Collecting metrics can be a complex process because it depends on many parameters and conditions. The source of the metric produces the values, then those values are either delivered to a cloud data lake storage or used in real time. The method in which metrics are delivered from source to storage, as well as the approach for storing, can vary significantly from one case to another.

One of the tools that can help with the collection of metrics is Apache Kafka.

Kafka overview

Apache Kafka is a tool used for building real-time data processing pipelines and streaming applications. Kafka is a distributed system, which means it can run as a cluster spread across several servers. The Kafka cluster is the central hub used by data generators (called producers) and data consumers (called consumers). Applications (desktop, web, mobile), APIs, databases, web services and IoT devices are all typical examples of producers. A producer is an entity that sends data to the Kafka cluster. A consumer is an entity that receives data from the Kafka cluster. The Kafka cluster can consist of one or more Kafka brokers.

Kafka uses topics to organize data. A topic is a category for a stream of data. Each record in a topic carries a key, a value and a timestamp. Producers write data into specific topics, while consumers subscribe to the topics they are interested in. Kafka also supports data replication, which is the creation of copies of the same data on different brokers. This prevents data loss when one of the brokers fails or becomes unavailable.
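To make this vocabulary concrete, here is a toy in-memory model of a topic. It is only a sketch for illustration: `Record` and `ToyTopic` are hypothetical names, and nothing here is Kafka's API. It mimics two ideas from the paragraph above: records carry a key, a value and a timestamp, and each consumer reads from its own position in an append-only log.

```python
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass(frozen=True)
class Record:
    """One data point in a topic (hypothetical, for illustration only)."""
    key: Optional[bytes]
    value: bytes
    timestamp: float


@dataclass
class ToyTopic:
    """Append-only log in which every consumer tracks its own read offset."""
    name: str
    log: list = field(default_factory=list)
    offsets: dict = field(default_factory=dict)

    def produce(self, value: bytes, key: Optional[bytes] = None) -> None:
        self.log.append(Record(key, value, time.time()))

    def consume(self, consumer_id: str) -> list:
        """Return the records this consumer has not seen yet."""
        start = self.offsets.get(consumer_id, 0)
        self.offsets[consumer_id] = len(self.log)
        return self.log[start:]


topic = ToyTopic("web_requests")
topic.produce(b"1")
topic.produce(b"1")
first = topic.consume("csv-writer")    # both records
topic.produce(b"1")
second = topic.consume("csv-writer")   # only the new record
other = topic.consume("dashboard")     # an independent consumer sees all three
```

A real Kafka topic is additionally split into partitions and replicated across brokers, which this toy deliberately leaves out.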

Kafka is one of the most popular event streaming platforms and messaging queues. It is used by many large companies to manage their data pipelines. Here are several of the most important advantages that Kafka provides:

  • Scalability (due to the support for distributed operation)
  • Fault tolerance
  • Durability
  • Fast operation
  • High throughput
  • Real-time mode as well as the ability to work in batch mode

Let’s take a look at how Kafka can be used for collecting metrics.

How Kafka can be used for collecting metrics

Usually, collecting metrics is done in real time. This means that the source of the metrics constantly generates data and can send it as a data stream. As we know, Kafka is a good tool for handling data streams, which is why it can be used for collecting metrics.

In this example, we will use a simple Flask web application as a producer. It will send metrics about its activity to the Kafka cluster. The consumer will be a Python script that receives metrics from Kafka and writes the data into a CSV file. The Python script can also enrich the data and send the metrics to cloud storage. At that stage, the data is available for a range of data lake engines like Dremio to query and process.

Here’s a tip: If you want to perform metrics monitoring, you can use tools like Prometheus, Grafana, Kibana, etc. The pipeline is the same: the web application sends data into the Kafka cluster after which the metrics should be delivered to the aforementioned platforms where they are visualized and analyzed. It is also possible to set up alerts and notifications if some events occur.

Example

Let’s look at the example of metrics collection with the help of Kafka. We will use a Flask web application as a source of metrics. Using the app, people can create orders and buy essential goods. Here is the main page of the website:

[Image: main page of the website]

It is very simple: when the user clicks on the New order button, they will go to the next page where they can place the order.

[Image: new order page]

When the user checks the checkbox field, this means they want to pay for the order immediately. If not, the goods will be supplied under credit conditions. After the user clicks on the Make an order button, the next page is loaded:

[Image: page with the details of the created order]

On this page, the user can review the details of the created order. The new element here is the total price, which is calculated by multiplying the price of one unit by the ordered amount.

Let’s now look at the code of the application. It has several files including forms, templates, configs, database (SQLite), etc. But we will demonstrate only the files that play a role in generating and sending metrics to the Kafka cluster.

It is important to note that for this article, we will use the kafka-python package. It allows us to work with Kafka directly from Python code. We installed it using the following command:

pip install kafka-python

Below, you can see the code from the models.py file. This file describes the structure of the database. We have just one table there, called Order. It is represented by a Python class in which each attribute is a column in the database. But the most interesting part of this file is the send_order_info_to_kafka() function.

import json
from app import db
from sqlalchemy import event
from kafka import KafkaProducer


class Order(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    customer_email = db.Column(db.String(120),
                               nullable=False, default="")
    amount = db.Column(db.Integer)
    total_price = db.Column(db.Integer)
    is_prepaid = db.Column(db.Boolean, default=False)


# Runs after each new Order row is inserted into the database.
@event.listens_for(Order, 'after_insert')
def send_order_info_to_kafka(mapper, connection, target):
    assert target.id is not None
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    topic_name = 'new_orders'
    order_dict = {
        "order_id": target.id,
        "order_amount": target.amount,
        "order_total_price": target.total_price,
        "is_paid": target.is_prepaid
    }
    # Kafka messages are bytes, so serialize the dict to JSON and encode it.
    producer.send(topic_name, value=json.dumps(order_dict).encode())
    producer.flush()

This function is decorated with event.listens_for() (imported from the sqlalchemy library). The decorator registers the function as a listener for the event that fires when a record about a new order is inserted into the database. When this occurs, the function creates a KafkaProducer instance (which points to the address where the running Kafka cluster is located) and specifies the name of the Kafka topic, new_orders. Then the function creates the body of the message. We want to send statistics about the orders to Kafka. That’s why, for each order, we create a dictionary with the order ID, the order amount, its total price, and whether it is prepaid or not. Then we transform this dictionary into JSON format, encode it, and send it to Kafka using the producer’s send() and flush() methods. So, this function is triggered every time a user creates a new order, and its purpose is to send information about the created order to the Kafka cluster.
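The serialization step can be tried on its own, without a running cluster. The sketch below shows the round trip a message makes: the producer side turns the dictionary into bytes, and the consumer side turns the bytes back into a dictionary. The helper names `serialize_order` and `deserialize_order` and the sample values are assumptions made for illustration; they are not part of the application.

```python
import json


def serialize_order(order: dict) -> bytes:
    """Turn an order dictionary into the bytes that are sent to Kafka."""
    return json.dumps(order).encode("utf-8")


def deserialize_order(payload: bytes) -> dict:
    """Inverse of serialize_order(), as a consumer would apply it."""
    return json.loads(payload.decode("utf-8"))


# Sample values are made up for illustration.
order = {"order_id": 1, "order_amount": 3,
         "order_total_price": 45, "is_paid": True}
payload = serialize_order(order)
restored = deserialize_order(payload)
```

In kafka-python, KafkaProducer accepts a value_serializer callable and KafkaConsumer accepts a value_deserializer callable, so functions like these could be passed there instead of encoding and decoding by hand.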

We want to collect one more set of metrics: the number of requests over a certain period of time. It is a common metric to monitor for any web application. So each time someone visits a page on our website, we need to send a notification about this to our Kafka cluster. Here is how we can implement this behavior. In the file utils.py we define a function called ping_kafka_when_request(). The body of this function is very similar to the function that we saw before. It creates an instance of the producer, defines the name of the topic to which the producer should publish messages (web_requests), and then uses the send() and flush() methods to send messages. This function is a little bit simpler because we don’t need to create a complex body for the message. We just send value=1 each time a new request occurs.

from kafka import KafkaProducer

def ping_kafka_when_request():
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    topic_name = 'web_requests'
    producer.send(topic_name, value="1".encode())
    producer.flush()

To make this function work, we need to call it in the view functions for each of our pages. This can be done in the routes.py file (see the code below). There are three functions: index(), create_order(), and order_complete(). Each of these functions is responsible for executing some logic while rendering pages on the website. The most complex one is create_order(), because it processes the form submission and inserts new records into the database. As far as interaction with Kafka is concerned, note that we import the ping_kafka_when_request() function from the utils file and call it at the start of each view function, before the rest of its code runs.

from app import app, db
from app.models import Order
from flask import render_template, redirect, session
from app.forms import OrderForm
from .utils import ping_kafka_when_request

@app.route('/')
def index():
    ping_kafka_when_request()
    return render_template('index.html')

@app.route('/order-new', methods=['GET', 'POST'])
def create_order():
    ping_kafka_when_request()
    form = OrderForm()
    if form.validate_on_submit():
        price = 15
        customer_email = form.email.data
        amount = form.amount.data
        is_prepaid = form.is_paid_now.data
        total_price = amount * price
        order = Order(customer_email=customer_email,
                      amount=amount,
                      total_price=total_price,
                      is_prepaid=is_prepaid)

        db.session.add(order)
        db.session.commit()
        session['order'] = {"email": customer_email,
                            "amount": amount,
                            "total_price": total_price,
                            "is_paid": is_prepaid}
        return redirect('/order-new-complete')
    return render_template('new_order.html',
                           title='Make a new order', form=form)

@app.route('/order-new-complete')
def order_complete():
    ping_kafka_when_request()
    return render_template('new_order_complete.html',
                           order=session['order'])
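Calling the ping function by hand in every view is easy to forget when a new page is added. One possible alternative, shown here only as a sketch, is a small decorator that makes the call before the view runs. The names `with_request_ping` and `fake_ping` are hypothetical; `fake_ping` stands in for ping_kafka_when_request() so that the example runs without Flask or Kafka.

```python
from functools import wraps


def with_request_ping(ping):
    """Build a decorator that calls `ping()` before the wrapped view runs."""
    def decorator(view):
        @wraps(view)
        def wrapper(*args, **kwargs):
            ping()                      # report the request first
            return view(*args, **kwargs)
        return wrapper
    return decorator


# Stand-in for ping_kafka_when_request(): it only records that it was called.
pings = []


def fake_ping():
    pings.append(1)


@with_request_ping(fake_ping)
def index():
    return "index page"


result = index()
```

In the Flask application, the decorator would sit between @app.route(...) and the function definition, so that the route registers the wrapped view. Flask also provides a before_request hook that could serve the same purpose for all routes at once.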

That covers the producer side of our architecture: the code that has to live inside the web application in order to send metrics to the Kafka cluster. Now let’s look at the other side, the consumers.

The first file is consumer_requests.py. Let’s examine it piece by piece. At the beginning of the file, we import all the packages we’ll need and create an instance of the Kafka consumer. We pass several parameters so that it works the way we intend; you can read about them in the documentation. The most important ones are the name of the topic to which we want to subscribe the consumer (web_requests) and the bootstrap_servers parameter, which points to the server where the Kafka cluster is located.

import time
import threading
import datetime
from kafka import KafkaConsumer

consumer = KafkaConsumer('web_requests',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',
                         enable_auto_commit=True,
                         auto_commit_interval_ms=1000,
                         consumer_timeout_ms=-1)

Next, we need to create a function that polls the Kafka cluster once a minute and processes the messages Kafka returns. The function is called fetch_last_minute_requests(), and you can see it in the code sample below. It takes two parameters. The next_call_in parameter is used to work out when the next call of this function should occur (remember that we need to fetch new data from Kafka every 60 seconds). The is_first_execution parameter is optional and defaults to False.

At the beginning of the function, we fix the time when the next call of this function should occur (60 seconds after the time passed in). We also initialize the counter for requests. Then, if it is the first execution, we create the file requests.csv and write a row with headers to it. The structure of this dataset is simple. It has two columns: datetime and requests_num. Each row holds a timestamp in the datetime column and the number of requests that the website processed during the given minute in the requests_num column.

def fetch_last_minute_requests(next_call_in, is_first_execution=False):
    next_call_in += 60
    counter_requests = 0
    if is_first_execution:
        with open('requests.csv', 'a') as file:
            headers = ["datetime", "requests_num"]
            file.write(",".join(headers))
            file.write('\n')
    else:
        batch = consumer.poll(timeout_ms=100)
        if len(batch) > 0:
            # poll() returns a dict of partition -> list of messages,
            # so go through every partition, not only the first one.
            for messages in batch.values():
                for message in messages:
                    counter_requests += 1
        with open('requests.csv', 'a') as file:
            data = [datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(counter_requests)]
            file.write(",".join(data))
            file.write('\n')
    # Schedule the next run for the fixed deadline computed above.
    threading.Timer(next_call_in - time.time(),
                    fetch_last_minute_requests,
                    [next_call_in]).start()

If this is not the first execution of the function, we make the consumer poll the Kafka cluster. You should set the timeout_ms parameter of the poll() method to a number greater than zero, because otherwise you can miss some messages. If poll() returns a non-empty object (batch), we loop over all fetched messages and increment the counter_requests variable by 1 on each iteration. Then we open the requests.csv file, generate the row (a string made of the current datetime and the counter_requests value joined by a comma), and append this row to the file.
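A detail worth keeping in mind when counting: in kafka-python, poll() returns a dictionary that maps each topic partition to the list of records fetched from it. Reading only the first value, for example with list(batch.values())[0], covers a single partition, which is enough only while the topic has one partition. The sketch below counts across all of them. The helper name `count_messages` and the sample batch are assumptions; plain tuples stand in for the partition keys so that the example runs without Kafka.

```python
def count_messages(batch: dict) -> int:
    """Count records across every partition in a poll()-style result."""
    return sum(len(records) for records in batch.values())


# Stand-in for a poll() result: keys play the role of topic partitions,
# values are the lists of records fetched from each one. Made-up data.
fake_batch = {
    ("web_requests", 0): [b"1", b"1"],
    ("web_requests", 1): [b"1"],
}

first_partition_only = len(list(fake_batch.values())[0])
all_partitions = count_messages(fake_batch)
```

With this sample, looking at the first partition alone would report two requests, while the full count is three.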

The last line in the function sets up the timer. We pass three arguments to the Timer object. The first is the period of time after which the function (the second argument) should be triggered. This period is calculated dynamically by subtracting the current timestamp from the time stored in the next_call_in variable, which we computed at the beginning of the function. The third argument is the list of arguments that should be passed to the function we want to execute. We immediately start the timer using its start() method.

Why do we need such a tricky way of defining the time when the next function call will occur? Can’t we just use the more familiar time.sleep() function? The answer is no. We used this approach because the execution of the logic inside the function takes some time. For example, polling the Kafka cluster can take up to 100 milliseconds. Moreover, we then need to count requests and write the result into the file. All these things could be time consuming, and if we simply paused the execution using time.sleep(), the one-minute period would drift a little further with every iteration. This can distort the results. Using the threading.Timer object is a slightly different and more suitable approach. Instead of pausing for 60 seconds, we compute the delay until the function should be triggered by subtracting the time that was spent executing the code inside the function’s body.
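The drift is easy to see with a little arithmetic. The sketch below simulates both scheduling strategies without actually waiting: it only computes the moments at which each run would start. The function names and the numbers (0.5 seconds of work per run, a 60-second period, four runs) are assumptions chosen for illustration.

```python
def sleep_based_starts(work_seconds: float, period: float, runs: int) -> list:
    """Start times when each run does its work and then sleeps a full period."""
    starts, now = [], 0.0
    for _ in range(runs):
        starts.append(now)
        now += work_seconds + period   # work time is added on top of the pause
    return starts


def deadline_based_starts(work_seconds: float, period: float, runs: int) -> list:
    """Start times when each run schedules the next one at a fixed deadline."""
    starts, next_call = [], 0.0
    for _ in range(runs):
        starts.append(next_call)
        finished = next_call + work_seconds
        next_call += period            # same idea as next_call_in += 60
        delay = next_call - finished   # the value handed to the timer
        assert delay >= 0, "work took longer than one period"
    return starts


# Assumed numbers: 0.5 s of work per run, a 60 s period, four runs.
drifting = sleep_based_starts(0.5, 60, 4)      # [0.0, 60.5, 121.0, 181.5]
steady = deadline_based_starts(0.5, 60, 4)     # [0.0, 60.0, 120.0, 180.0]
```

With sleeping, every run starts half a second later than the previous one relative to the minute boundary; with fixed deadlines, the starts stay aligned as long as the work fits within one period.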

Now we can use the defined function. Just initialize the next_call_in variable with the current time and call the fetch_last_minute_requests() function with this variable as the first argument and True as the second (to mark that this is the first execution).

next_call_in = time.time()
fetch_last_minute_requests(next_call_in, True)

That is all for the consumer_requests.py file. But before you can execute it, you should run the Kafka cluster. Here is how you can do it locally from the terminal (assuming that you already have Kafka installed):

sudo kafka-server-start.sh /etc/kafka.properties

Now you can run the file. Then go to the web application (you can run the Flask application using the command flask run) in your browser and try to browse it - visit its pages. After a while, you should have the file requests.csv in the folder where your consumer file is located. The file could look like this (actual data will be different and depends on the number of times you visited the pages of your app):

[Image: sample contents of requests.csv]

With that, we have built a pipeline that collects a web application metric (the number of requests) using Kafka and Python.
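Once the file exists, it can be read back with the standard library for a quick check. The sketch below uses a made-up sample in the same two-column layout instead of the real requests.csv, so the numbers are purely illustrative.

```python
import csv
import io

# Made-up sample in the same two-column layout as requests.csv.
sample = io.StringIO(
    "datetime,requests_num\n"
    "2020-05-26 10:00:00,0\n"
    "2020-05-26 10:01:00,7\n"
    "2020-05-26 10:02:00,3\n"
)

rows = list(csv.DictReader(sample))
total_requests = sum(int(row["requests_num"]) for row in rows)
busiest = max(rows, key=lambda row: int(row["requests_num"]))
```

To run it against the real file, `sample` would be replaced with the file object returned by open('requests.csv', newline='').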

Now let’s look at another consumer. We called this file consumer_orders.py. The first part of the file is very similar to the previous file. One difference is that we import the json library, because we will need to work with JSON-encoded messages. Another difference is that the Kafka consumer is subscribed to the new_orders topic.

import json
import time
import datetime
import threading
from kafka import KafkaConsumer

consumer = KafkaConsumer('new_orders',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',
                         enable_auto_commit=True,
                         auto_commit_interval_ms=1000,
                         consumer_timeout_ms=-1)

The main function is fetch_last_minute_orders(). The difference from the function in the previous consumer is that this function has six counters instead of just one. We want to count the total number of orders created over one minute, the total amount of items ordered, the total cost for all orders, as well as how many orders are prepaid, how many items were in prepaid orders, and the total price for all prepaid orders. These metrics could be useful for further analysis.

Another difference is that before calculating the aforementioned values, we need to decode the message fetched from Kafka using the json library. All other logic is the same as for the consumer that works with requests. The file to which the data is written is called orders.csv.

def fetch_last_minute_orders(next_call_in, is_first_execution=False):
    next_call_in += 60

    count_orders = 0
    count_tot_amount = 0
    count_tot_price = 0
    count_orders_paid = 0
    count_tot_paid_amount = 0
    count_tot_paid_price = 0

    if is_first_execution:
        with open('orders.csv', 'a') as file:
            headers = ["datetime", "total_orders_num",
                       "total_orders_amount", "total_orders_price",
                       "total_paid_orders_num",
                       "total_paid_orders_amount",
                       "total_paid_orders_price"]
            file.write(",".join(headers))
            file.write('\n')
    else:
        batch = consumer.poll(timeout_ms=100)
        if len(batch) > 0:
            # poll() returns a dict of partition -> list of messages,
            # so go through every partition, not only the first one.
            for messages in batch.values():
                for message in messages:
                    value = message.value.decode()
                    order_dict = json.loads(value)
                    # all orders
                    count_orders += 1
                    count_tot_amount += order_dict["order_amount"]
                    count_tot_price += order_dict["order_total_price"]
                    if order_dict["is_paid"]:
                        # only paid orders
                        count_orders_paid += 1
                        count_tot_paid_amount += order_dict["order_amount"]
                        count_tot_paid_price += order_dict["order_total_price"]
        with open('orders.csv', 'a') as file:
            data = [
                datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                str(count_orders), str(count_tot_amount),
                str(count_tot_price), str(count_orders_paid),
                str(count_tot_paid_amount),
                str(count_tot_paid_price)
            ]
            file.write(",".join(data))
            file.write('\n')
    # Schedule the next run for the fixed deadline computed above.
    threading.Timer(next_call_in - time.time(),
                    fetch_last_minute_orders,
                    [next_call_in]).start()

The last part of the file is the same: getting the current time and triggering the function defined above:

next_call_in = time.time()
fetch_last_minute_orders(next_call_in, True)

Given that you already have the Kafka cluster running, you can execute the consumer_orders.py file. Next, go to your Flask app and create some orders for several minutes. The generated orders.csv file will have the following structure:

[Image: sample contents of orders.csv]

You can see that our Python scripts (especially those that work with order data) perform some data enrichment.
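The enrichment step for orders is a plain aggregation, so it can be isolated in a function and checked without Kafka. The sketch below assumes the messages have already been decoded into dictionaries with the same keys the producer sends. The function name `summarize_orders` and the sample orders are hypothetical; the result keys mirror the column names of orders.csv.

```python
def summarize_orders(orders: list) -> dict:
    """Aggregate decoded order messages into the six per-minute counters."""
    summary = {
        "total_orders_num": 0, "total_orders_amount": 0,
        "total_orders_price": 0, "total_paid_orders_num": 0,
        "total_paid_orders_amount": 0, "total_paid_orders_price": 0,
    }
    for order in orders:
        summary["total_orders_num"] += 1
        summary["total_orders_amount"] += order["order_amount"]
        summary["total_orders_price"] += order["order_total_price"]
        if order["is_paid"]:
            summary["total_paid_orders_num"] += 1
            summary["total_paid_orders_amount"] += order["order_amount"]
            summary["total_paid_orders_price"] += order["order_total_price"]
    return summary


# Made-up orders at the unit price of 15 used in routes.py.
orders = [
    {"order_id": 1, "order_amount": 2, "order_total_price": 30, "is_paid": True},
    {"order_id": 2, "order_amount": 1, "order_total_price": 15, "is_paid": False},
]
summary = summarize_orders(orders)
```

Keeping the aggregation separate from polling and file writing makes it straightforward to add further derived values later.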

Data engineers can customize this process. For example, if the app is very large and heavily loaded, the Kafka cluster should be scaled horizontally. You can have many topics for different metrics, and each topic can be processed in its own way. It is possible to track new user registrations, user churn, the amount of feedback received, survey results, etc.

Also, you can set up the collection of low-level metrics like CPU load or memory consumption. The basic pipeline will be similar. It is also worth mentioning that writing data into CSV files is not the only option: you can also use open data formats such as Parquet and land all this data directly in your data lake.
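As a rough sketch of what a low-level metric message could look like, the example below reads the one-minute system load average with the standard library and encodes it the same way the order messages are encoded. The function name `build_load_metric` and the message fields are assumptions; os.getloadavg() exists only on Unix-like systems, so the sketch falls back to None elsewhere.

```python
import json
import os
import time


def build_load_metric() -> bytes:
    """Encode the 1-minute system load average as a JSON message."""
    try:
        load_1m = os.getloadavg()[0]
    except (AttributeError, OSError):
        load_1m = None   # load average is not available on this platform
    metric = {"timestamp": time.time(), "load_1m": load_1m}
    return json.dumps(metric).encode("utf-8")


message = build_load_metric()
decoded = json.loads(message)
```

The resulting bytes could be sent with producer.send() to a dedicated topic, in the same way as in ping_kafka_when_request().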

Once you have the metrics collected, you can use Dremio to directly query the data, as well as to create and share virtual data sets that combine the metrics with other sources in the data lake, all without any copies. Look at our tutorials to learn more.

Conclusion

In this article, we built a data pipeline for collecting metrics from a Flask web application. The Kafka cluster is used as the layer between data producers (deployed in the web application) and data consumers (Python scripts). The Python scripts act as apps that fetch metrics from Kafka and then process and transform the data. The examples given are basic, but you can use them to build more complex and diverse pipelines for metrics collection according to your needs. You can then use Dremio, the industry’s leading data lake engine, to query and process the resulting datasets.

To learn more about Dremio, visit our tutorials and resources as well as Dremio University, our free online learning platform, where you can deploy your own Dremio virtual lab. If you have any questions, visit our community forums, where we are all eager to help.
