May 2, 2024
Syncing the Iceberg: Real-time Sailing at Terabyte Latitudes
Apache Iceberg, along with other table formats, promises ACID properties atop read-optimized and open file formats like Apache Parquet. But is achieving this promise feasible when synchronising tables in near real-time? Will optimistic concurrency remain the optimal choice? What trade-offs will we encounter? Let’s embark on a journey across glacial seas and find out!
Sign up to watch all Subsurface 2024 sessions
Transcript
Note: This transcript was created using speech recognition software. While it has been reviewed by human transcribers, it may contain errors.
Antonio Murgia:
Hi, everyone. I’m happy to be here. Well, it can’t be a Subsurface talk for me if I don’t have some technical difficulties. Anyway, thank you, everyone, for having me. I’ll share my screen and start my presentation. Fine? Okay. Let’s go. I hope you see the presentation. So, Syncing the Iceberg, or we could also have called this talk: how we’re cutting out 30 petabytes of shuffle data every day. As just said, I’m Antonio. I’m a software architect, and I love to get my hands dirty with high-volume data problems. In my free time, I like to play a bunch of sports so that if you beat me, I’ll claim I’m the champion of something else. I am part of Agile Lab, a consulting firm that specializes in large-scale data management solutions, large both in terms of organizational structure and data volume.
Today, we will briefly present the use case. We will discuss our first solution and its related issues. Then we will propose enhancements focusing on file pruning and shuffle strategies, and we will see how those help us reach our goals. Finally, we will draw some conclusions and give you a taste of the upcoming work, which I will definitely dedicate a couple of talks to. So let’s keep in touch.
The Use Cases
The use case, from a high-level point of view, is fairly simple. We’ve got a 100-terabyte HBase table. Thanks to a component known as Kafka proxy, we get all of the mutations happening in the original table into a Kafka topic as soon as they happen. The volume of data that transits on that topic is around 98 gigabytes per day. Our goal is to replicate the changes to an Iceberg table that will be a little bit lighter thanks to its columnar format. We also had some non-functional requirements, or targets: to achieve one-hour replication latency and to keep our compute spending under 10K euros a month, which is more or less eight m5d.8xlarge EC2 instances running full time. Keep in mind that these data sizes are related to data at rest, which is very well compressed. The compression ratio for our data set is about 20x.
So let’s have a look at some sample data that we will find on our Kafka topic. For every change that happens in the original table, we get a record that contains the timestamp at which the change happened on HBase, the key of the row that changed, and the columns that changed in that operation, obviously with their new values. Unlike usual change data capture technologies, we don’t get what’s called the after image, but only the newly changed values. The reason for this resides in how HBase works, and I won’t go deeper into that.
Another challenge we need to overcome is that the CDC messages we get on Kafka can be out of order. This means that we need to take into account the operation timestamp and apply only changes that are more recent than the last change. But we need to do that on each field separately, because we don’t get all the columns on every message. So, for example, let’s say we start with values Antonio and 0.2, and let’s say they were both set a long time ago. Then we get a message that reports an update for column A equal to my surname, Murgia, and that this change happened on the 2nd of April. Therefore we apply it, and our resulting row has Murgia as column A. Later, we get another message with an update that happened on the 1st of April, which would mutate both column A and column B. But we know that we should only update column B, because column A was already updated on the 2nd of April, which is after the 1st of April. So the final state of our table should be Murgia and 1.2, and not Antonio and 1.2. In order to apply this logic, we need one technical column that keeps the operation timestamp for each data column. Thanks to this data model, we are able to make our pipeline idempotent, and this will greatly simplify its operationalization, obviously.
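The per-column rule described here can be sketched in a few lines of Python. This is only an in-memory illustration, not the speaker's pipeline: the `Row` layout, the `apply_mutation` helper, the integer timestamps, and the choice that ties keep the stored value are all assumptions made for the example.

```python
from typing import Any, Dict, Tuple

# Hypothetical in-memory model of one target row: each data column maps to
# a (value, operation_timestamp) pair, mirroring the technical column that
# stores the operation timestamp next to every data column.
Row = Dict[str, Tuple[Any, int]]


def apply_mutation(row: Row, changed: Dict[str, Any], op_ts: int) -> Row:
    """Return a new row where each changed column wins only if it is newer."""
    merged = dict(row)
    for column, new_value in changed.items():
        _, current_ts = merged.get(column, (None, -1))
        # Strictly newer wins; on a tie the stored value is kept (assumption).
        if op_ts > current_ts:
            merged[column] = (new_value, op_ts)
    return merged


# Timestamps are illustrative integers (day of April); 0 means "long ago".
row: Row = {"a": ("Antonio", 0), "b": (0.2, 0)}
row = apply_mutation(row, {"a": "Murgia"}, op_ts=2)             # 2nd of April
row = apply_mutation(row, {"a": "Antonio", "b": 1.2}, op_ts=1)  # 1st of April, arrives late
print(row)
```

Because a mutation only wins when its timestamp is strictly newer, replaying the same message leaves the row unchanged, which is the idempotency property mentioned above.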
We also analyzed both the existing data distribution and the update distribution, and we know that in our dataset every day has roughly the same number of rows. We also know that on any given day, we will receive mostly data from that day and the day before, some from the past week, and then not much from the previous five years. So let’s recap. We know that mutations affect a subset of columns. Mutations can be propagated out of order. Data is uniformly distributed with regard to the date column. And recent data is more prone to changes.
First Example
So let’s go with our first attempt. For this, we just create the table and run this SQL statement continuously. Let’s run through the SQL command. First, we define the merge condition, which is simply key equality. Then we say that if there is a match, the target data column will be set to the freshest value between the one present in the source table, which is our Kafka data, and the one in the target table. We also need to set the update timestamp to the greatest between the source and the target ones. We should repeat this assignment for each data column we have. If there’s no match, we simply insert the row as it is. Under the hood, updates in Iceberg, by default, work in copy-on-write fashion. This means that when one row needs to be updated, the Parquet file that contains that row is read and rewritten, changing the updated row.
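The statement shown on the slide is not part of the transcript. As a rough sketch of what such a merge could look like, the Python helper below generates a `MERGE INTO` statement with one timestamp-guarded assignment pair per data column. The table names, the `<column>_ts` naming for the technical timestamp columns, and the helper itself are hypothetical, and NULL handling is left out for brevity.

```python
from typing import List


def build_merge_sql(target: str, source: str, key: str, columns: List[str]) -> str:
    """Build a MERGE INTO statement with per-column freshest-value assignments."""
    assignments = []
    for col in columns:
        ts = f"{col}_ts"  # hypothetical name of the per-column timestamp column
        # Keep whichever value carries the more recent operation timestamp.
        assignments.append(
            f"t.{col} = CASE WHEN s.{ts} > t.{ts} THEN s.{col} ELSE t.{col} END"
        )
        # The stored timestamp becomes the greatest of source and target.
        assignments.append(f"t.{ts} = GREATEST(s.{ts}, t.{ts})")
    set_clause = ",\n  ".join(assignments)
    return (
        f"MERGE INTO {target} t\n"
        f"USING {source} s\n"
        f"ON t.{key} = s.{key}\n"
        f"WHEN MATCHED THEN UPDATE SET\n  {set_clause}\n"
        f"WHEN NOT MATCHED THEN INSERT *"
    )


sql = build_merge_sql("db.target", "kafka_mutations", "id", ["a", "b"])
print(sql)
```

In a Spark session with the Iceberg SQL extensions enabled, the resulting string could be passed to `spark.sql(sql)`.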
A Spark application that performs a copy-on-write looks like this. First, we have a right semi-join between the source and target tables. This join is needed to understand which files need to be rewritten. Therefore, it’s lighter than a usual join, because all columns except the keys can be skipped when reading the underlying Parquet files. In order to perform the actual write, we need a full outer join between the two tables. But we know the files affected, so we can skip all the other ones. Unlike the previous join, this one cannot prune any column. So let’s run our pipeline, consuming 10 gigabytes of mutations from Kafka. Given our data distribution, we end up affecting roughly 10% of the target table’s underlying files. For the first join, we will read the keys of the whole table, so 50 gigabytes thanks to Parquet column pruning, and shuffle them in order to join them with the Kafka mutations. This will lead to one terabyte of shuffle because of that 20x compression ratio. For the second join, we read 10% of the table, but cannot do any column pruning. So this will amount to about 6.5 terabytes of data to be read, 130 terabytes of data to be shuffled, and again 6.5 terabytes to be written. Read and write amplification for this pipeline is quite huge: more than 600x. But it’s the shuffle amplification that is too big. In order to shuffle all this data alone, we would need more than 100 nodes, and we are way out of budget.
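The figures quoted for the copy-on-write run can be reproduced with some back-of-the-envelope arithmetic. The sketch below assumes decimal units (1 TB = 1,000 GB) and, following the talk, that shuffled data is roughly the at-rest size multiplied by the 20x compression ratio; the variable names are made up for the example.

```python
COMPRESSION_RATIO = 20       # at-rest size vs. size once decoded for shuffle
mutations_gb = 10            # Kafka input consumed in one run
keys_read_gb = 50            # first join: key columns of the whole table
rewrite_read_gb = 6_500      # second join: 10% of files, all columns (6.5 TB)

# Shuffle volume is estimated as bytes read times the compression ratio.
shuffle_join1_gb = keys_read_gb * COMPRESSION_RATIO
shuffle_join2_gb = rewrite_read_gb * COMPRESSION_RATIO

# Amplification is how many bytes are moved per byte of incoming mutations.
read_amplification = rewrite_read_gb / mutations_gb
shuffle_amplification = shuffle_join2_gb / mutations_gb

print(f"shuffle, first join:   {shuffle_join1_gb / 1000:.0f} TB")
print(f"shuffle, second join:  {shuffle_join2_gb / 1000:.0f} TB")
print(f"read amplification:    {read_amplification:.0f}x")
print(f"shuffle amplification: {shuffle_amplification:.0f}x")
```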
What About Merge on Read?
So what about merge-on-read? We know that for use cases where updates are sparse, like ours, Iceberg spec v2 introduced the so-called merge-on-read write mode. Merge-on-read works in such a way that only changed or new rows are written as new Parquet files, and in order to ignore older versions of rows, we write tombstones in the form of delete files. So the Spark app with merge-on-read mode enabled will look like this. We have only one job that performs the left outer join and outputs both new data files and delete files. We haven’t changed anything on the target table, so we will still hit 10% of the files, and the mutations are still 10 gigabytes. That’s because we don’t want to compare apples with oranges, obviously. The problem is that we run the join over the full table, reading all those 65 terabytes of data. That converts to 1.3 petabytes of shuffle data. The write amplification is now minimal, as you can see: we read 10 gigabytes and write out 10 gigabytes. But we have a 6,000x read amplification and a 130,000x shuffle amplification, which are definitely unbearable. We go up to 1.4 million euros each month, which is way beyond our budget.
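The talk does not show how the write mode was switched. In Iceberg it is controlled through table properties, so one plausible sketch is the helper below, which renders an `ALTER TABLE` statement setting the three row-level write modes on a format-version 2 table. The helper and the table name are hypothetical; the property names are the ones documented by Iceberg.

```python
def merge_on_read_ddl(table: str) -> str:
    """Render an ALTER TABLE statement that enables merge-on-read writes."""
    props = {
        "format-version": "2",                 # delete files require spec v2
        "write.delete.mode": "merge-on-read",
        "write.update.mode": "merge-on-read",
        "write.merge.mode": "merge-on-read",   # the mode used by MERGE INTO
    }
    rendered = ",\n  ".join(f"'{k}' = '{v}'" for k, v in props.items())
    return f"ALTER TABLE {table} SET TBLPROPERTIES (\n  {rendered}\n)"


ddl = merge_on_read_ddl("db.target")
print(ddl)
```

Setting the same properties back to `copy-on-write` would restore the default behaviour described earlier.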
This is just a quick recap of the results, and for comparison with the real world: on the left, you have a small Italian car, and on the right, you have the biggest container ship in the world. The last data pipeline that we ran was like moving the entire ship on the right to make space for the little car on the left, something you would never consider in the real world. So we know that merge-on-read is not the solution. How do we optimize this job, then? Well, the way we optimize most data pipelines: by taking advantage of our knowledge of the data distribution.
File Pruning and Join Strategy
When working with Spark and Iceberg, we have primarily two weapons: file pruning and join strategy. File pruning means minimizing data reading by selectively scanning only relevant files based on query predicates and table layout. The idea is that if we read less data, we will also shuffle and write less data. With our current table layout, when selecting a given day, we end up scanning all the files to check whether data of that day is part of the file. So, we partition our table by year, month, and day, and we also apply bucket partitioning to the ID column. For now, please ignore this partitioning scheme. It will make sense later. When running a query like the previous one, we will minimize the number of files read because we can skip all the files that do not fall into the affected partitions, so we just read the green ones.
So, how did we choose that kind of partitioning? First of all, we decided to partition by day because we know that most updates will hit a couple of days, and also that some days will not be updated at all in most iterations. We also chose to partition the ID over 64 buckets because one day of data is roughly 60 gigabytes, and one gigabyte is a good partition size, so we just split by 64. We get partitions that are neither too big nor too small. So, we just run the pipeline again, still using copy-on-write because merge-on-read doesn’t make much sense for us, and we are able to affect only 3 per cent of the files. Therefore, all other measures are reduced by a factor of about three. Thanks to this change, we’ve achieved a 70 per cent reduction in file reads, writes, and shuffle, and our spending goes down to 45,000 euros per month, which gets us closer to our goal. So, we’ve done our best with file pruning.
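The bucket sizing above is simple arithmetic. A minimal sketch follows, assuming that the count is rounded up to the next power of two; the talk only says that roughly 60 gigabytes per day at about one gigabyte per partition led to 64 buckets, so the rounding rule and the `bucket_count` helper are assumptions.

```python
import math


def bucket_count(daily_gb: float, target_partition_gb: float = 1.0) -> int:
    """Pick a bucket count so each (day, bucket) partition is near the target size."""
    raw = math.ceil(daily_gb / target_partition_gb)
    # Round up to the next power of two (assumption, not stated in the talk).
    return 1 << (raw - 1).bit_length()


buckets = bucket_count(60)
print(buckets, "buckets of about", round(60 / buckets, 2), "GB each")
```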
Let’s move on to optimising the join strategy. Spark has four join strategies: sort merge, shuffle hash, broadcast, and storage partition join. Also, keep in mind that the data coming from Kafka is much smaller than the target table, so one side of the join is much smaller than the other. Shuffle hash and sort merge join work very similarly with regard to shuffling. Given each dataset and its partitions, we redistribute data across nodes to ensure that matching records from different datasets and partitions are located on the same node. The difference between sort merge and shuffle hash is the technique used to match the data locally. Sort merge is less memory-intensive but more CPU-intensive due to sorting, while shuffle hash is less CPU-intensive and more memory-intensive. The bottom line is that both need to shuffle the entire dataset. That is what we were seeing before.
Then we have broadcast join, which is a clever trick if one dataset is much smaller than the other. In this join strategy, the smaller dataset is replicated and sent to each executor, reducing overall network traffic and improving performance by avoiding most of the shuffle. The pro is obviously minimal shuffle, but one side of the join must fit into memory, you must know the size of the data beforehand so that you can broadcast that side, and it simply does not work for full outer joins. Our second job has a full outer join, so that cannot work.
Storage Partition Join
Storage partition join is the new kid on the block. It is a join operation that works when both datasets are partitioned using a compatible partitioning scheme. In this way, Spark will co-locate matching partitions by reading them directly, and the join operation can happen locally. The pro is that there is zero shuffle. The cons are that both datasets must be materialised, that is, stored as files or tables; that the datasets must share the storage layout; and that this kind of join will limit the parallelism of your job to the number of partitions being joined. You don’t want to limit the parallelism when using Spark, because that’s the power of Spark: massive parallelism.
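The talk does not list the settings used to activate storage partition join. As a hedged starting point, the sketch below collects session properties that, to my understanding of the Spark 3.4 and Iceberg documentation, are involved in enabling it, and turns them into `spark-submit` arguments. Treat the exact set as an assumption and verify it against the versions you run; the `as_submit_args` helper is made up for the example.

```python
from typing import Dict, List

# Assumed set of properties; check the Spark and Iceberg docs for your versions.
SPJ_CONF: Dict[str, str] = {
    # Let Spark use the partitioning reported by DataSource V2 tables.
    "spark.sql.sources.v2.bucketing.enabled": "true",
    # Allow the join when some partition values exist on one side only.
    "spark.sql.sources.v2.bucketing.pushPartValues.enabled": "true",
    # Ask Iceberg to report its data grouping to Spark during planning.
    "spark.sql.iceberg.planning.preserve-data-grouping": "true",
    # Do not require the join keys to cover every partition column.
    "spark.sql.requireAllClusterKeysForCoPartition": "false",
}


def as_submit_args(conf: Dict[str, str]) -> List[str]:
    """Flatten a config dict into repeated --conf key=value arguments."""
    return [arg for key, value in conf.items() for arg in ("--conf", f"{key}={value}")]


args = as_submit_args(SPJ_CONF)
print(" ".join(args))
```

Whether the join was actually planned without an exchange can be checked by inspecting the physical plan of the merge.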
So, to recap, sort merge and shuffle hash have already been tested, and they shuffle too much. Broadcast is not applicable. So we are left with the storage partition join, which is complex to activate, but we definitely need to try it out.
So, we need to change our job a bit, because we need to materialise data. We read data from Kafka, write it to a staging table that has the same partitioning as the target table, and then run our merge operation. The resulting job will need to feed the staging table; therefore, we have one more step that generates 200 gigabytes of shuffle to partition the data before writing it. Everything else is exactly the same as in the previous experiments, but we have exactly zero shuffle. Let’s see how this pipeline works and performs. We are on Spark 3.4 and Iceberg 1.5. Our cluster can be as small as four nodes, and we run with Snappy compression enabled. Each task, which now reads the files related to one partition from S3, joins them together, and writes them out, takes more or less one minute. We have 2,000 tasks, since we have 2,000 partitions of one gigabyte each: two terabytes, as before. Therefore, we run for 2,000 minutes of task time, but since we have 120 cores running, we go down to 17 minutes of wall-clock time. So, we are definitely under one hour and definitely under 10,000 euros or dollars a month, because with this setup we are able to go down to as low as 6,000 euros a month.
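The wall-clock estimate follows directly from the task count and the core count. The small sketch below redoes that arithmetic, assuming perfectly even scheduling with no stragglers or startup overhead, which is an idealisation.

```python
import math

tasks = 2_000          # one task per 1 GB partition touched by the merge
minutes_per_task = 1   # read from S3, join locally, write out
cores = 120            # tasks that can run concurrently

task_minutes = tasks * minutes_per_task      # total compute time
wall_clock_minutes = task_minutes / cores    # ideal, perfectly parallel run

print(f"task time:  {task_minutes} minutes")
print(f"wall clock: about {math.ceil(wall_clock_minutes)} minutes")
```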
What Happens When an Executor is Lost?
So, we used both our weapons, and now the job is something we can think about running for real. But we have one more thing. We all know that Spark is resilient, but resilient doesn’t mean fast. So, what happens when an executor is lost? Cache and shuffle files are actually lost. Lost shuffle files mean stages to be recomputed and time lost. And we know that time equals money on any cloud vendor: you pay when you use your EC2 instances, not when they are shut down. But you can’t lose shuffle files if you don’t shuffle at all. So, using the storage partition join, we both eliminate the need for shuffle, saving 40 terabytes of network communication, and let our executors crash at any time without any impact on the performance of our job. Given that, we can leverage spot instances. Spot instances cost roughly 60% less than on-demand instances, and therefore we can go down to 2.5 thousand euros or dollars, more or less, every month.
So, let’s jump to conclusions. First, writing SQL queries is the easy part. Understanding the physical plan and guiding query engines is a different beast. Second, Apache Iceberg is just the icing on the cake. You need to nail the basics, which are Parquet and distributed query engines, to get the most out of it. Third, well, Spark still rocks, because we understood that it’s not about reading, writing, or shuffling data faster, that is, running on native runtimes that are faster than the JVM; it’s a lot about smart planning, and I think this use case shows that very well. It’s not about going faster. It’s about doing the minimum amount of work that you need to do. And, finally, yes, CDC on 100-terabyte HBase tables that receive 100 gigabytes of changes every day is possible, and it’s possible with one-hour replication latency and spending as little as 3K euros per month.
There are other things I would love to discuss, but today there is no time, so definitely let’s keep in touch if you are curious about how we optimized small writes, where we are able to choose between copy-on-write and merge-on-read based on the percentage of changes on each partition. Or how to run table maintenance and keep the table read-optimized while updating it continuously, because now we have a streaming job, and it’s hard to fit in some table maintenance, like rewriting data files, while your pipeline still runs, because optimistic concurrency will get in the way. And, finally, how we implemented a kind of autopilot in order to squeeze every last bit out of our compute cluster, because, well, this job costs so much in the end that we don’t want to waste a single CPU cycle. And that was it.