
January 16, 2025

Delivering Effortless Data Workflows with Dremio Pipes in AWS

Ashley Farrugia · Master Principal Solutions Architect

This guide will equip you with the expertise to easily configure Pipes in Dremio Software, revolutionising your data workflows. By automating the detection and ingestion of new files, Dremio Pipes ensures your data remains consistently up-to-date and readily accessible for exploration within your data consumption layer. This empowers your organisation to gain faster insights and make more informed decisions.

Auto-Ingest Pipes offer a comprehensive solution designed to optimise every stage of your data processing pipeline. They transcend the limitations of traditional data ingestion tools, such as scalability constraints, the need for manual intervention, and the persistent challenges of ensuring data quality. With Dremio Pipes you will experience the following key advantages:

  • Continuous Data Observability: Enhance data reliability and trust by monitoring the health of data ingestion pipelines, minimising disruptions to critical business processes.
  • Unparalleled Efficiency: Maximise your team's productivity by leveraging Auto-Ingest Pipes to streamline the creation, management, and monitoring of your data ingestion pipelines.
  • Unwavering Reliability: Benefit from exactly-once ingestion semantics, leveraging AWS SQS notifications and robust file deduplication to ensure data integrity and eliminate the risk of hidden errors or duplicate entries.
  • Effortless Scalability: Event-driven architecture empowers Auto-Ingest Pipes to seamlessly adapt to the ever-growing volume and complexity of your data, ensuring your pipeline remains agile and responsive to your evolving business needs.

Figure 1.0 - Dremio Pipe’s Architecture

By following this guide, we will create the following components:

  • AWS SQS Queue - This will serve as a message queue that stores event notifications from S3 when new files become available.
  • Dremio Pipe - This will listen to the SQS queue for new events and run the ingestion procedure to load the new data into the Iceberg table.

Before we embark on this journey of data automation, let's ensure you have the following prerequisites in place:

  • Dremio Cluster - This cluster will serve as the processing engine for the new data destined for ingestion into an Iceberg table within an S3 data lake.
  • AWS IAM Role - An AWS IAM Role provisioned with the necessary permissions to create SQS queues and manage S3 buckets and their event notifications.
  • AWS S3 Bucket - This designated S3 bucket will function as the central repository for all uploaded files awaiting loading into the Iceberg table. The SQL examples below assume it is accessible in Dremio as a source named s3.

Configuring SQS Queue

  1. Navigate to the AWS region in which you want to deploy your SQS queue. For the purposes of this guide, we will use N. Virginia (us-east-1).
  2. Search for the SQS service, select “Queues” and click “Create queue”.
  3. Create a name for the queue, e.g., exampleingestion.
  4. Set the “Type” to Standard.
  5. In the “Access policy” section, click “Advanced” and insert the following configuration, updating the placeholders with your values.
{
  "Version": "2012-10-17",
  "Id": "example-ID",
  "Statement": [
    {
      "Sid": "example-statement-ID",
      "Effect": "Allow",
      "Principal": {
        "Service": "s3.amazonaws.com"
      },
      "Action": "SQS:SendMessage",
      "Resource": "arn:aws:sqs:us-east-1:{AWS_ACCOUNT}:exampleingestion",
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": "{AWS_ACCOUNT}"
        },
        "ArnLike": {
          "aws:SourceArn": "arn:aws:s3:*:*:{S3_Bucket}"
        }
      }
    }
  ]
}
  6. Finally, click “Create queue”.

Configuring S3

  1. Locate the S3 bucket that you wish to use for the Auto-Ingest Pipe and navigate to the “Properties” tab.
  2. Scroll down until you reach “Event notifications”.
  3. Click “Create event notification”.
  4. Create a name, e.g., “ingestion_demo”.
  5. Optional: if necessary, set the “Prefix” to a folder within the S3 bucket. For the purposes of this example, the Prefix is set to “ingestion_demo”.
  6. Under “Event types”, in the “Object creation” section, tick the Put and Post checkboxes individually.
  7. Scroll down to “Destination” and select “SQS Queue”.
  8. Click the “SQS Queue” dropdown and locate the SQS queue created above.
  9. Click “Save changes” to create the event notification.

Configuring the Ingestion Pipe in Dremio

  1. Before we create the Pipe, we need to create an example Iceberg Table called “ingestion_demo” in Dremio using the following code.
CREATE TABLE s3.ingestion_demo (
    "loc_id" int,
    "sensor_id" int, 
    "measure" varchar,
    "ts" timestamp,
    "measurement_period_type" varchar,
    "value" DECIMAL(10,5)
);
  2. Insert example data into the Iceberg table using the following code.
INSERT INTO s3.ingestion_demo VALUES
    (1, CAST((RAND() * (2000 - 1) + 1) AS INT), 'oil_temperature', '2023-10-30 00:00:00', 'Instantaneous', round(RAND() * (200 - 50) + 50, 5)),
    (1, CAST((RAND() * (2000 - 1) + 1) AS INT), 'oil_temperature', '2023-10-30 00:00:00', 'Instantaneous', round(RAND() * (200 - 50) + 50, 5)),
    (1, CAST((RAND() * (2000 - 1) + 1) AS INT), 'oil_temperature', '2023-10-30 00:00:00', 'Instantaneous', round(RAND() * (200 - 50) + 50, 5)),
    (1, CAST((RAND() * (2000 - 1) + 1) AS INT), 'oil_temperature', '2023-10-30 00:00:00', 'Instantaneous', round(RAND() * (200 - 50) + 50, 5)),
    (2, CAST((RAND() * (2000 - 1) + 1) AS INT), 'oil_temperature', '2023-10-31 00:00:00', 'Instantaneous', round(RAND() * (200 - 50) + 50, 5));
  3. Create a Pipe in Dremio using this example code.
CREATE PIPE ajf_ingestion_demo_refmeasures_v4
NOTIFICATION_PROVIDER AWS_SQS
NOTIFICATION_QUEUE_REFERENCE "arn:aws:sqs:us-east-1:{AWS_ACCOUNT}:exampleingestion"
AS COPY INTO "s3".ingestion_demo
FROM '@s3/"{S3_Bucket}"/ingestion_demo'
FILE_FORMAT 'csv'
(EXTRACT_HEADER 'true', EMPTY_AS_NULL 'true', TIMESTAMP_FORMAT 'YYYY-MM-DD HH24:MI:SS.FFF');
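
Before relying on the Pipe, it can be useful to sanity-check the COPY INTO portion on its own. The sketch below simply runs the COPY INTO from the pipe definition directly, with the same placeholders; it assumes at least one CSV file has already been uploaded under the ingestion_demo prefix.

-- Manually run the same load that the Pipe will trigger automatically.
-- Uses the same {S3_Bucket} placeholder as the pipe definition above.
COPY INTO "s3".ingestion_demo
FROM '@s3/"{S3_Bucket}"/ingestion_demo'
FILE_FORMAT 'csv'
(EXTRACT_HEADER 'true', EMPTY_AS_NULL 'true', TIMESTAMP_FORMAT 'YYYY-MM-DD HH24:MI:SS.FFF');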

Once you have successfully configured the Pipe in Dremio, you can simply upload the following sample CSV data as a file to your S3 bucket (under the ingestion_demo prefix) and watch as it gets loaded into your new Iceberg table. A quick verification query follows the sample data below.

loc_id,sensor_id,measure,ts,measurement_period_type,value
3,337,oil_temperature,2024-04-05 14:41:23.042,Instantaneous,105.51163
3,917,oil_temperature,2024-04-05 14:41:23.042,Instantaneous,82.37603
3,1699,oil_temperature,2024-04-05 14:41:23.042,Instantaneous,121.61005
2,1582,oil_temperature,2024-04-05 14:28:33.258,Instantaneous,139.75838
2,801,oil_temperature,2024-04-05 14:28:33.258,Instantaneous,121.00335
2,153,oil_temperature,2024-04-05 14:28:33.258,Instantaneous,
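
Once the Pipe has picked up the file (by default within the five-minute scheduling window described in the Tips below), a quick query confirms the new rows have landed alongside the seed data. This is purely a verification step, not part of the pipe configuration.

-- Check that the rows from the uploaded CSV file have been appended.
SELECT loc_id, sensor_id, measure, ts, measurement_period_type, "value"
FROM "s3".ingestion_demo
ORDER BY ts DESC;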

Tips

  1. To change how regularly Dremio processes new data file events, set the dremio.ingestion.schedule.period.mins support key to a value in minutes. The default of 5 minutes provides balanced performance without overloading the cluster.
  2. Before configuring the Pipe in Dremio, validate that messages are produced to SQS when files are uploaded to S3. This can be done via CloudWatch, the AWS console, or the AWS CLI.
  3. Use the sys.copy_file_history and sys.copy_errors_history system tables to monitor the health of Dremio Pipes, as shown in the sketch after this list.
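
As a starting point for the monitoring mentioned in tip 3, the queries below pull recent activity from both system tables. They are a minimal sketch: the column set of these tables can vary between Dremio versions, so SELECT * is used and the ordering column is an assumption you may need to adjust.

-- Recent file loads recorded for COPY INTO jobs and pipes.
-- event_timestamp is assumed as the ordering column; adjust if needed.
SELECT *
FROM sys.copy_file_history
ORDER BY event_timestamp DESC
LIMIT 20;

-- Row-level rejections captured during those loads, if any.
SELECT *
FROM sys.copy_errors_history
ORDER BY event_timestamp DESC
LIMIT 20;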

Troubleshooting

File has been uploaded to S3 but the data is not available 

This might seem simple, but it is worth validating that the event notification has been configured to point at the right SQS queue. If it is configured correctly, then validate that the message is in the queue; this can be done via CloudWatch, the AWS console, or the CLI. If the queue has been populated correctly with the new event message, refer to “SQS queue has been updated but Dremio is failing to load the data” below for further troubleshooting.

SQS queue has been updated but Dremio is failing to load the data 

To get information about what might be causing this behaviour, query the sys.copy_file_history table. Any error messages produced when the pipe runs are recorded in the first_error_message column, which should provide enough information to troubleshoot the underlying issue. For example, “Failure parsing the formatting string at column 11 of: YYYY-MM-DDTHH24:MI:SS.FFF Invalid date format string 'YYYY-MM-DDTHH24:MI:SS.FFF' at position 11” signifies an issue with the expected date format, so you might need to adjust the TIMESTAMP_FORMAT attribute when defining the pipe.
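
A minimal sketch of that lookup is shown below. It surfaces the first_error_message column described above for the example pipe; the pipe_name filter and event_timestamp ordering are assumed column names, so drop or rename them if your Dremio version differs.

-- Inspect recent pipe activity and any error messages.
-- pipe_name and event_timestamp are assumed column names.
SELECT *
FROM sys.copy_file_history
WHERE pipe_name = 'ajf_ingestion_demo_refmeasures_v4'
ORDER BY event_timestamp DESC
LIMIT 10;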

Conclusion

To conclude, deploying Dremio Pipes within your organisation offers significant enhancements to your data workflows. Auto-ingest Pipes provide a robust and automated solution that overcomes the limitations of traditional data ingestion methods, ensuring reliability and minimising disruptions. By streamlining ingestion processes, guaranteeing data integrity, and eliminating errors, Dremio Pipes maximise data team productivity and free up valuable resources for revenue-generating activities. Furthermore, Dremio Pipes offer effortless scalability, enabling seamless adaptation to the ever-growing volumes and complexity of your data. This agility and responsiveness ensure your organisation can leverage valuable insights and make more informed decisions with unprecedented speed and efficiency.

References

  1. Introducing Auto Ingest Pipes: Event-Driven ingestion made easy
  2. Autoingesting Data into Apache Iceberg
  3. Create Pipe
  4. Dremio COPY INTO command
