May 2, 2024

Lessons Learned from Running Merge-on-Read Iceberg Pipelines at Scale

Organizations are leveraging merge-on-read Apache Iceberg operations to efficiently handle sparse updates. This talk will share insights from running such operations on tables with tens of petabytes of data. You’ll learn when to choose merge-on-read over copy-on-write execution mode, how to optimize the write performance, and the best practices for maintaining such tables using Apache Iceberg’s built-in tools. This presentation will benefit engineers considering Apache Iceberg adoption, as well as those who already use it and seek to enhance their existing production environments.

Topics Covered

DataOps and ELT/ETL
Iceberg and Table Formats

Sign up to watch all Subsurface 2024 sessions

Transcript

Note: This transcript was created using speech recognition software. While it has been reviewed by human transcribers, it may contain errors.

Anton Okolnychyi:

So I’m Anton. I’m an Iceberg PMC member, working on data infrastructure at Apple. This session will be about running merge-on-read Iceberg pipelines at scale. I’ll try to share what I learned building a lot of this stuff, as well as helping others deploy it in production. So hopefully by the end of the presentation, you will understand when merge-on-read is appropriate for your use cases and how to best leverage it.

Materialization Strategies

If you need to modify records in an Iceberg table, you have to understand that Iceberg offers multiple materialization strategies, and each strategy has its own trade-offs. Eager materialization, which is also known as copy-on-write, assumes that changes are handled by rewriting and swapping data files that need to be modified. For example, if you need to delete 10 records in a file, you replace that file with another one that doesn’t have those 10 records. An alternative to that is lazy materialization, which we also call merge-on-read. Instead of rewriting data files, this one works with differences, and it relies on delete files to mark particular records as logically removed. Deletes have to be merged with existing data every time you read the table to get a consistent representation of the dataset. And there are two types of deletes: equality deletes and position deletes. So all in all, this gives us three different ways to encode our changes, and each of them has its own trade-offs.

If you handle changes using copy-on-write, there is no overhead on read, and the table does not require any special maintenance afterward. Copy-on-write excels when you need to modify a large number of records in a subset of files, which is fairly common in batch use cases, I would say. At the same time, it really struggles if you need to handle very sparse changes that are scattered across the entire dataset. For instance, modifying a single record in every file of the dataset requires rewriting the entire dataset, which is simply impractical at scale. Merge-on-read, on the other hand, offers a much faster way to handle sparse changes, but requires additional work during read. Position deletes tend to have a fairly small overhead, but they require writers to find the positions of the records that have to be changed. Equality deletes are designed for streaming upserts, and the primary benefit here is that they allow Iceberg to produce differences without scanning the target table. And that’s really important for the stability of the write operation in your streaming pipeline. For example, upserting 100 records into a table with 20 petabytes of data and into a table with 1 gigabyte of data has exactly the same cost. At the same time, equality deletes are the most expensive to apply on read, and you would probably have to do more aggressive compaction to mitigate that.
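The three encodings can be illustrated with a toy model. This is a minimal sketch in plain Python, not Iceberg's API: the in-memory list stands in for a data file, and all function names are hypothetical.

```python
# Toy model of the three ways to encode a delete. Plain Python, no Iceberg APIs.
data_file = [
    {"id": 1, "name": "a"},
    {"id": 2, "name": "b"},
    {"id": 3, "name": "c"},
    {"id": 4, "name": "d"},
]


def copy_on_write_delete(rows, ids_to_delete):
    """Eager: produce a replacement file without the deleted records."""
    return [row for row in rows if row["id"] not in ids_to_delete]


def find_positions(rows, ids_to_delete):
    """Position deletes require scanning the target to locate affected rows."""
    return {pos for pos, row in enumerate(rows) if row["id"] in ids_to_delete}


def read_with_position_deletes(rows, deleted_positions):
    """Lazy: the data file is untouched; readers skip the marked positions."""
    return [row for pos, row in enumerate(rows) if pos not in deleted_positions]


def read_with_equality_deletes(rows, deleted_ids):
    """Lazy: no scan at write time; readers compare every row to the keys."""
    return [row for row in rows if row["id"] not in deleted_ids]


ids_to_delete = {2, 4}
rewritten = copy_on_write_delete(data_file, ids_to_delete)
positions = find_positions(data_file, ids_to_delete)
via_positions = read_with_position_deletes(data_file, positions)
via_equality = read_with_equality_deletes(data_file, ids_to_delete)
```

All three paths return the same logical rows. They differ in where the work happens: at write time for copy-on-write, at write and read time for position deletes, and almost entirely at read time for equality deletes.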

Push Down Filters

Apart from choosing the right materialization strategy, you also need to think about filter pushdown, because it enables engines to discard all irrelevant metadata, which means faster job planning, and also all irrelevant data, which means faster reads. And as much as we want to automate this process and make it seamless to the user, for now, many query engines, including Spark, require you to provide the predicates for pushdown manually. So, the MERGE statement on this slide looks for matches on id, and Spark allows users to provide a predicate on the target table as part of the ON condition. In this example, our table is bucketed by id, and we know that our changes belong to buckets 1 and 2. This predicate will be pushed down before the merge operation starts. So, I highly encourage you to think about your partition strategy and about the sort order of your table, so that you can narrow down the scope of your operations. And in a lot of cases, you will not have this predicate available to you, but you can try to compute it by looking at the changes. For example, you can compute min-max values for the sort keys that are in your source relation. You can try to come up with a partition predicate, and if that predicate is selective, it will have a tremendous impact on the performance.
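Computing a predicate from the changes can be sketched as follows. This is a minimal sketch that assumes integer keys. The table names, the `id` column, and the helper name are hypothetical, and the helper only builds the SQL text that would be passed to Spark.

```python
def merge_with_pushdown(target, source, key, source_keys):
    """Build a MERGE statement whose ON condition also bounds the target scan.

    The BETWEEN bounds come from the min and max key in the changes, so every
    source row already satisfies them and the merge result is unchanged.
    """
    lo, hi = min(source_keys), max(source_keys)
    return (
        f"MERGE INTO {target} t USING {source} s\n"
        f"ON t.{key} = s.{key} AND t.{key} BETWEEN {lo} AND {hi}\n"
        "WHEN MATCHED THEN UPDATE SET *\n"
        "WHEN NOT MATCHED THEN INSERT *"
    )


# Hypothetical names; in a real job the keys would come from the source relation.
sql = merge_with_pushdown("prod.db.events", "changes", "id", [950, 17, 402])
```

The extra predicate only pays off when the table is sorted or partitioned on the key, so that the bounds exclude most files.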

Tune Metric Collection

If you’re working with tables that contain hundreds of columns or millions of files, I would also recommend tuning metrics collection to avoid any performance issues associated with that. I usually start by looking at the workloads and anticipated query patterns to come up with a list of columns for which it would be most beneficial to keep the column-level statistics. So, in this example, we’re setting the default metrics collection mode to none, meaning that Iceberg will not collect metrics for columns. But then we override this for columns C1, C2, and C3. In most cases, you should be fine with truncated metrics, which would mean that Iceberg would keep track of truncated lower and upper bounds. But if your values have a very long common prefix, like URLs for instance, you may need to persist the full value, but that’s rarely the case. Usually you want to keep metrics only for a few columns that you can benefit from, and those are usually your sort keys or something that has natural ordering in them.

If you’re working with tables that contain tens of petabytes of data, then planning will be a challenge. Iceberg supports both local and distributed planning, and can automatically transition between these two modes based on the size of the metadata that it needs to process. In most cases, it would do the right thing automatically, so you shouldn’t have to care about this one, but you can also force a particular planning mode if you need to. If you opt for local planning, make sure you have enough memory on the driver, because you don’t want to be in constant garbage collection. You also need to give enough cores to the driver, because this impacts how many manifests Iceberg can process at the same time. So the more cores you give, the faster your job planning will be. And if you want to go low level, you can also play around with the number of threads that are used for planning. In a lot of cases, planning is an I/O-intensive operation, so giving it more threads than the number of cores may be beneficial, but in some cases it doesn’t really help. So it’s not as important as the first two points.
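The metrics and planning settings described in this section can be sketched as table properties. This is a minimal sketch: the table name and the helper are hypothetical, and the per-column modes are assumptions because the slide is not part of the transcript. The property names are Iceberg table properties as I understand them and should be checked against the documentation for your Iceberg version.

```python
def set_tblproperties(table, props):
    """Render an ALTER TABLE ... SET TBLPROPERTIES statement from a dict."""
    assignments = ",\n  ".join(f"'{key}' = '{value}'" for key, value in props.items())
    return f"ALTER TABLE {table} SET TBLPROPERTIES (\n  {assignments}\n)"


# Collect no column metrics by default, then opt in for a few useful columns.
metrics_props = {
    "write.metadata.metrics.default": "none",
    "write.metadata.metrics.column.c1": "truncate(16)",  # assumed mode
    "write.metadata.metrics.column.c2": "truncate(16)",  # assumed mode
    "write.metadata.metrics.column.c3": "full",  # assumed: long common prefix
}

# Force a planning mode instead of the automatic choice (assumed value: local).
planning_props = {
    "read.data-planning-mode": "local",
    "read.delete-planning-mode": "local",
}

metrics_sql = set_tblproperties("prod.db.events", metrics_props)
planning_sql = set_tblproperties("prod.db.events", planning_props)
```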

Distributed Planning

If you’re working with distributed planning, you have to keep an eye on the max result size to avoid job failures. Basically, you need this value to be high enough to collect all the pre-aggregated results from the executors after you’ve done the filtering. I also recommend enabling Kryo serialization; it’s just much faster than Java serialization. Also be mindful of the number of driver cores and the number of threads that fetch task results on the driver. This controls how quickly you can fetch these pre-aggregated task results to the driver and start the actual job. If you’re doing a full table scan, then this part alone, fetching everything back to the driver, is probably the most expensive part of the planning.
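The Spark settings mentioned here can be collected in one place. This is a minimal sketch: the keys are standard Spark configuration properties, while the values are placeholders rather than recommendations and the helper name is hypothetical.

```python
# Placeholder values; size them for your own metadata volume and driver.
distributed_planning_conf = {
    "spark.driver.maxResultSize": "8g",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.driver.cores": "8",
    "spark.resultGetter.threads": "16",
}


def to_submit_args(conf):
    """Turn a config dict into a flat list of spark-submit --conf arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


submit_args = to_submit_args(distributed_planning_conf)
```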

Iceberg is designed to work well in the cloud, and in particular with object stores like S3. If you’re storing data there, take advantage of S3FileIO and the object store layout. Here’s a command that enables the object store layout in a table. If it’s set, Iceberg will try to organize files in a way that’s beneficial for object stores. So your locations would look something like this. You would have a prefix for your data; this is where your data is located. Then you would have some random part in it. And after that, you would have your partition paths and the file name. The random part here is important because it allows S3 and other object stores to better balance the workload and avoid hotspotting particular nodes. And that in turn means reduced latency and improved throughput. So your jobs will just be faster, and they won’t be rate limited as much as they otherwise would be.
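The command and the resulting layout can be sketched as follows. The table name and paths are hypothetical. The hash in `illustrative_location` is a stand-in that only shows the shape of the location, not the scheme Iceberg actually uses for the random part.

```python
import hashlib

# Table property that turns on the object store layout (table name is hypothetical).
enable_object_storage = (
    "ALTER TABLE prod.db.events "
    "SET TBLPROPERTIES ('write.object-storage.enabled' = 'true')"
)


def illustrative_location(data_prefix, partition_path, file_name):
    """Show the shape <prefix>/<random part>/<partition path>/<file name>.

    The SHA-256 prefix is an assumption for illustration only.
    """
    entropy = hashlib.sha256(file_name.encode("utf-8")).hexdigest()[:8]
    return f"{data_prefix}/{entropy}/{partition_path}/{file_name}"


loc = illustrative_location(
    "s3://bucket/db/events/data", "day=2024-05-02", "00000-0-data.parquet"
)
```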

Similarly, S3FileIO is something that you can configure at the catalog level. What it essentially does is enable Iceberg to communicate with S3 directly rather than through the Hadoop file system implementation, which has a lot of overhead because S3 is not a file system as such; it’s an object store. So by talking to S3 directly, Iceberg is capable of delivering much better read and write performance.
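A catalog-level configuration might look like the following. This is a minimal sketch: the catalog name `prod` is hypothetical, and the other catalog settings (catalog type, warehouse location, credentials) are omitted.

```python
catalog = "prod"  # hypothetical catalog name

# Spark properties that point an Iceberg catalog at S3FileIO.
s3_file_io_conf = {
    f"spark.sql.catalog.{catalog}": "org.apache.iceberg.spark.SparkCatalog",
    f"spark.sql.catalog.{catalog}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
}

conf_lines = [f"{key}={value}" for key, value in s3_file_io_conf.items()]
```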

Distribution Mode

The next topic is fairly important. We’ll talk about distribution mode. If you ever have a problem where your job produces tiny files, it’s probably related to the distribution mode and how you distribute your data before it’s written by Iceberg. There are three different options you can choose: none, hash, and range. None means that the data should be passed to Iceberg as is, and no extra operations need to happen before that. You should use this mode only if you’re confident that the data in your tasks is aligned with the partitioning. If you don’t have that confidence, you shouldn’t be using this mode. Hash means that the data has to be clustered by partition before it’s passed to Iceberg. I recommend this as a default. Start with this. And it is actually the default right now. So stick with this one unless you’re not happy with the performance.

Range means that the data has to be range partitioned by the partition and the sort key. This is kind of a composite key that’s used to divide your data into ranges. I don’t recommend using it in delete, update, or merge because it’s the most expensive and it involves sampling. And sampling means that parts of your plan have to be evaluated twice, and it’s just expensive for a merge operation to evaluate some part of it twice. It may be beneficial for appends, sure, but for row-level operations, you should probably try to avoid it if possible.

Iceberg allows you to configure the distribution modes for different types of operations. So you can pick one mode for append, another one for update, and something else for merge, based on the use case and the workload. This topic alone is fairly complicated. I could spend an hour talking about this one, but Russell gave a really good presentation that goes into detail here. So I’ll just leave a link here. I encourage you to spend a little bit more time on it, because I think it will have a big impact on how you write to your Iceberg tables.
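Per-operation distribution modes are set through table properties. This is a minimal sketch with a hypothetical table name. The chosen modes are only an example of mixing them, not a recommendation from the talk.

```python
table = "prod.db.events"  # hypothetical table name

# One distribution mode per operation type; values are none, hash, or range.
distribution_modes = {
    "write.distribution-mode": "range",  # appends
    "write.delete.distribution-mode": "hash",
    "write.update.distribution-mode": "hash",
    "write.merge.distribution-mode": "hash",
}

statements = [
    f"ALTER TABLE {table} SET TBLPROPERTIES ('{prop}' = '{mode}')"
    for prop, mode in distribution_modes.items()
]
```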

I’ll just give a few examples to highlight why it is important. So suppose you have a table with two partitions, partition A and partition B, and each has a data file in it. You execute a merge statement, and Spark determines that there are four deletes and four inserts that have to be encoded. Deletes are red and inserts are green on this slide. This is a happy scenario because all of the changes for partition A are in task one, and all of the changes for partition B are in task two. So once we write this out, each task will only write to one partition, and the number of files will be reasonable. But unfortunately, that’s not how the data is always distributed, and sometimes we don’t know how it’s distributed. It can happen that your data is all over the place, so that each task has some changes for each partition of the table. And if that happens, the number of files that you produce will explode. So in this example, we had two tasks, and each task wrote to each partition. And you can imagine how bad this problem would be if you had a thousand tasks, right? The number of files would just explode. And that’s something you don’t want to have. That’s why it’s important to figure out what the right distribution mode is for you, and how to write as few files as possible.
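The file explosion can be reproduced with a small counting model. This is a minimal sketch in plain Python with hypothetical helper names. It assumes each task writes one file per partition it touches and ignores the split between data files and delete files.

```python
def files_written(tasks):
    """Count output files: one per (task, partition) pair that holds records."""
    return sum(len({partition for partition, _ in task}) for task in tasks)


def cluster_by_partition(tasks, num_tasks):
    """Regroup records so each partition lands in one task, like hash mode."""
    partitions = sorted({p for task in tasks for p, _ in task})
    clustered = [[] for _ in range(num_tasks)]
    for task in tasks:
        for partition, change in task:
            target = partitions.index(partition) % num_tasks
            clustered[target].append((partition, change))
    return clustered


# Happy scenario: each task only holds changes for one partition.
aligned = [
    [("A", 1), ("A", 2), ("A", 3), ("A", 4)],
    [("B", 5), ("B", 6), ("B", 7), ("B", 8)],
]
# Unhappy scenario: every task holds changes for every partition.
scattered = [
    [("A", 1), ("B", 5), ("A", 2), ("B", 6)],
    [("A", 3), ("B", 7), ("A", 4), ("B", 8)],
]
# The same problem with a thousand tasks and two partitions.
thousand_tasks = [[("A", i), ("B", i)] for i in range(1000)]

aligned_files = files_written(aligned)
scattered_files = files_written(scattered)
clustered_files = files_written(cluster_by_partition(scattered, 2))
thousand_files = files_written(thousand_tasks)
```

In this model the aligned and clustered layouts write one file per partition, while the scattered layouts write one file per task and partition.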

Shuffling The Target Table

All right. If you want to remember a single point out of this presentation, it should probably be this one. If you’re shuffling the target table, you’re doing it wrong. Let’s take a look at how the merge is executed. If you look at the physical plan, it can be divided into three big sections. First of all, you have to scan the target table and the source table, and then you need to join them to determine the differences. You need to compute the records that you need to delete, update, and append. Once you know your differences, you have to write them as efficiently as possible. And this is where your distribution comes in. You may have an extra exchange node to change the distribution of the data. Then you may have an optional local sort. If your table has a sort key, this is where you would see your local sort. If it doesn’t, the local sort will be absent. And then finally, at the end, you have the actual write. So all in all, we have these three different sections. But the problem here is that you have two exchanges, so you need to shuffle the data twice. And this would totally dominate the runtime of this operation.

This is by far the most expensive part of it. Therefore, you should try to avoid this and have shuffle-less operations. You can do this with storage-partitioned join, which avoids all the computational, I/O, and network overhead that is associated with the shuffle. It gives you an order of magnitude difference when it comes to performance, and it requires fewer resources. So it’s a win-win situation. And this is the key to having an efficient and stable merge pipeline. This is also a very big topic. So I’m going to reference another presentation that Chao and I did that contains all the information on how to enable it, what the actual benefit is, and how the plan changes with and without storage-partitioned join. But I have to say that most of our bigger tables in production actually use this. And we encourage it for every new use case as well.
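As a starting point, storage-partitioned join is controlled through session configuration. This is a sketch under assumptions: the property names below are the ones I understand Spark and Iceberg to use, availability depends on the Spark and Iceberg versions, and the referenced presentation remains the source for the full set of settings.

```python
# Session properties commonly associated with storage-partitioned joins.
# Verify each name against the docs for your Spark and Iceberg versions.
spj_conf = {
    "spark.sql.sources.v2.bucketing.enabled": "true",
    "spark.sql.iceberg.planning.preserve-data-grouping": "true",
    "spark.sql.sources.v2.bucketing.pushPartValues.enabled": "true",
    "spark.sql.requireAllClusterKeysForCoPartition": "false",
}

set_statements = [f"SET {key} = {value}" for key, value in spj_conf.items()]
```

Both sides of the join also need compatible partitioning, for example the same bucketing on the join key, or Spark falls back to a regular shuffle.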

If for whatever reason you are not using storage-partitioned join, even though you should, it may mean that you need to play around with the adaptive query execution configuration. AQE is a great technology. It allows Spark and Iceberg to handle skew and to coalesce partitions if needed. But the problem is that it may also lead to small files. And the reason for that is that Spark processes 64 MB of shuffle data in a single task. And that shuffle data is usually row-oriented, without any encoding. Maybe LZ4 compressed, maybe something else. But it’s row-oriented. And once you take 64 MB of shuffle data and you write it as a Parquet file, it will be stored in a columnar format and will be encoded. You may apply another compression codec. And in most cases, the file on disk would be much smaller than the amount of shuffle data that you write. And that’s what causes the output files to be small. We’ve invested pretty heavily in fixing this in Spark and Iceberg. So with Spark 3.5 and Iceberg 1.4 or later, the data source should try to write 128 MB files by default. It does this by looking at the shuffle config and at the file format config, and it tries to estimate the compression ratio and adjust the advisory partition size based on that. If this is not enough, you can still tune that manually and increase it to a larger value. And the reason why it’s 128 MB is to not hurt parallelism too much. We don’t know what the user’s preference is, whether the user prefers a faster write followed by compaction in the background, or writing files that are as big as possible during the merge and then not dealing with compaction. So it’s a personal preference in a lot of cases.
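The size mismatch is simple arithmetic. This is a minimal sketch: the compression ratio of 4 is an assumed figure for illustration, not a measured one, and the function names are hypothetical.

```python
MB = 1024 * 1024


def estimated_file_size(shuffle_bytes, compression_ratio):
    """Approximate on-disk size after columnar encoding and compression."""
    return shuffle_bytes / compression_ratio


def advisory_size_for(target_file_bytes, compression_ratio):
    """Shuffle bytes one task must receive to produce a file of the target size."""
    return int(target_file_bytes * compression_ratio)


assumed_ratio = 4  # assumption: shuffle data shrinks 4x when written as Parquet
small_file = estimated_file_size(64 * MB, assumed_ratio)
needed_advisory = advisory_size_for(128 * MB, assumed_ratio)
```

Under this assumption, a 64 MB shuffle partition becomes a 16 MB file, and a task would need about 512 MB of shuffle data to write a 128 MB file.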

A naive approach to changing the advisory partition size would be to set the Spark SQL property for that. If you do that, it would be a big mistake, because it would affect all of the jobs and all of the stages of the operation. Instead, what you want is to have as parallel a join as possible and then trade off some of that parallelism for bigger output data files. In order to achieve that, you should play around with this property in Iceberg. The difference here is that it only affects the final exchange that happens before the write. It has no impact on your joins, so it’s a win-win situation. You have all the parallelism you need in the first part of your plan, and then you coalesce and write properly sized output files at the end.
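The two approaches can be put side by side. This is a sketch under assumptions: the slide is not part of the transcript, so the Iceberg property below is the table property I believe it refers to, and the table name and the 512 MB value are placeholders.

```python
MB = 1024 * 1024
advisory_bytes = 512 * MB  # placeholder value

# Naive: a session-wide Spark setting that changes every AQE stage in every job.
naive = f"SET spark.sql.adaptive.advisoryPartitionSizeInBytes = {advisory_bytes}"

# Targeted: an Iceberg table property (assumed name) that only applies to the
# final exchange before the write.
targeted = (
    "ALTER TABLE prod.db.events SET TBLPROPERTIES "
    f"('write.spark.advisory-partition-size-bytes' = '{advisory_bytes}')"
)
```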

Table Maintenance

Last but not least, you have to understand that merge-on-read requires more maintenance than copy-on-write. So you have to deal with minor and major compaction. You have to compact your deletes, otherwise your performance will degrade quickly. Luckily, Iceberg provides all the tools you need. You can just use plain SQL to perform all of that maintenance efficiently. One of the procedures that may be applicable to you is rewrite position delete files, which Szehon worked on. Basically, it takes all of the delete files that you have, discards the ones that no longer apply to the table, and then compacts the rest and writes them in an optimal layout. This one is really cheap because we’re not touching the data. We’re just reading the deletes, and it’s all done in parallel, so each partition is compacted concurrently. So it should be fairly efficient.

Then you also need to deal with data, because you would have some differences in the data files as well. So what I recommend is to do a set of minor compactions where you just compact the differences, and then after a while, do one major compaction. Most operations can be done with rewrite data files. You just need to configure the file size thresholds appropriately so that you do not pick up any of the large files. The default behavior should be good enough, but there are a lot of knobs to tune here. You can do BINPACK, you can do SORT, you can have a partition predicate, you can compact some partitions. You can also say, “Hey, I want to compact data files that have a lot of deletes.” So all of that is already possible, and I encourage you to check the documentation. You can also do ZORDER, and in the future you’ll be able to do Hilbert curves. So all the good stuff.

And finally, don’t forget about the metadata, because you need to compact that too. All of it can be done through rewrite manifests. It’s a distributed operation, so it should be very efficient. It reads all of the metadata that you currently have, re-clusters it on the partition values, and replaces it, basically. Yeah, that’s pretty much it on my side.
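The three maintenance steps described in this section map to Iceberg's Spark procedures. This is a minimal sketch: the catalog and table names are hypothetical, and the strategy and threshold are example values rather than recommendations.

```python
catalog = "prod"  # hypothetical catalog name
table = "db.events"  # hypothetical table name

maintenance = [
    # 1. Compact position deletes and drop the ones that no longer apply.
    f"CALL {catalog}.system.rewrite_position_delete_files(table => '{table}')",
    # 2. Compact data files, also picking up files with many deletes.
    f"CALL {catalog}.system.rewrite_data_files("
    f"table => '{table}', strategy => 'binpack', "
    "options => map('delete-file-threshold', '10'))",
    # 3. Re-cluster the manifests.
    f"CALL {catalog}.system.rewrite_manifests(table => '{table}')",
]
```

Each statement would be run through Spark SQL, typically on a schedule, with the cheaper delete compaction running more often than the data compaction.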