33 minute read · December 3, 2019
Forecasting air quality with Dremio, Python and Kafka
· Dremio Team
Intro
Forecasting air quality is a worthwhile investment on many different levels, not only for individuals but also for communities in general. Having an idea of what the air quality will be at a certain point in time allows people to plan ahead and, as a result, reduces the health effects and the costs associated with them.
When predicting air quality, there is a large number of variables to take into account. A machine learning model that can use all of these variables to predict future air quality based on current readings brings a lot of value to this scenario.
In this tutorial, we will create a machine learning model using historical air quality data stored in Amazon S3 buckets and ADLS. We will use Dremio to link both data sources and curate the data before creating the model with Python. Additionally, we will use the resulting model along with Kafka to predict air quality values on a data stream.
Assumptions
To successfully complete this tutorial, we suggest that you have the following items ready to go:
- Dremio
- AWS account
- Microsoft Azure account
- Apache Kafka
- PyODBC
- Dremio ODBC Driver
- Azure Storage Explorer
- Jupyter Notebook environment
- Ubuntu OS
Uploading data to Amazon S3
In Amazon S3, data is stored inside buckets. To create a bucket (we want to call it dremioairbucket), go to the AWS portal and select S3 from the list of services. Then click on the Create bucket button. Fill in the details and click Create:
After the bucket is created, select it and click Upload. Then specify the file that you want to upload (air_second.csv) and select Upload:
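If you prefer to script this step rather than use the console, a minimal sketch with boto3 (assuming your AWS credentials are already configured locally) could look like this:
import boto3

s3 = boto3.client("s3")

# Create the bucket (names must be globally unique; outside us-east-1,
# also pass CreateBucketConfiguration={'LocationConstraint': '<your-region>'})
s3.create_bucket(Bucket="dremioairbucket")

# Upload the dataset file into the bucket
s3.upload_file("air_second.csv", "dremioairbucket", "air_second.csv")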
Uploading data to ADLS Gen2
Follow these steps to upload data to ADLS Gen2:
- Create a resource group
- Create a storage account that supports ADLS Gen2
- Create a file system inside the storage account
- Use the Azure Storage Explorer tool to upload files
We want to call our resource group air_quality. To create it, go to the Resource groups tab and click Add.
After the resource group is created, go to the All services section, select Storage, then select the Storage accounts tab, click Add, and specify the needed parameters, as shown in the image (NOTE: dremioairstorage is the name we chose for the storage account):
Click on Next: Networking and then choose the Advanced tab. Enable “hierarchical namespace” for ADLS Gen2:
Now you can review and create the storage account.
Before we can actually upload files to ADLS, we need to create a file system. To do so, select the dremioairstorage storage account and click on the Containers icon. Then create a new file system (we called ours airqualityfs):
Finally, we can upload the file to ADLS Gen2. To do this, use the Azure Storage Explorer tool. Launch it and find your storage account:
Then double click on it and select the created file system:
You should now be able to see the Upload button. Click on it, specify the file (air_first.csv), and click the Upload button in the dialog window:
After uploading, the dataset should appear in the ADLS Gen2.
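Alternatively, if you would rather upload the file programmatically than through Storage Explorer, a rough sketch with the azure-storage-file-datalake package might look like the following (the client and method names here are assumptions based on that SDK; dremioairstorage, airqualityfs, and the access key come from the steps above):
from azure.storage.filedatalake import DataLakeServiceClient

service_client = DataLakeServiceClient(
    account_url="https://dremioairstorage.dfs.core.windows.net",
    credential="<access-key>")  # key1 from the Access keys section

file_system_client = service_client.get_file_system_client(file_system="airqualityfs")
file_client = file_system_client.get_file_client("air_first.csv")

# Upload the local CSV into the ADLS Gen2 file system
with open("air_first.csv", "rb") as data:
    file_client.upload_data(data, overwrite=True)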
Connection of Amazon S3 and Dremio
Now let’s connect Amazon S3 and Dremio. Select Amazon S3 from the list of available data sources:
Next, provide all the information required for the connection. In the image below, you can see that we specified the name of the new data source (airquality_s3_source). Other important fields are AWS Access Key and AWS Access Secret. We used the credentials that we had created earlier. If you don't have credentials or have forgotten their values, you can generate new ones in the IAM section of the AWS console.
After pressing the Save button, the connection should be established and you will be able to see part of the dataset in Dremio.
To finalize the connection setup, click on the air_second.csv file and check whether you have the correct values for Field Delimiter, Line Delimiter, and Extract Field Names in the Options section (see the image below). Then click the Save button.
Connection of ADLS Gen2 and Dremio
Setting up the connection between ADLS Gen2 and Dremio is very similar to what we did for Amazon S3.
First, select Azure Storage from the list of available data sources:
For authentication purposes, you need to specify the Shared Access Key. You can find it by going to your storage account in the Azure portal and clicking on the Access keys section. key1 is what you need (see the image below).
Copy the key from the Azure portal and paste it in the corresponding field in Dremio. Specify the name of the data source (airquality_adls2_source) and the name of the Azure storage account (dremioairstorage). Then press the Save button:
Then simply check the dataset view settings (as we showed earlier for Amazon S3). After you press the Save button there, everything should be ready to start the data curation.
Data curation in Dremio
At this point, we are going to join the datasets coming from ADLS and S3 into a single virtual dataset. To do this, first select the Join option:
The next step is the selection of the dataset with which you want to join the current dataset. We want to join air_first.csv with air_second.csv. After specifying the second dataset, click Next:
Then we need to select the outer keys, i.e., the columns that Dremio will use to join the datasets. Both of our datasets have column A as a unique identifier, so we will use these columns for joining. Then click the Apply button:
Now we have two identifier columns (A and A0). We no longer need them, so let's drop them. To do this, call the context menu of the field you want to drop and click on the Drop option:
In the next step, we want to convert the values in the CO(GT), T, RH, and AH columns from strings to floats. But first, we need to replace commas with dots in the numbers in these columns. To do this, call the dropdown menu for the needed column and select the Replace Text option.
A dialog window should appear. Click on Pattern, change Edit Selection to Contains, and specify “,” in the corresponding field. Then change Replace to Text Selection and specify “.” in the corresponding field. Check the preview of the actions you are about to apply and, if everything is as expected, click Apply.
Now we can change the type of the column values. Click on the little Abc icon near the name of the column, then select the type you want to convert to:
We need to perform the comma replacement and type conversion for all four aforementioned columns. Then we save the curated dataset as air_curated in the airquality space:
It is important to note that everything you do with datasets in Dremio is reflected in SQL code. Moreover, you can edit the SQL, and this will change the dataset, so datasets can be manipulated both from the UI and directly from SQL code. To view the current SQL query, press the SQL Editor button above the dataset. For example, here is the SQL query that was generated after all the actions we performed earlier:
SELECT
  nested_0."Date" AS "Date",
  nested_0."Time" AS "Time",
  CONVERT_TO_FLOAT(CASE WHEN regexp_like(nested_0."CO(GT)", '.*?\Q,\E.*?') THEN regexp_replace(nested_0."CO(GT)", '\Q,\E', '.') ELSE nested_0."CO(GT)" END, 1, 1, 0) AS "CO(GT)",
  CONVERT_TO_FLOAT(CASE WHEN regexp_like(nested_0.T, '.*?\Q,\E.*?') THEN regexp_replace(nested_0.T, '\Q,\E', '.') ELSE nested_0.T END, 1, 1, 0) AS T,
  CONVERT_TO_FLOAT(CASE WHEN regexp_like(nested_0.RH, '.*?\Q,\E.*?') THEN regexp_replace(nested_0.RH, '\Q,\E', '.') ELSE nested_0.RH END, 1, 1, 0) AS RH,
  CONVERT_TO_FLOAT(CASE WHEN regexp_like(nested_0.AH, '.*?\Q,\E.*?') THEN regexp_replace(nested_0.AH, '\Q,\E', '.') ELSE nested_0.AH END, 1, 1, 0) AS AH,
  "join_air_second.csv"."PT08.S1(CO)" AS "PT08.S1(CO)",
  "join_air_second.csv"."NMHC(GT)" AS "NMHC(GT)",
  "join_air_second.csv"."C6H6(GT)" AS "C6H6(GT)",
  "join_air_second.csv"."PT08.S2(NMHC)" AS "PT08.S2(NMHC)",
  "join_air_second.csv"."NOx(GT)" AS "NOx(GT)",
  "join_air_second.csv"."PT08.S3(NOx)" AS "PT08.S3(NOx)",
  "join_air_second.csv"."NO2(GT)" AS "NO2(GT)",
  "join_air_second.csv"."PT08.S4(NO2)" AS "PT08.S4(NO2)",
  "join_air_second.csv"."PT08.S5(O3)" AS "PT08.S5(O3)"
FROM (
  SELECT A, "air_first.csv"."Date" AS "Date", "air_first.csv"."Time" AS "Time", "CO(GT)", T, RH, AH
  FROM airquality_adls2_source.airqualityfs."air_first.csv"
) nested_0
INNER JOIN airquality_s3_source.dremioairbucket."air_second.csv" AS "join_air_second.csv"
  ON nested_0.A = "join_air_second.csv".A
We have our curated dataset saved as air_curated.
We want to test the hypothesis that the season (summer, winter, etc.) and the period of the day (day, night, etc.) can somehow influence the chemical composition of the air. Let's create columns that represent the season and the period of the day, based on the Date and Time columns we already have.
Call the dropdown menu for the Date column. Select the Calculated Field option. Now you can use a range of functions to extract the month number from the date. We want to take 5 characters from the left side and then 2 characters from the right side of the result to get the month:
We call the new column Month. After this, we change its type to integer.
To extract the hour from the Time column, we just need to use the LEFT function and take the first 2 characters.
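For illustration, here is the same string logic in Python, assuming the Date values look like 10/03/2004 and the Time values look like 18.00.00 (hypothetical sample values; check the actual formats in your data):
date_value = "10/03/2004"   # hypothetical Date value
time_value = "18.00.00"     # hypothetical Time value

month = int(date_value[:5][-2:])  # LEFT(Date, 5), then RIGHT(..., 2) -> "03"
hour = int(time_value[:2])        # LEFT(Time, 2) -> "18"

print(month, hour)  # 3 18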
We save the resulting table as a temporary dataset.
Now we will go to the temporary dataset and work with SQL. To create the Season and Day_period columns, we write a query that uses standard SQL CASE expressions on the Month and Hour columns we derived above.
As a result, we get a dataset with two new columns:
We save this dataset as featured in the same Dremio space.
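For reference, here is the equivalent feature logic expressed in Python. The exact season and day-period boundaries used in our SQL are a design choice; this sketch assumes meteorological seasons and four day periods:
def month_to_season(month):
    # Meteorological seasons (an assumption; adjust the boundaries if needed)
    if month in (12, 1, 2):
        return 'Winter'
    if month in (3, 4, 5):
        return 'Spring'
    if month in (6, 7, 8):
        return 'Summer'
    return 'Autumn'

def hour_to_day_period(hour):
    # Four periods of the day (also an assumption)
    if 6 <= hour < 12:
        return 'Morning'
    if 12 <= hour < 18:
        return 'Day'
    if 18 <= hour < 23:
        return 'Evening'
    return 'Night'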
We will work only with a subset of the columns, so we form a new dataset with a SELECT query that keeps only the columns of interest and save it as final_curated in the same space.
Now everything is ready for exporting the curated data to Python.
Dremio and Python connection
To be able to work with Dremio datasets in Python, we need the Dremio ODBC driver and the corresponding Python library. See the code below:
import pyodbc
import pandas as pd

host = 'localhost'
port = 31010
uid = 'dremio_username'
pwd = 'dremio_password'
driver = '/opt/dremio-odbc/lib64/libdrillodbc_sb64.so'

cnxn = pyodbc.connect(
    "Driver={};ConnectionType=Direct;HOST={};PORT={};AuthenticationType=Plain;UID={};PWD={}".format(
        driver, host, port, uid, pwd),
    autocommit=True)

sql = "SELECT * FROM airqualityspace.final_curated"
df = pd.read_sql(sql, cnxn)
First, we import the pyodbc and pandas libraries. Then we specify several needed parameters (host, port, credentials, and the path to the Dremio ODBC driver). Next, we use pyodbc to create a connection. Finally, the pandas read_sql() function is used to fetch the dataset with a simple SQL query.
Now we can build a machine learning model to predict the quality of the air.
Building a machine learning model
We will build a univariate time series forecasting model that predicts the CO concentration in the air for the next hour based on the information about the previous 24 hours.
We will use TensorFlow 2.0 to create the time series forecasting model. Let's import it, along with NumPy (remember that we imported pandas earlier):
import numpy as np
import tensorflow as tf
After importing the dataset from Dremio we want to check its shape and look at several rows from the beginning and from the end of the dataframe:
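For example, a quick sanity check in the notebook:
print(df.shape)  # number of rows and columns
df.head()        # first rows of the dataframe
df.tail()        # last rows of the dataframe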
We can see that, for some reason, several rows at the end of the dataframe contain only NaN values. We need to remove these rows (around 800 records will be dropped); one way to do this is sketched below.
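A minimal sketch of this cleanup, assuming the trailing records are entirely NaN:
# Drop rows where every column is NaN and rebuild the index
df = df.dropna(how='all').reset_index(drop=True)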
If we look at the dataframe, we can also notice that some values equal -200.0. The documentation for this dataset states that these values represent missing data, so we want to get rid of them too.
First, we loop over the CO column; when we find a -200.0 value surrounded by normal values, we replace it with the average of its two neighbors. Values that are not surrounded by normal numbers are simply replaced with NaN:
df2 = list(df['CO'])

# Replace isolated -200.0 values with the average of their two neighbors
# (the range stops at len(df2) - 1 so that df2[i+1] never goes out of bounds)
for i in range(1, len(df2) - 1):
    if df2[i] == -200.0 and df2[i-1] != -200.0 and df2[i+1] != -200.0:
        df2[i] = round((df2[i-1] + df2[i+1]) / 2, 2)
    else:
        continue

df['CO'] = df2

# The remaining -200.0 values (missing data) are marked as NaN
df.replace(-200.0, np.nan, inplace=True)
In the next cell, we take the CO column and drop all of its NaN values. The data decreases from 8613 to 7171 rows.
co = df['CO']
co.dropna(inplace=True)
We split the dataset at time point 6900. This means that we will train our model on the first 6900 values and test on the remaining ones:
split_time = 6900

x_train = co[:split_time]
time_train = [i for i in range(0, split_time)]
time_valid = [i for i in range(split_time, len(co))]
x_valid = co[split_time:]
Here is the shape of the training dataset:
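For example, you can print it in the notebook:
print(x_train.shape)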
Before training, we need to prepare the dataset in a special way. Since this is a time series problem, we need to generate so-called windows. This means that several consecutive time points (24 in our case) are used as the input, and the next value is used as the label (target value). The shuffle_buffer_size parameter controls the randomness of the input/output pair generation.
window_size = 24
batch_size = 16
shuffle_buffer_size = 1000

def windowed_dataset(series, window_size, batch_size, shuffle_buffer):
    dataset = tf.data.Dataset.from_tensor_slices(series)
    dataset = dataset.window(window_size + 1, shift=1, drop_remainder=True)
    dataset = dataset.flat_map(lambda window: window.batch(window_size + 1))
    dataset = dataset.shuffle(shuffle_buffer).map(lambda window: (window[:-1], window[-1]))
    dataset = dataset.batch(batch_size).prefetch(1)
    return dataset

dataset = windowed_dataset(x_train, window_size, batch_size, shuffle_buffer_size)
We define the architecture of the neural network in the code below. We use two bidirectional LSTM layers, each consisting of 32 neurons. The final layer is a single dense neuron that predicts the target. At the beginning of the network, a Lambda layer is used to perform a custom operation (in our case, the expansion of dimensions).
tf.keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

model = tf.keras.models.Sequential([
    tf.keras.layers.Lambda(lambda x: tf.expand_dims(x, axis=-1), input_shape=[None]),
    tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(32, return_sequences=True)),
    tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(32, return_sequences=True)),
    tf.keras.layers.Dense(1)
])
The code below compiles the model and triggers the training process. We use the Huber loss and the Adam optimizer for training. The quality of the model is estimated during training using the mean absolute error metric. The training runs for 50 epochs.
model.compile(loss=tf.keras.losses.Huber(), optimizer='adam', metrics=["mae"])
history = model.fit(dataset, epochs=50)
In the output of the code above (if you execute it in a Jupyter notebook), you will see the progress of the training:
At the end of the training, we obtained an MAE of around 0.39.
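You can also read the final metric value from the history object returned by fit():
# "mae" is the metric name passed to model.compile()
print(history.history["mae"][-1])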
Now let’s check how our model behaves on the testing data. For this purpose, we want to draw the plot of actual vs. predicted values using the following code:
import matplotlib.pyplot as plt

def plot_series(time, series, format="-", start=0, end=None):
    plt.plot(time[start:end], series[start:end], format)
    plt.xlabel("Time")
    plt.ylabel("Value")
    plt.grid(True)

forecast = []
results = []
for time in range(len(co) - window_size):
    forecast.append(model.predict(np.array(co[time:time + window_size])))

forecast = forecast[split_time-window_size:]
results = np.array(forecast)[:, 0, 0]

plt.figure(figsize=(10, 6))
plot_series(time_valid, x_valid)
plot_series(time_valid, results)
As we can see, our model can predict general trends, and sometimes it is quite accurate. However, in many cases it fails to predict the peaks. Fine-tuning the model is out of the scope of this tutorial, but you can experiment with different window sizes, model architectures, feature engineering, etc.
Once we have the trained model we need to save it for later use in other Python modules:
import h5py

model.save('model.h5')
Now we can use the saved model to create a system for real-time prediction.
Using Kafka to provide real-time inference
Before going further, run the Kafka cluster. We will start ours using the following command:
sudo kafka-server-start.sh /etc/kafka.properties
Since we are using Python as our main programming language, we need to install the Python client package for Kafka:
pip install kafka-python
Now let's design the architecture of the system: we need a producer and a consumer. Think of the producer as a device that constantly measures the CO concentration in the air and sends the readings to a Kafka topic. Simply speaking, a Kafka topic is a part of the Kafka cluster dedicated to receiving specific information from producers and making it available to consumers.
The code for our producer is below. To emulate the behavior of such a device without simply generating random numbers, we will use our dataset as the source. Note that the producer loads it from the co.pkl file, so if you want to replicate this approach, you should save your time series to that file beforehand.
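If you are following along, one way to create that file from the co series prepared earlier is:
# Save the cleaned CO series so the producer can load it with pd.read_pickle()
co.to_pickle('co.pkl')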
We will generate a random number to serve as an index into the dataset. Starting 24 hours before that point, the producer will emit values covering the next 5 days so that the model can make predictions over that period. Then we will generate the next random index and continue producing values.
The Kafka topic is called co_concentration. The producer works with the Kafka cluster available at localhost:9092 (keep in mind that this value might change based on your environment). Each second it produces one value and sends it to Kafka; the float is converted to a string and encoded to bytes before sending.
import pandas as pd
import random
import time
from kafka import KafkaProducer

co = list(pd.read_pickle('co.pkl'))
period_of_days = 5
topic_name = 'co_concentration'

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

while True:
    # Pick a random starting point that leaves room for 24 hours of history
    # and the following 5 days of values
    random_point = random.randint(0 + 24, len(co) - 24 * period_of_days)
    start = random_point - 24
    end = random_point + 24 * period_of_days
    for index in range(start, end):
        val = co[index]
        print(val)
        # Send the reading as an encoded string, one value per second
        producer.send(topic_name, value=str(val).encode())
        producer.flush()
        time.sleep(1)
Now let’s explore the code for our Kafka consumer.
First, we create an instance of the KafkaConsumer object. The auto_offset_reset, enable_auto_commit, and auto_commit_interval_ms parameters set up the behavior of the consumer after it suspends and resumes its work: it should continue processing events from the last previously processed point (offset). The consumer_timeout_ms parameter says that the consumer should stop after not receiving any events for 10 seconds.
Next, we load the TensorFlow model from the model.h5 file.
Using a loop, we iterate over the messages received by the consumer. Once we have enough data (24 or more values), we call the model's predict() method and print the predicted value together with the actual value for the next time point. Note that for the prediction we always use only the last 24 points of the inputs pandas Series.
import pandas as pd
import numpy as np
import tensorflow as tf
from kafka import KafkaConsumer

consumer = KafkaConsumer('co_concentration',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',
                         enable_auto_commit=True,
                         auto_commit_interval_ms=1000,
                         consumer_timeout_ms=10000)

inputs = pd.Series([], dtype=float)
predicted_next = 0
loaded_model = tf.keras.models.load_model('model.h5')

for message in consumer:
    val = message.value.decode()
    if predicted_next != 0:
        print("ACTUAL: ", val, " PREDICTED: ", predicted_next)
    else:
        print(val)
    # Keep a growing series of received values (ignore_index keeps the index sequential)
    inputs = inputs.append(pd.Series([float(val)]), ignore_index=True)
    if len(inputs) >= 24:
        # Predict the next value from the last 24 received points
        trial_pred = loaded_model.predict(np.array(inputs[len(inputs)-24:]))
        predicted_next = np.array(trial_pred)[0, 0]
Here is the work of our producer (we run it using the python producer.py command):
Here is the work of the consumer (we run it in a separate Terminal window using the python consumer.py command):
Both producer and consumer work in real-time. This means that we have a real-time inference with our machine learning model.
Conclusion
In this tutorial, we demonstrated how to create a real-time system for predicting air quality using data stored in Amazon S3 buckets and ADLS. We loaded the data into Dremio and performed basic data curation there. TensorFlow 2.0 was used to build a neural network that is able to work with time-series data. Finally, we created a Kafka producer and consumer to show how Kafka can be used to build a real-time prediction system.