24 minute read · November 12, 2019
Analyzing Multiple Stream Data Sources using Dremio and Python
· Dremio Team
Introduction
New technologies, communication systems, and information processing algorithms place ever-higher demands on data rates, availability, and performance. Accordingly, the procedures that process this data (messages) call for technologies capable of handling the load. One of these technologies is RabbitMQ – it is used to build service-oriented architecture (SOA) services and to distribute resource-intensive operations.
However, it is worth noting that the messages in the queue can come from different types of data sources and therefore arrive in different formats. This complicates data processing and analysis. To resolve this issue, services are used that can combine different data types into a single dataset.
This article focuses on a data processing and analysis system that uses RabbitMQ as the queuing service, Python as a flexible scripting environment, AWS S3 as fast and secure data storage, and Dremio for combining all the data messages from the queue into a single dataset.
In this tutorial we will execute the following steps:
- Generate data of different types that will flow from various information systems, devices, etc.
- Create a data producer and a data consumer with Python tools to handle the message exchange with the RabbitMQ service.
- Transmit and store the processed messages using the RabbitMQ and AWS S3 services.
- Curate the data using Dremio.
Python packages
Environment
- Python (version 3.6)
- Amazon account and AWS S3 bucket
- Dremio and Dremio ODBC Driver
- Pandas and Numpy for data operations.
- Scipy.stats, TensorFlow and TextGenRNN for data generation procedures.
- Pika as a RabbitMQ client.
- PyODBC and the AWS SDK for Python (boto3) for Dremio and AWS data processes.
Now we will describe each step of the presented workflow and the relevant tools in more detail.
Data sources
To keep the data processing simple, clear, and structured, we will use random number generators to create the different data types for the required dataset. Specifically, we will use the scipy.stats library and the probability density functions (PDFs) of the corresponding random variables.
The datasets will consist of the following data types:
- time and date data – a time series over a fixed time interval,
- integer data – samples drawn from Binomial and Poisson probability distributions,
- floating-point data – samples drawn from Normal and Uniform probability distributions,
- string data – text produced by a text generator based on deep learning models.
The time and date data will be generated with the built-in Pandas functions as a time series covering one day. Also, note that we’ll define a number (the value variable) that determines the resulting dataset size.
import pandas as pd

# data range
value = 1000
# time periods (day * hours * minutes)
periods = 1 * 24 * 60
# time-series data
time_data = pd.date_range('2019-11-01', periods=periods, freq='T')
time_data = time_data[:value]
In this case, the data is presented in Timestamp format, but real data of this type usually arrives as string variables. Therefore, when forming the Pandas DataFrame, we will convert this data to string format for further processing in Dremio.
First, let’s generate the floating-point values as samples from the Normal and Uniform distributions and then plot the densities of these distributions.
import scipy.stats as sts

# Normal data
# distribution parameters
mu = 2.0
sigma = 0.5
# generating normal values
norm_rv = sts.norm(loc=mu, scale=sigma)
normal_data = norm_rv.rvs(size=value)

# Uniform data
# distribution parameters
left_limit = 1
right_limit = 4
# generating uniform values
uniform_rv = sts.uniform(left_limit, right_limit - left_limit)
uniform_data = uniform_rv.rvs(value)

print(normal_data)
Next, we’ll show the resulting data samples using probability density plots built from the generated data. To keep the presentation simple and compact, we’ll write the plotting code only for the normal data, since for the other distributions it is identical.
import numpy as np
import matplotlib.pyplot as plt

# PDF by samples of Normal data
samples = np.linspace(0, 4, value)
pdf_normal = norm_rv.pdf(samples)
plt.plot(samples, pdf_normal)
plt.title('Normal distribution')
plt.ylabel('probability')
plt.xlabel('samples')
plt.grid()
plt.show()
Data acquired from a normal distribution typically describes values obtained by measuring most physical quantities, while uniform data can characterize a simple linear system or dependencies limited by two parameters.
Similarly, we will generate samples of integer data from Binomial and Poisson probability distributions.
# Binomial data
# distribution parameters
n_binomial = 20
p_binomial = 0.2
# generating binomial values
binomial_rv = sts.binom(n_binomial, p_binomial)
binomial_data = binomial_rv.rvs(value)

# Poisson data
# distribution parameter
poisson_parameter = 15
# generating poisson values
poisson_rv = sts.poisson(poisson_parameter)
poisson_data = poisson_rv.rvs(value)

print(poisson_data)
The plots describing these data samples are built similarly to the previous ones, except that the cumulative distribution function is used, since these distributions are discrete.
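For completeness, here is a minimal sketch of how such a plot can be drawn for the binomial sample; it reuses the binomial_rv, n_binomial, and value variables defined above.

# sketch: CDF plot for the Binomial data, reusing binomial_rv and n_binomial from above
samples = np.arange(0, n_binomial + 1)
cdf_binomial = binomial_rv.cdf(samples)
plt.step(samples, cdf_binomial, where='post')
plt.title('Binomial distribution')
plt.ylabel('cumulative probability')
plt.xlabel('samples')
plt.grid()
plt.show()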
So we have two columns of integer data for our resulting dataset. These data samples can describe sequential data systems.
Next, we will generate text data using the TensorFlow and TextGenRNN libraries. We will use the default deep learning model from the TextGenRNN library, which includes several LSTM and RNN layers.
The text generation process will start with keywords that we define in advance – all other model parameters are left at their defaults.
from textgenrnn import textgenrnn

# deep learning model
textgen = textgenrnn()

def generate_text(text, sentences):
    '''generate a number of sentences by key-text'''
    generated_texts = textgen.generate(n=sentences, prefix=text, return_as_list=True)
    return generated_texts

# key words
key_words = ['features', 'parameters', 'values', 'keys', 'numbers']

# generating process
text_list = []
for word in key_words:
    text_by_word = generate_text(word, round(value / len(key_words)))
    text_list.append(text_by_word)

# to the list and shuffle
text_list = np.array(text_list).flatten()
np.random.shuffle(text_list)
print(text_list[:5])
As a result, we have obtained a list of texts that begin with the keywords, which we will later process with Dremio. We will present the resulting dataset as a Pandas DataFrame.
df = pd.DataFrame(list(zip(time_data, normal_data, uniform_data,
                           binomial_data, poisson_data, text_list)),
                  columns=['time_data', 'normal_data', 'uniform_data',
                           'binomial_data', 'poisson_data', 'text'])
df['time_data'] = df.time_data.astype(str)
df.head(5)
Now, we’ll shape this data into packages for transfer through the RabbitMQ queuing service. So let’s present each row as a JSON package.
import json

# row to JSON
json_data = df.apply(lambda row: json.loads(row.to_json()), axis=1)
# to the list of JSON packages
send_data = json_data.to_numpy()
# example of a package
send_data[0]
Now we have all data for processing with RabbitMQ service.
Data processing with RabbitMQ
RabbitMQ provides a queuing service to which different applications, users, or other system components can connect in order to send or receive data messages.
The RabbitMQ service includes the following components:
- Data producer – procedures for forming data messages and sending them to the message queue in the RabbitMQ service
- Message handler (exchange) – component for message processing and queue formation. It can be assigned one of several types, such as Direct, Topic, Fanout, and Headers, which define the rules for routing messages to queues (see the sketch after this list)
- Queue – stores the messages received from the data producer
- Data consumer – procedures for receiving data from the RabbitMQ service
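As a quick illustration (not required for this tutorial, since we will rely on the default Direct exchange), a message handler with an explicit routing rule can be declared with pika roughly as follows; the exchange, queue, and routing-key names here are arbitrary examples.

import pika

# sketch: declaring an exchange with an explicit type and binding a queue to it
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 'direct' can be replaced with 'topic', 'fanout', or 'headers'
channel.exchange_declare(exchange='example_exchange', exchange_type='direct')
channel.queue_declare(queue='example_queue')
channel.queue_bind(queue='example_queue', exchange='example_exchange',
                   routing_key='example_key')
connection.close()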
The general structure of the data queuing system with RabbitMQ service is shown below.
The algorithm of the presented system includes the following steps:
- The data producer receives data messages
- The data producer sends the data messages to the RabbitMQ service
- The data messages reach the message handler and are then redirected to the queue according to the message processing rule
- A data message remains in the queue until the data consumer processes it
- The data consumer processes the messages
Usually, the RabbitMQ service is deployed on a high-performance server, but in our case we will use a local server with the Direct processing rule for the message handler. To do this, we need a pre-installed RabbitMQ service, which will also be used while creating the data producer and consumer applications.
We are going to use the pika library to create the applications that exchange data with the RabbitMQ service.
Data producer
The Data producer algorithm includes the following steps:
- create a connection to the RabbitMQ service,
- initialize a connection to a specific RabbitMQ channel,
- send messages to the queue using the default setting for the message handler,
- close the connection to the RabbitMQ service after the messages have been sent.
Following this algorithm, the data producer application can be implemented with the following Python script.
import pika

# create connection
pika_connector = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(pika_connector)
# initialize channel connection
channel = connection.channel()
# define queue
channel.queue_declare(queue='channel')

# send messages to the queue
for ind, message in enumerate(send_data):
    # convert the dict to a string, since pika expects the body as str or bytes
    channel.basic_publish(exchange='',
                          routing_key='channel',
                          body=str(message))

# close connection
connection.close()
print(f'{ind + 1} messages were sent to the queue')
As a result, the data messages are uploaded to the RabbitMQ service, where they will be stored until the data consumer requests them from the queue.
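If you want to verify that the messages actually reached the queue before starting the consumer, a passive queue declaration returns the current message count; this optional check assumes the same local RabbitMQ server as above.

# optional check: how many messages are currently waiting in the queue
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
queue_state = channel.queue_declare(queue='channel', passive=True)
print(f'Messages in queue: {queue_state.method.message_count}')
connection.close()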
Data consumer
The data producer and data consumer algorithms consist of similar steps, so let’s consider only those that are different:
- The data consumer is set to standby mode, waiting for data messages
- A callback function is used to retrieve the data messages from the queue
The script for the data consumer is shown below.
# create connection
connection = pika.BlockingConnection(pika_connector)
# initialize channel connection
channel = connection.channel()
# define queue
channel.queue_declare(queue='channel')
print('Waiting for messages')

received_data = []

def callback(ch, method, properties, body):
    '''callback for received messages'''
    received_data.append(body)
    print(f'Received message: {body}')

# define queue parameters
channel.basic_consume('channel', callback, auto_ack=True)

# receive messages
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
connection.close()
Next, we will perform additional processing of the received messages (JSON conversion) and present them as a Pandas DataFrame for further analysis.
# data processing
processed_messages = []

def after_procession(message):
    '''process a received message'''
    # convert bytes to string
    message_string = str(message, 'utf-8')
    # convert to dict
    message_dict = json.loads(message_string.replace("'", "\""))
    processed_messages.append(message_dict)

# convert every received message to JSON format
for message in received_data:
    after_procession(message)

# load to DataFrame
df = pd.DataFrame.from_records(processed_messages)
df.to_csv(path_or_buf='data_processed.csv', index=False)
df.head(5)
Now, the received data is ready for storage in AWS S3 and further analysis with the Dremio platform.
AWS S3
We are going to use this service as storage for the data messages received from the RabbitMQ service. The main data processes in AWS S3 are presented in the following diagram.
Accordingly, the AWS S3 data processing algorithm consists of three steps:
- Creating an AWS S3 bucket or connecting to an existing one
- Uploading data messages to the AWS S3 bucket
- Configuring access and credentials settings
You can perform these steps with the AWS web console as well as with the AWS SDK. The first way has been described in previous tutorials, so we will focus on the second one, using the AWS SDK for Python (boto3).
The script for creating an AWS S3 bucket with the SDK for Python is as follows.
import boto3
from botocore.exceptions import ClientError

def create_bucket(bucket_name, region=None):
    """Create an S3 bucket"""
    try:
        if region is None:
            s3_client = boto3.client('s3')
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            s3_client = boto3.client('s3', region_name=region)
            location = {'LocationConstraint': region}
            s3_client.create_bucket(Bucket=bucket_name,
                                    CreateBucketConfiguration=location)
    except ClientError:
        return False
    return True

create_bucket('rabbitmq-data', region='eu-north-1')
This script allows you to specify the AWS region, which is important for optimal data performance. The results of its execution can be seen in the AWS management console.
We’ll also implement the data-message upload process with a Python script using the AWS SDK for Python.
def upload_file(file_name, bucket, object_name=None):
    """Upload a file to an S3 bucket"""
    # If S3 object_name was not specified, use file_name
    if object_name is None:
        object_name = file_name
    # Upload the file
    s3_client = boto3.client('s3')
    try:
        s3_client.upload_file(file_name, bucket, object_name)
    except ClientError:
        return False
    return True

# data upload
upload_file('data_processed.csv', 'rabbitmq-data', 'rabbitmq-messages')
As a result, the data messages are uploaded to the specified AWS S3 bucket.
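To confirm that the object has landed in the bucket, you can list the bucket contents with boto3; this is a minimal sketch using the bucket name from the example above.

# optional check: list the objects stored in the bucket
s3_client = boto3.client('s3')
response = s3_client.list_objects_v2(Bucket='rabbitmq-data')
for obj in response.get('Contents', []):
    print(obj['Key'], obj['Size'])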
Next, we’ll configure the access settings for the AWS S3 bucket. In our case, we select public access to keep the example simple; for your own data you should configure private access instead, otherwise it will be available to everyone.
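If you prefer to keep the bucket private and manage this setting programmatically rather than through the console, boto3 exposes the public access block configuration; a minimal sketch, again assuming the bucket name used above:

# sketch: block all public access to the bucket programmatically
s3_client = boto3.client('s3')
s3_client.put_public_access_block(
    Bucket='rabbitmq-data',
    PublicAccessBlockConfiguration={
        'BlockPublicAcls': True,
        'IgnorePublicAcls': True,
        'BlockPublicPolicy': True,
        'RestrictPublicBuckets': True
    }
)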
We also need to define the credentials in the Identity and Access Management (IAM) console. The important credential parameters for accessing the uploaded data are the access key ID and the secret access key.
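These credentials can be supplied to boto3 either through the standard AWS configuration files or explicitly in code; the values below are placeholders, not real keys.

# sketch: passing IAM credentials explicitly (placeholder values)
session = boto3.session.Session(
    aws_access_key_id='YOUR_ACCESS_KEY_ID',
    aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',
    region_name='eu-north-1'
)
s3_client = session.client('s3')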
At this stage, we have uploaded the data messages and defined access to them. Next, we will preprocess and analyze them using the Dremio platform.
Data curation with Dremio
To connect a data source to Dremio, we need to select the type of data source and specify its credential parameters.
This stage also requires entering the AWS S3 credentials obtained earlier. Next, it is necessary to determine the format of the connected data and save the result as a Dremio virtual dataset.
It is important to select the data format, the delimiter (for a CSV file), and the option to extract column names from the first row. We will then define the data type for each column; below we show the type conversion for the datetime and float data.
At this stage, it is important to locate null values and delete the rows that do not match or are duplicated in the data. After all the transformations, the data will be presented as follows.
Next, let’s extract from the text column the keywords from which we generated the sentences. For this purpose, we need to define patterns using a regular expression.
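For reference, the same extraction can be reproduced in pandas with a regular expression over the generated keywords; this is only an illustration of the pattern, while the actual extraction in this tutorial is done in Dremio.

# sketch: equivalent keyword extraction in pandas with a regular expression
keyword_pattern = r'^(features|parameters|values|keys|numbers)'
df['keyword'] = df['text'].str.extract(keyword_pattern, expand=False)
df[['text', 'keyword']].head(5)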
As a result, we will get a text column with keywords.
At this point the dataset can be saved and the resulting data can be analyzed with any BI or data science tool.
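Since the environment list above includes PyODBC and the Dremio ODBC driver, one option is to pull the curated virtual dataset back into Python for further analysis; the driver name, host, credentials, and dataset path below are placeholders that depend on your Dremio setup.

import pyodbc
import pandas as pd

# sketch: querying the curated Dremio dataset over ODBC (placeholder connection details)
host = 'localhost'
port = 31010
uid = 'dremio_user'
pwd = 'dremio_password'
driver = 'Dremio Connector'

connection_string = (f'DRIVER={{{driver}}};ConnectionType=Direct;'
                     f'HOST={host};PORT={port};AuthenticationType=Plain;'
                     f'UID={uid};PWD={pwd}')
cnxn = pyodbc.connect(connection_string, autocommit=True)

# the dataset path depends on where the virtual dataset was saved in Dremio
query = 'SELECT * FROM "space"."rabbitmq_messages"'
df_dremio = pd.read_sql(query, cnxn)
print(df_dremio.head(5))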