24 minute read · November 12, 2019

Analyzing Multiple Stream Data Sources using Dremio and Python

Dremio Authors: Insights and Perspectives · Dremio Team

Introduction

New technologies, communication systems, and information processing algorithms demand ever higher data rates, availability, and performance. Accordingly, processing the resulting data (messages) calls for technologies capable of handling this high demand. One of these technologies is RabbitMQ, which is used to build service-oriented architecture (SOA) services and distributed, resource-intensive operations.

However, it is worth noting that the messages in the queue can come from different types of data sources and therefore arrive in different formats. As a result, processing and analyzing the data becomes complicated. To resolve this issue, services are used that can combine different data types into a single database.

This article focuses on a data processing and analysis system that uses RabbitMQ as the queuing service, Python as a flexible environment, AWS S3 as fast and secure data storage, and Dremio for combining all data messages from the queue into a single dataset.

In this tutorial we will execute the following steps:

  • Generate data of different types that could flow from various information systems, devices, etc.
  • Create a data producer and a consumer with Python tools to handle the packet data communication processes in the RabbitMQ service.
  • Transmit and store the processed messages using the RabbitMQ and AWS S3 services.
  • Curate the data using Dremio.

Python packages

The tutorial relies on the following Python packages: pandas, numpy, scipy, matplotlib, textgenrnn, pika, and boto3, plus the standard json module.

Environment

Now we will describe each step of the presented algorithm in more detail, together with the relevant tools.

Data sources

We will use random number generators to create the different data types for the required dataset, which keeps the data processing simple, clear, and structured. Specifically, we are going to use the scipy.stats library and the probability density functions (PDF) of the corresponding random variables.

The datasets will consist of the following data types:

  • time and date data – a time series over a given time interval,
  • integer data – samples from the Binomial and Poisson probability distributions,
  • floating-point data – samples from the Normal and Uniform probability distributions,
  • string data – text produced by a text generator based on deep learning models.

The time and date data will be generated with built-in Pandas functions as time-series data for one day. Also, note that we'll define a number (the value variable) that determines the resulting dataset size.

import pandas as pd
# data range
value = 1000
# time periods (day * hours * minutes)
periods = 1 * 24 * 60
# time-series data
time_data = pd.date_range('2019-11-01', periods=periods, freq='T')
time_data = time_data[:value]

In this case, the data is presented in Timestamp format, but real data of this type is usually stored as strings. Therefore, when forming the Pandas DataFrame, we will convert this data to string format for further processing in Dremio.

Next, let's generate the floating-point values as samples and then plot the densities of their distributions.

import scipy.stats as sts
# Normal data
# distribution parameters
mu = 2.0
sigma = 0.5
# generating normal values
norm_rv = sts.norm(loc=mu, scale=sigma)
normal_data = norm_rv.rvs(size=value)
# Uniform data
# distribution parameters
left_limit = 1
right_limit = 4
# generating uniform values
uniform_rv = sts.uniform(left_limit, right_limit-left_limit)
uniform_data = uniform_rv.rvs(value)
print(normal_data)

Next, we'll illustrate the resulting data samples with probability density plots computed from the generated data. To keep the presentation simple and compact, we'll show the plotting code only for the normal data, since it is identical for the other distributions.

import numpy as np
import matplotlib.pyplot as plt
# PDF by samples of Normal data
samples = np.linspace(0, 4, value)
pdf_normal = norm_rv.pdf(samples)
plt.plot(samples, pdf_normal)
plt.title('Normal distribution')
plt.ylabel('probability')
plt.xlabel('samples')
plt.grid()
plt.show()

The data acquired from the normal distribution typically describes values obtained by measuring physical quantities, while uniform data can characterize a simple linear system or dependencies bounded by two parameters.

Similarly, we will generate samples of integer data from Binomial and Poisson probability distributions.

# Binomial data
# distribution parameters
n_binomial = 20
p_binomial = 0.2
# generating binomial values
binomial_rv = sts.binom(n_binomial, p_binomial)
binomial_data = binomial_rv.rvs(value)
# Poisson data
# distribution parameter
poisson_parameter = 15
# generating poisson values
poisson_rv = sts.poisson(poisson_parameter)
poisson_data = poisson_rv.rvs(value)
print(poisson_data)

The plots describing these data samples are similar to the previous ones, except that they use the cumulative distribution function.
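
For reference, a minimal sketch of such a plot for the Poisson samples might look like the following, reusing the poisson_rv object and the plotting imports from above (the binomial case is analogous):

# CDF by samples of Poisson data (the binomial case is analogous)
samples = np.arange(0, 31)
cdf_poisson = poisson_rv.cdf(samples)
plt.step(samples, cdf_poisson, where='post')
plt.title('Poisson distribution')
plt.ylabel('cumulative probability')
plt.xlabel('samples')
plt.grid()
plt.show()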


So we have two columns of integer data for our resulting dataset. These samples can describe systems that emit sequential, count-based data.

Next, we will generate text data using the TensorFlow and textgenrnn libraries. We will use the default deep learning model from textgenrnn, which includes several LSTM and RNN layers.

The text generation process will start from keywords that we define in advance; all other model parameters are left at their defaults.

from textgenrnn import textgenrnn
# deep learning model
textgen = textgenrnn()
def generate_text(text, sentences):
	'''generate number of sentences by key-text'''
	generated_texts = textgen.generate(n=sentences,
                                       prefix=text,
                                       return_as_list=True)
	return generated_texts
# key words
key_words = ['features','parameters','values','keys','numbers']
# generating process
text_list = []
for word in key_words:
	text_by_word = generate_text(word, round(value/len(key_words)))
	text_list.append(text_by_word)
# to the list and shuffle
text_list = np.array(text_list).flatten()
np.random.shuffle(text_list)
print(text_list[:5])

As a result, we have obtained a list of texts, each starting with one of the keywords, that we can later process with Dremio. We will present the resulting dataset as a Pandas DataFrame.

df = pd.DataFrame(list(zip(time_data, normal_data, uniform_data,
                           binomial_data, poisson_data, text_list)),
                  columns=['time_data', 'normal_data', 'uniform_data',
                           'binomial_data', 'poisson_data', 'text'])
df['time_data'] = df.time_data.astype(str)
df.head(5)

Now we'll package this data for transfer through the RabbitMQ queuing service, presenting each row as a JSON package.

import json
# row to JSON
json_data = df.apply(lambda row: json.loads(row.to_json()), axis=1)
# to the list of JSON- packages
send_data = json_data.to_numpy()
#example of package
send_data[0]

Now we have all the data needed for processing with the RabbitMQ service.

Data processing with RabbitMQ

RabbitMQ provides a queuing service that different applications, users, or other system components can connect to in order to send or receive data messages.

The RabbitMQ service includes the following components:

  • Data producer – procedures for forming and sending data to the message queue in the RabbitMQ service
  • Message handler – the component responsible for message processing and queue formation. Several exchange types can be assigned, such as Direct, Topic, Fanout, and Headers, which define the rules for routing messages to the queue
  • Queue – holds the messages received from the data producer
  • Data consumer – procedures for receiving data from the RabbitMQ service

These components define the general structure of the data queuing system built on the RabbitMQ service.


The algorithm of the presented system includes the following steps:

  • The data producer gets data messages
  • The data producer sends data messages to the RabbitMQ service
  • Data messages reach the message handler and are then redirected to the queue according to the message processing rule
  • A data message remains in the queue until the data consumer processes it
  • The data consumer processes the messages

Usually, the RabbitMQ service is deployed on a high-performance server, but in our case we will use a local server with the direct routing rule for the message handler. To do this, we need a pre-installed RabbitMQ service, which will also be used to create the data producer and consumer applications.

We are going to use the pika library to create the applications for data processing with the RabbitMQ service.
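
Before building the producer and consumer, it can be useful to verify that the local RabbitMQ server is reachable. Below is a minimal sketch of such a check, assuming a default installation listening on localhost:

import pika
from pika.exceptions import AMQPConnectionError
# quick connectivity check against the local RabbitMQ server
try:
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    print('RabbitMQ is reachable')
    connection.close()
except AMQPConnectionError:
    print('RabbitMQ is not running on localhost')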

Data producer

The Data producer algorithm includes the following steps:

  • create a connection to the RabbitMQ service,
  • initialize a connection to a specific RabbitMQ channel,
  • send messages to the queue using the default settings for the message handler,
  • close the connection to the RabbitMQ service after message processing is complete.

Using this algorithm, the data producer application can be implemented with the following Python script.

import pika
# create connection
pika_connector = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(pika_connector)
# initialize channel connection
channel = connection.channel()
# define queue
channel.queue_declare(queue='channel')
# send messages to the queue
for ind, message in enumerate(send_data):
    channel.basic_publish(exchange='',
                          routing_key='channel',
                          # pika expects the message body as str or bytes
                          body=str(message))
# close connection
connection.close()
print(f'{ind+1} messages were sent to the queue')

As a result, the data messages are uploaded into the RabbitMQ service, where they will be stored until the data consumer requests them from the queue.
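
If you want to confirm that the messages actually reached the queue before starting the consumer, a passive queue declaration reports the current message count. This optional sketch reuses the pika_connector defined above:

# optional: check how many messages are currently waiting in the queue
connection = pika.BlockingConnection(pika_connector)
channel = connection.channel()
queue_state = channel.queue_declare(queue='channel', passive=True)
print(f'{queue_state.method.message_count} messages in the queue')
connection.close()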

Data consumer

The data producer and consumer algorithms consist of similar steps, so let's consider only those that differ:

  • The data consumer is set to standby mode, waiting for data messages
  • A callback function is used to retrieve the data messages from the queue

The script for the data consumer is shown below.

# create connection
connection = pika.BlockingConnection(pika_connector)
# initialize channel connection
channel = connection.channel()
# define queue
channel.queue_declare(queue='channel')
print('Waiting for messages')
receive_data = []
def callback(ch, method, properties, body):
    '''callback for received messages'''
    receive_data.append(body)
    print(f'Received message: {body}')
# define queue parameters
channel.basic_consume('channel', callback, auto_ack=True)
# receive messages
try:
	channel.start_consuming()
except KeyboardInterrupt:
	channel.stop_consuming()
connection.close()

Next, we will perform additional processing of the received messages (JSON conversion) and load them into a Pandas DataFrame for further analysis.

# data processing
processed_messages = []
def after_procession(message):
    '''process received message'''
    # convert to string
    message_string = str(message, 'utf-8')
    # convert to dict
    message_dic = json.loads(message_string.replace("'", "\""))
    processed_messages.append(message_dic)
# convert every received message from JSON format
for message in receive_data:
    after_procession(message)
# load to DataFrame
df = pd.DataFrame.from_records(processed_messages)
df.to_csv(path_or_buf='data_processed.csv', index=False)
df.head(5)

Now, the received data is ready for storage in AWS S3 and further analysis with the Dremio platform.

AWS S3

So we are going to use this service as storage for the data messages received from the RabbitMQ service. The main data processes in AWS S3 are described below.


Accordingly, the AWS S3 data processing algorithm consists of three steps:

  • Creating an AWS S3 bucket or connecting to an existing one
  • Uploading data messages to the AWS S3 bucket
  • Configuring access and credential settings

You can perform these steps with the AWS web interface as well as with the AWS SDK. The first way has been described in previous tutorials, so we will focus on the second one, using the AWS SDK for Python.

The script for creating an AWS S3 bucket with the SDK for Python is as follows.

import boto3
from botocore.exceptions import ClientError
def create_bucket(bucket_name, region=None):
    """Create an S3 bucket in the specified region"""
    try:
        if region is None:
            s3_client = boto3.client('s3')
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            s3_client = boto3.client('s3', region_name=region)
            location = {'LocationConstraint': region}
            s3_client.create_bucket(Bucket=bucket_name,
                                    CreateBucketConfiguration=location)
    except ClientError:
        return False
    return True
create_bucket('rabbitmq-data', region='eu-north-1')

This script allows you to specify the AWS region, which is important for optimal data performance. The results of its execution can be checked in the AWS management console.


We'll also implement the data-message upload with a Python script (AWS SDK for Python).

def upload_file(file_name, bucket, object_name=None):
    """Upload a file to an S3 bucket"""
    # If S3 object_name was not specified, use file_name
    if object_name is None:
        object_name = file_name
    # Upload the file
    s3_client = boto3.client('s3')
    try:
        s3_client.upload_file(file_name, bucket, object_name)
    except ClientError:
        return False
    return True
# data upload
upload_file('data_processed.csv', 'rabbitmq-data', 'rabbitmq-messages')

As a result, we’ll upload data-messages to the specified AWS S3 bucket.


Next, we'll configure the access settings for the AWS S3 bucket. In our case, we will select public access. In a real deployment you should configure private access instead; otherwise, your data will be available to everyone.
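
For a private setup, the same can be done from Python with boto3. The following is a minimal sketch that blocks all public access to the bucket created above:

# block all public access to the bucket (recommended outside of this demo)
s3_client = boto3.client('s3')
s3_client.put_public_access_block(
    Bucket='rabbitmq-data',
    PublicAccessBlockConfiguration={
        'BlockPublicAcls': True,
        'IgnorePublicAcls': True,
        'BlockPublicPolicy': True,
        'RestrictPublicBuckets': True
    }
)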


Also, we need to define the credentials options in the Identity and Access Management (IAM) console. The important credential parameters for accessing the uploaded data are the Access key ID and the Secret access key.
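
These credentials are typically stored in the ~/.aws/credentials file or in environment variables, but they can also be passed to boto3 explicitly when creating a client. In the sketch below the key values are placeholders:

# create an S3 client with explicit credentials (the values are placeholders)
s3_client = boto3.client(
    's3',
    aws_access_key_id='YOUR_ACCESS_KEY_ID',
    aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',
    region_name='eu-north-1'
)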


At this stage, we have uploaded the data messages and defined access to them. Next, we will preprocess and analyze them using the Dremio platform.

Data curation with Dremio

To connect a data source to Dremio, we need to select the type of data source and specify the credential parameters.


This stage also requires entering the AWS S3 credential parameters obtained earlier. Next, it is necessary to determine the format of the connected data and save the result as a Dremio virtual dataset.


It is important to select the data format, the delimiter (for a CSV file), and the column name extraction option. We will then define the data type for each column; for example, the datetime and float columns need to be converted to their proper types.


At this stage, it is also important to identify null values and remove rows that do not match the expected format or are duplicated. After all these transformations, the data is clean and ready for further curation.
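
For readers who want to prototype the same transformations outside Dremio, a rough pandas equivalent of the type conversion and cleaning steps might look like this; it reads the CSV file produced by the consumer earlier:

# approximate pandas equivalent of the Dremio transformations
clean_df = pd.read_csv('data_processed.csv')
# convert the datetime and float columns to their proper types
clean_df['time_data'] = pd.to_datetime(clean_df['time_data'])
clean_df[['normal_data', 'uniform_data']] = clean_df[
    ['normal_data', 'uniform_data']].astype(float)
# drop rows with nulls and duplicated rows
clean_df = clean_df.dropna().drop_duplicates()
print(clean_df.dtypes)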


Next, let's extract from the text column the keywords that we used to generate the sentences. For this purpose, we need to define a pattern using a regular expression.
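
The pattern simply captures the keyword at the beginning of each generated sentence. For reference, the equivalent extraction in pandas (reusing the df built by the consumer) looks like this:

# the same keyword pattern, applied in pandas for reference
keyword_pattern = r'^(features|parameters|values|keys|numbers)'
df['keyword'] = df['text'].str.extract(keyword_pattern, expand=False)
print(df[['keyword', 'text']].head())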


As a result, we will get a text column with keywords.


At this point the dataset can be saved and the resulting data can be analyzed with any BI or data science tool.
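
As an illustration, one possible way to pull the curated data back into Python is through Dremio's ODBC interface with pyodbc. The DSN name, credentials, and dataset path below are placeholders that depend on how the source and virtual dataset are named in your Dremio environment:

import pyodbc
import pandas as pd
# connect through an ODBC DSN configured for Dremio (all names are placeholders)
dremio_connection = pyodbc.connect('DSN=Dremio;UID=dremio_user;PWD=dremio_password',
                                   autocommit=True)
query = 'SELECT * FROM "your_space"."rabbitmq_messages"'
result_df = pd.read_sql(query, dremio_connection)
print(result_df.head())

From here, the curated data can be explored with pandas, visualized, or fed into any downstream analysis.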
