20 minute read · September 1, 2022
The Life of a Write Query for Apache Iceberg Tables
· Senior Tech Evangelist, Dremio
Apache Iceberg is an open data lakehouse table format that provides your data lake with critical features like time travel, ACID transaction, partition evolution, schema evolution, and more. You can read this article or watch this video to learn more about the architecture of Apache Iceberg. In this article, we’d like to explain how write queries work for Apache Iceberg.
In order to understand how write queries work we first need to understand the metadata structure of Apache Iceberg, and then examine step by step how that structure is used by query engines to plan and execute the query.
Apache Iceberg 101
Apache Iceberg has a tiered metadata structure and it’s key to how Iceberg provides high-speed queries for both reads and writes. Let’s summarize the structure of Apache Iceberg to see how this works. If you are already familiar with Iceberg’s architecture then feel free to skip ahead to the section titled “How a Query Engine Processes the Query.”
Data Layer
Starting from the bottom of the diagram, the data layer holds the actual data in the table. It’s made up of two types of files:
Data files – store the data in file formats such as Parquet or ORC.
Delete files – track records that still exist in the data files, but that should be considered as deleted.
Metadata Layer
Apache Iceberg uses three tiers of metadata files which cover three different scopes.
Manifest files – A subset of the snapshot. These files track the individual files in the data layer in that subset along with metadata for further pruning.
Manifest lists – Defines a snapshot of the table and lists all the manifest files that make up that snapshot with metadata on those manifest files for pruning.
Metadata files – Defines the table, and tracks manifest lists, current and previous snapshots, schemas, and partition schemes.
The Catalog
Tracks a reference/pointer to the current metadata file. This is usually some store that can provide some transactional guarantees like a relational database (Postgres, etc.) or metastore (Hive, Project Nessie, Glue).
How a Query Engine Processes the Query
So let’s run an INSERT
, DELETE
, and MERGE INTO
against a table called orders
to see the behavior.
Running an Insert Query
INSERT INTO orders VALUES (...);
1. Sending the query to the engine
The query is submitted to the query engine that parses the query. The engine now needs the table data to begin planning the query.
2. Checking the catalog and planning
The engine first reaches out to the catalog to find the current metadata file’s path and reads that metadata file. Even though it’s just appending data to the table, it does this to determine two things:
- The table's schema to make sure the data it’s going to write fits that schema, as well as which fields can versus can’t have null values.
- The partitioning of the table to determine how to organize the data to be written.
3. Writing the data files
Since no existing data files will be affected because it’s an INSERT, we can begin writing new data files. At this point, all new records will be written into files based on the partitioning scheme of the table. If the table has a configured sort order and the engine writing the data supports it, then the records will be sorted prior to writing the records to data files.
4. Writing the metadata files
Now that data files have been written, we will group these files into manifest files. For each manifest file, we’ll include the file paths to the data files it tracks and statistics about each data file, such as minimum and maximum values for each column in the data file. We’ll then write these manifest files out to the data lake.
Then, we will include all new or existing manifests associated with this snapshot in a new manifest list. In the manifest list, we’ll include the file paths to the manifest files and statistics about the write operation, such as how many data files it added, and information about the data referenced by the manifest files, such as the lower and upper bounds of their values for partition columns. We’ll then write this manifest list out to the data lake.
Finally, we will create a new metadata file to summarize the table as it stands based on the metadata file that was current before our INSERT, including the file path and details of the manifest list we just wrote as well as a reference to note that this manifest list is now the current manifest list/snapshot.
Now we just need to commit this to the catalog.
5. Committing the changes
The engine goes to the catalog again to ensure that no new snapshots appeared while the files were being written. This check prevents concurrent writers from overwriting each other’s changes. Writes will always be safe when done concurrently, as the writer that completes first gets committed and any other writes that conflict with the first writer’s changes will go back to step 3 or 4 and reattempt until successful or quit retrying and decide to fail.
Readers will always plan based on the current metadata file, so they won’t be affected by concurrent writes that have not yet committed.
Running a Delete Query
DELETE FROM orders WHERE order_ts BETWEEN '2022-05-01 10:00:00' AND '2022-05-31 10:00:00';
1. Sending the query to the engine
The query is submitted to the query engine that parses the query, and the engine now needs the table data to begin planning the query.
2. Checking the catalog and planning
The engine first reaches out to the catalog to find the current metadata file. It does this to determine a few things:
- The partitioning of the table to determine how to organize the data to be written.
- The current sequence ID of the table and what its transaction’s sequence ID will be. It is assumed that no other writes that conflict with this
INSERT
will be complete before this job is done since Iceberg leverages optimistic concurrency control (assume everything is good till it isn’t).
3. Writing the files
How the files are written is determined by whether the table has the delete strategy set to either “copy-on-write” (COW) or “merge-on-read” (MOR).
If the table is set to copy-on-write, it will go through the following process:
- Use the metadata to identify which files contain the data to be deleted.
- Read these files in their entirety to determine what is deleted.
- Write a new file to replace this file in the new snapshot, where this new file doesn’t have the data requested to be deleted.
If the table is set to merge-on-read, the process avoids rewriting data files to speed up the write, but the specific steps it will go through depends on the type of delete file the table is configured to use to track deleted records.
Position deletes
- It will scan the metadata to determine which data files have records that will be deleted.
- It will scan the data files to determine the positions of the deleted records.
- It will write a delete file for the partition that lists which records by position are deleted from which files.
Equality deletes
- It will not need to read any files, it will just skip to writing a delete file listing the values that specify which rows are to be deleted.
- During reads, the reader will have to reconcile the delete file against all records in the partition which may be expensive.
Which type of delete file will be written based on the table settings? The original data file will still be used in the snapshot, and the engine will reconcile the delete file in future reads of that snapshot. Read this article on COW vs. MOR to understand what situations fit each strategy best.
4. Writing the metadata files
Now that data files/delete files have been written, we will group these files into manifest files. For each manifest file, we’ll include the file paths to the data files it tracks and statistics about each data file, such as minimum and maximum values for each column in the data file. We’ll then write these manifest files out to the data lake.
Then, we will include all new or existing manifests associated with this snapshot into a new manifest list. In the manifest list, we’ll include the file paths to the manifest files and statistics about the write operation, such as how many data files it added, and about the data referenced by the manifest files, such as the lower and upper bounds of their values for partition columns. We’ll then write this manifest list out to the data lake.
Finally, we will create a new metadata file to summarize the table as it stands based on the metadata file that was current before our DELETE
, including the file path and details of the manifest list we just wrote as well as a reference to note that this manifest list is now the current manifest list/snapshot.
Now we just need to commit this to the catalog.
5. Committing the changes
The engine goes to the catalog again to ensure that no new snapshots appeared while the files were being written. This check prevents concurrent writers from overwriting each other’s changes. Writes will always be safe when done concurrently, as the writer that completes first gets committed and any other writes that conflict with the first writer’s changes will reattempt until successful or quit retrying and decide to fail.
Readers will always plan based on the current metadata file, so they won’t be affected by concurrent writes that have not yet committed.
Running an Upsert/Merge Query
MERGE INTO orders o USING (SELECT * FROM orders_staging) s ON o.id = s.id WHEN MATCHED … WHEN NOT MATCHED
1. Sending the query to the engine
The query is sent to the query engine that parses the query, so the engine now needs the table data for both the target and staging table to begin planning the query.
2. Checking the catalog and planning
The engine first reaches out to the catalog to find the current metadata file. It does this to determine a few things:
- The table's schema to make sure the data it’s going to write fits that schema, as well as which fields can versus can’t have null values.
- The partitioning of the table to determine how to organize the data to be written.
- The current sequence ID of the table and what its transaction’s sequence ID will be. It is assumed that no other writes that conflict with this
INSERT
will be complete before this job is done since Iceberg leverages optimistic concurrency control (assume everything is good till it isn’t).
3. Writing the files
At this point, the engine will scan and load the relevant data from both the target and stage tables into memory and begin the matching and updating process. It will go through each row in the staging table to see if there is a match in the target table (via the id
field matching, since that’s what we specified). What will be tracked in memory as it identifies matches will be based on the configured writing mode for table merges, copy-on-write or merge-on-read.
If copy-on-write
, any data files where at least one record was updated will be rewritten to include updates and new records in the same partition. This means as updated files are read, the entire file must be saved to memory for the eventual writing of a new file.
If merge-on-read
, delete files will be generated so the older versions of records will be ignored on future reads, then new data files will be written with only the newly updated or inserted records. Only the records to be updated will be tracked in memory which can be a lot less memory intensive over the entire transaction.
If there is a match, it will carry out any instructions inside the WHEN MATCHED
clause typically to update certain fields.
If no match is found, it will carry out any instructions inside the WHEN NOT MATCHED
clause, typically to insert the record from the stage table into the target table.
4. Writing the metadata files
Now that data files/delete files have been written, we will group these files into manifest files. For each manifest file, we’ll include the file paths to the data files it tracks and statistics about each data file, such as minimum and maximum values for each column in the data file. We’ll then write these manifest files out to the data lake.
Then, we will include all new or existing manifests associated with this snapshot into a new manifest list. In the manifest list, we’ll include the file paths to the manifest files and statistics about the write operation, such as how many data files it added, and information about the data referenced by the manifest files, such as the lower and upper bounds of their values for partition columns. We’ll then write this manifest list out to the data lake.
Finally, we will create a new metadata file to summarize the table as it stands based on the metadata file that was current before our MERGE, including the file path and details of the manifest list we just wrote as well as a reference to note that this manifest list is now the current manifest list/snapshot.
Now we just need to commit this to the catalog.
5. Committing the changes
The engine goes to the catalog again to ensure that no new snapshots appeared while the files were being written. This check prevents concurrent writers from overwriting each other’s changes. Writes will always be safe when done concurrently, as the writer that completes first gets committed and any other writes that conflict with the first writer’s changes will reattempt until successful or quit retrying and decide to fail.
Readers will always plan based on the current metadata file, so they won’t be affected by concurrent writes that have not yet committed.
Conclusion
Apache Iceberg using optimistic concurrency and snapshot isolation can ensure ACID guarantees so you can write data to a table without affecting concurrent readers or writers. If you are considering adopting Apache Iceberg into your data lakehouse, check out this tutorial on how to create Apache Iceberg tables using AWS Glue and Dremio.