29 minute read · December 19, 2017
Arrow C++ Roadmap and pandas2
Wes McKinney · Creator of the Python pandas project, Co-creator of Apache Arrow, CTO and Co-founder of Voltron Data
Webinar Transcript
Wes McKinney: So, to set some context: how are all these things we're talking about relevant to the data science world, which is kind of where I'm coming from? I'll try to give you a little bit of background about that.

As some historical context … it was this fall four years ago, at PyData 2013. Was anyone there? A couple people. As kind of the backstory, I started the pandas project in April 2008, so it's been almost 10 years. At that time I built pandas on NumPy and SciPy and the Python stack as it existed in 2008, and four or five years later I started the DataPad company. You can see this is a slide deck from when I was working on my startup. What we tried to do was use pandas and the Python data stack to build essentially a low-latency column store that would run on the cloud and power our application, which was designed for data exploration and visual analytics. We very quickly ran into a lot of performance and scalability issues working with very large data sets, with pandas being unable to deliver the kind of interactive performance and ability to work with large datasets that we needed.

One of the biggest issues we faced was that the cost of data access in Python was just too high. There's always this dichotomy between the data being on disk or in a cloud bucket, and the data being in memory, and so you spend all this time loading data into memory only to then evict it and load other data so you can perform queries on it. This whole experience left me with a deep longing for cheap or free data access: being able to shuttle data to disk very efficiently, get access to data on disk at essentially no cost, and work very freely with very large data sets in Python.

This was what I was thinking about this time four years ago, so I wrote this talk subtitled "10 Things I Hate About pandas." To give you the super fast summary: essentially, the core problem with the Python data stack is that there's been all this innovation in the last 10 or 15 years in the analytic database world, and very little of that innovation has trickled down into the data science world. All the tools we use, pandas included, are built on technology which architecturally has not changed that much in the last 20 years. If you go back to Numeric, which is the predecessor to NumPy, it's pretty much the same project, architecturally, in terms of how the runtime works, as it was in the late '90s; that in turn is not so different from Fortran 77, Fortran 90, MATLAB, you know, stuff from the '70s and '80s. So we're really working with a pretty dated foundation of computational technology in the Python data world, and that sent me down this path of thinking: how could we bring the learning and knowledge from the analytic database world and the big data world together with the data science world?

So, the kinds of things I was complaining about in pandas. There's a lot of bloat separating pandas DataFrames from how the data is represented in memory. If you're dealing with a column of strings in a DataFrame and you work with very large data sets, you may have suffered from the fact that processing strings in pandas is just very expensive, and part of that is that we represent strings as Python objects.
So effectively, when you're performing a scan operation on a column in a DataFrame, you're interacting with heap-allocated strings which are located all over RAM and could be anywhere, so you have essentially no guarantee of data locality when you're doing analytics on non-numeric data in pandas.

Being unable to memory-map data and access arbitrarily large data sets on disk is a huge eyesore. If you have a gigabyte of data, it costs however much effort it takes to load that gigabyte into memory, and you're then using up a gigabyte of RAM to address that data set. If you have 10 gigs of data it costs 10 times as much. You can use things like HDF5 to load the data faster, but you'd really like to be able to say: I have this arbitrarily complex and large data set on disk, and I can access any value in any column, anywhere in the data, in constant time, without having to load the entire data set into memory. I guess four years ago I also said HDF5 was a partial solution.

pandas doesn't integrate that well with databases. We have a lot of problems with missing data; handling is very inconsistent, so a consistent way to deal with missing data would be really nice. There's not much transparency into memory use. Some of the things Jacques was talking about around hierarchical memory management involve having a very precise accounting of how much memory a data structure takes up, so that when you're performing a unit of work somewhere in the pandas runtime, you could say: hey, you have 100 megabytes of RAM to work with here, don't exceed that. If you need to shuttle data to disk in order to make space in memory, then you have to do that, but you cannot exceed however much memory you're given. In pandas that's a huge problem: you may run some operation, get a memory error, and then try to track down where the memory is blowing up.

You do have categorical data now, so this point is not as acute as it used to be, but it's still hard to write GroupBy operations. You cannot append cheaply to DataFrames, since we don't have any notion of chunked data sets, which would be nice to have. Suppose you were receiving a stream of data across the wire, maybe a thousand rows at a time or a million rows at a time, and you accumulate 100 million rows. What happens now is you accumulate all that data and then have to stitch it together in order to do the pandas operations on it, and that concatenation step causes memory use to double or triple, which is very expensive.

It's not very easy to extend the data types, to add user-defined data types and represent them coherently throughout the system; this is partially related to NumPy's metadata system. Another big issue is that we don't have a logical query plan or a query engine that can do multicore scheduling and query planning to pipeline operators and figure out the most efficient way to minimize memory allocations and perform the canonical set of data manipulations and relational algebra as efficiently as possible. And related to that is the fact that, on top of all this, almost all the algorithms are single threaded.

Anyway, this was four years ago. When I joined Cloudera at the end of 2014, I started looking at how we could bring Python into the big data world, and in addition to solving all these problems we had to figure out how to make Python more interoperable with the big data world.
And so the seed of Arrow was not too far off from that point, but it's taken a couple of years to get the band together, so to speak. If you're interested in reading more about all these topics, I wrote a long blog post about six weeks ago explaining how Arrow specifically addresses the things in that talk. So I'd like to now talk a little more specifically about the Arrow C++ library and explain how this wish list of things I'd like to have available in Python is playing out in real code that we can use.

You've heard about Arrow from Jacques. From the Python perspective, the high-level pitch is an efficient data representation for doing analytics, with the ability to access single values in constant time. In the Arrow library, both in Java and in C++, a substantial part of what we've done over the past year and a half is building out what we call a messaging system or interprocess communication system. This lets us put a data set on disk, which may consist of a single batch of rows or a sequence of batches of rows, and address it with zero copy. If I say zero copy, that means I can memory-map the data on disk, or get a reference to a pointer into a shared memory region, and immediately begin processing that data without doing an extra copy step.

Building out this messaging layer in a way that is sane to work with at a code level is where a lot of the time has been spent, simply because if you want to build a data processing engine that can run on top of data in shared memory or data on disk, you need a coherent object model: essentially data structures and tools for dealing with hierarchical memory management. It might be that you have a memory-mapped buffer, but then you're going to create a sequence of arbitrarily complex data structures that all have references back to that original mapped memory buffer. So you need to be able to deal with chains of references between different chunks of memory, where there might be a parent someplace that is actually the owner of the buffer, and that parent might be a memory-mapped pointer to some region, or it might be data that's been allocated on the heap. All of that has been abstracted away in the implementations.

In the C++ library (this is a slide from March, so it's been a little while and a couple of things are out of date), the first thing we built was a memory management and IO layer: essentially a nice API for dealing with zero-copy references from top to bottom. On top of that memory management and IO we have Arrow's type metadata, describing all of the field names and logical data types, things like dates and times, and schemas for tables. And on top of the type metadata we have containers for table-like data structures. Those are record batches, which are collections of contiguous memory, and in C++ there's the notion of a table, which is a chunked collection of record batches.
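From the Python side, here is a minimal sketch of that layering using the PyArrow bindings (the column names and values are invented for the example): a schema built from fields and logical types, a record batch of contiguous arrays, and a table assembled from multiple batches.

```python
import datetime
import pyarrow as pa

# Type metadata: fields (a name plus a logical type) collected into a schema
schema = pa.schema([
    pa.field("symbol", pa.string()),
    pa.field("price", pa.float64()),
    pa.field("date", pa.date32()),
])

# A record batch: equal-length arrays backed by contiguous memory
batch = pa.RecordBatch.from_arrays(
    [
        pa.array(["AAPL", "GOOG", "MSFT"]),
        pa.array([170.1, 1060.2, 84.3]),
        pa.array([datetime.date(2017, 12, 18)] * 3),
    ],
    names=schema.names,
)

# A table: a chunked collection of record batches (a C++/Python construct,
# not part of the Arrow format itself)
table = pa.Table.from_batches([batch, batch])
print(table.num_rows, table.schema)
```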
So that kind of solves the append problem: if you need to append to a data set, you're just adding a chunk to a table, so you don't actually need to copy any memory.

At the memory management level, if you're using the C++ library, the core primitive is the Buffer object. This is the base C++ type that holds a reference to memory, and there are many different buffer implementations: buffers that reference other people's memory, buffers for memory maps, buffers that reference heap allocations. But when you're working with the data in C++, you're almost always working with a smart pointer to the base Buffer type, and from your perspective all you need to ask the buffer is: can you give me the address of your memory, and how big is it? It also has some additional properties, like whether it's mutable and whether its memory belongs to someone else, and these let you reason about whether that memory is safe to mutate, or whether creating a slice that references the buffer will produce something that's more complicated to manage.

All the memory allocations come out of a memory pool, so if you have a unit of work you want to perform, you can create a memory pool and give it to the operator, which will then record all of the allocations that take place and keep track of the maximum allocation, so you'll know how much memory was used. You could also use the memory pool concept to enforce limits, say a cap of 100 megabytes or something like that, through the memory pool. At the type metadata level there's the notion of a DataType, which is the base for all types; a Field, which is a named type, so that would be a column name plus a type; and a Schema, which is a collection of fields, so that would be a full description of a table.

At the array level (just to make things more confusing, in C++ the vectors are called arrays; in my brain vectors are resizable and arrays are fixed size, so, you know, kill me), the arrays are all one-dimensional. There are array classes for each logical type in Arrow, and they can be dictionary-encoded: if you have a string array, you can extract the unique values and then represent the data as integers which reference the dictionary, and that can make for much more efficient analytical processing. Then we have record batches, which are collections of arrays, and the table, which is not part of the Arrow specification but is strictly a C++ concept: a collection of record batches. So you can have a column in a table which is composed of 100 chunks, and that is a first-class object which you can use in analytics without having to do any concatenation.

We have a variety of IO interfaces which understand Arrow's memory management machinery. For certain kinds of files, like a normal operating system file, when you do a read you need to actually allocate memory, so you'd get one kind of buffer; if you have a memory-mapped file, you get a different kind of buffer, where the buffer references the memory map and no memory allocation or copying is required. We've built IO interfaces for a variety of different IO subsystems. We have one for HDFS, and I would like to see ones get built for cloud blob stores and other interfaces, so you could interact with different sorts of file systems in a uniform way.
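Here is a hedged sketch of those primitives through PyArrow (the data is invented; exact attribute and method availability can vary a bit between PyArrow versions): a Buffer exposing an address and size, the default memory pool reporting allocations, a string array being dictionary-encoded, and a table whose column is made of chunks so that "appending" does not copy memory.

```python
import pyarrow as pa

# A Buffer wraps a reference to a memory region: an address plus a size
buf = pa.py_buffer(b"some bytes owned by Python")
print(buf.address, buf.size, buf.is_mutable)

# Allocations come out of a memory pool, which keeps an accounting of usage
pool = pa.default_memory_pool()
strings = pa.array(["a", "b", "a", "c", "a"])
print(pool.bytes_allocated(), pool.max_memory())

# Dictionary encoding: unique values plus integer indices into that dictionary
dict_array = strings.dictionary_encode()
print(dict_array.indices, dict_array.dictionary)

# A table column can be made of many chunks; concatenating tables does not
# stitch the underlying arrays together in memory
t1 = pa.Table.from_arrays([strings], names=["s"])
t2 = pa.concat_tables([t1, t1])
print(t2.column(0).num_chunks)  # 2 chunks, no copy
```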
If you use TensorFlow or any of the modern deep learning frameworks, they all have IO subsystems that look nearly identical to Arrow's. On top of memory management and IO, we have interfaces for reading and writing collections of Arrow data to the different IO interfaces. So if you have a memory-mapped file, you can write a record batch to that memory-mapped file, and when you read it back, rather than reading the data back into memory, you're just constructing a bunch of buffers that reference the data in the memory map; that's what makes it zero copy.

We make extensive use of the Google FlatBuffers library. When people hear about zero copy they say, hey, have you heard about FlatBuffers and Cap'n Proto? And I say, yeah, of course, we use FlatBuffers. FlatBuffers I would describe as an alternative to Protocol Buffers that is zero copy, so you can access individual elements in the message using the FlatBuffers API. They are both zero-copy tools; FlatBuffers is bring-your-own-schema, you have to run a schema compiler, it's a lot more like [inaudible 00:17:10]. As far as a tool for dealing with data, FlatBuffers is significantly more general: it does not have a columnar data model, it does not have type metadata, so all the stuff that Arrow does is not something you could find in FlatBuffers.

We've also built some other memory containers. We have a container for dealing with tensors coming from other frameworks, and this is mostly so that we can accommodate writing more complex types of objects using Arrow's memory management machinery. If you received a tensor from TensorFlow or from Torch and you wanted to write it to Arrow's IO interfaces, read it back with zero copy, and then give that data back to TensorFlow or back to Torch, you can do that using Arrow, and we've seen this employed to great effect in the Ray project at the UC Berkeley RISE Lab, as I mentioned here.

Earlier this year came a contribution from the Ray project, which is part of the RISE Lab; if you don't know, that's the Berkeley successor to the AMPLab, and they're more focused on real-time, secure data science. In that theme of being able to react and build real-time models, reinforcement learning, that sort of thing, shared memory and serialization become a very important part of making it all work and be performant at low latency. One of the things the Ray project built was a shared memory object store they call Plasma, and they very graciously donated that component to Arrow, so it now ships as part of the Arrow C++ library; it's part of PyArrow when you install the Python bindings. The idea is that Plasma runs, I don't know if you can see, on every node when you use the Ray library: it runs a Plasma daemon, a server process that runs as a third party.

So when you have two processes performing units of work in different Python processes, rather than sending data around between nodes, you're just sending around object references to data that is found in the object store. You'll see you want to run f of x, and rather than x being data, x is just an object ID. What the framework does, using the part that is now in the Arrow project, is look in the object store and say: give me the memory location for that object. It gets a pointer to that memory location and finds its metadata, and that metadata is Arrow metadata describing the structure of what's there in the object store.
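To make the zero-copy IPC piece concrete, here is a small sketch with PyArrow (the file path and data are invented; the `pa.ipc` entry points shown are those in more recent PyArrow releases, and older versions exposed the same functionality under slightly different names):

```python
import pyarrow as pa

batch = pa.RecordBatch.from_arrays(
    [pa.array(range(1000)), pa.array([1.5] * 1000)],
    names=["id", "value"],
)

# Write the batch in the Arrow IPC file format to an ordinary OS file
with pa.OSFile("/tmp/example.arrow", "wb") as sink:
    with pa.ipc.new_file(sink, batch.schema) as writer:
        writer.write_batch(batch)

# Read it back through a memory map: the resulting buffers point into the
# mapped region, so the table data is not copied into fresh heap memory
with pa.memory_map("/tmp/example.arrow", "r") as source:
    table = pa.ipc.open_file(source).read_all()
print(table.num_rows)
```

The Plasma object store applies the same principle across processes: resolving an object ID yields Arrow metadata plus buffers that point directly into shared memory rather than copies.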
What's in the object store could be something complicated, like a collection of tensors or a dictionary of arbitrary Python objects, and that's all read and written using Arrow's IO and memory management interfaces. It's very cool, and once you see the performance wins from shared memory and zero copy, you can't unsee it.

The next major stage of work, which is going to be ongoing for the next couple of years, is basically rebuilding the internals of pandas to take advantage of all of this machinery we've been building up. At a high level, the idea is that as a pandas user you would like to be able to get a reference to arbitrarily large on-disk data sets, describe a sequence of pandas operations, and evaluate those operations directly on the data on disk at maximum performance given your available hardware.

So if you have an eight-core or 16-core machine, then based on the characteristics of your hardware (how much memory it has, how many cores it has), the internal pandas query planner and scheduler would use your available hardware to the best of its ability. If you have SIMD operations like AVX2 available, it would opt to use AVX2 variants of the kernel implementations; if you have a GPU, there might be certain operators where, based on the expected cost of moving the data to the GPU, you move the data to the GPU and perform the work using GPU-based operators. We've already been seeing NVIDIA and MapD, and Anaconda is also involved, building GPU analytics on top of Arrow memory on the GPU, so the pieces for this are already falling into place.

A big part of this will also be building what I think of as a virtual table (it's not in this slide) for things other than Arrow data on disk, things like Parquet files or HDF5 files.
Rather than having to load all of that data into memory in one slurp, you would construct a synthetic object which references the data on disk, so that only at the point where you access a column in that data set for the first time is it deserialized and loaded into memory. In the case of Parquet, you could avoid a lot of IO by only loading a single column from all of the files in the dataset (see the sketch below). If you're dealing with CSV files, then under the hood the CSV converter can convert all that CSV data to Arrow format that is left on disk and then memory-mapped, and the computations are performed on that memory-mapped, converted CSV file.

I expect what will happen in a lot of cases is that when you're working with a large data set in something inefficient like JSON or CSV, you will first infer the schema of the data set and make sure you're comfortable with the inferred data types of the columns (if it's JSON you might have to do a bit of munging), and then convert all of that data to Arrow format and leave it in Arrow format for the duration of your analysis. You aren't having to go through, okay, I'm going to rerun my analysis, I've got to re-parse every CSV file and every JSON file; you only have to perform that conversion once, and after that point the data access is essentially free.

At a high level there hasn't been a lot of collaboration between the data science ecosystems. Python programmers have no idea what R programmers are doing, and mostly they just curse at them, or R programmers curse at the Python programmers, and the [inaudible 00:23:48] folks are like, oh, those data science people doing statistics are going to do some super fast Monte Carlo simulations. Everyone is kind of minding the problems right in front of them, and the way things have developed has been organic, without a great deal of collaboration between these ecosystems. When we look at what we're actually doing with the data, it's pretty similar across all of these worlds, so we are not working together as much as we could to think about architecture and how we can design better, faster, more scalable systems, both from a purely design point of view and in terms of how we could share code between the environments.

Part of what prevented code sharing between these environments is the fact that, basically (skip all of these slides), if you think about the data frame problem, everyone has their own data frame. When you write an algorithm that processes a data frame, if you write that algorithm for Python it's going to look different than the version you would write for R, and so it's no surprise we're not sharing code, because we're processing different data structures.
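Going back to the on-disk data set discussion above, here is a minimal sketch of the column-selective piece using pyarrow.parquet (the file path and column names are invented; this is the plain read API rather than the virtual-table layer described in the talk):

```python
import pyarrow.parquet as pq

# Read only the columns we actually need from a (possibly multi-file) dataset;
# unrelated columns are never deserialized or loaded into memory
table = pq.read_table("data/trades.parquet", columns=["symbol", "price"])

# Hand the Arrow table to pandas for the familiar API
df = table.to_pandas()
print(df.dtypes)
```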
One of the biggest purposes Arrow serves long term is to define a standardized data format so that we can begin to build reusable libraries of native code usable from every programming language, including Java, because Java can build JNI bindings and give its buffers to the C++ library. So everyone can use the code that we build, and by having zero-copy memory sharing, that means I can have data in Python and give it to the JVM, or give it to R, without any cost. And so we can have these polyglot programs, where maybe you need to reach into R to do something that's only available as an R library, without paying a heavy penalty.

We've got to build all of our data connectors, of course, and that will take time. But from my point of view the most interesting thing (I'm friends with Hadley, I just saw him a few weeks ago) is the question: how do you get from a standardized memory format for data frames to a better, faster implementation of pandas? Think about when you write data frame code. This is my pseudocode for adding two arrays and taking the logarithm, so suppose A and B are columns in a data frame. We have to get access to the data, so let's assume it's in memory. The way I think about the world is that you have a logical graph of operators: the edges are data dependencies and the nodes are operators, so depending on where A and B live, you might have to perform some IO or do some memory mapping or something.

To perform this work, we first need what I call kernel functions, which are atomic units of vectorized computation. The idea is that if you need to add two arrays, the kernel is a specific implementation for a particular combination of data types. So if you want to add an int32 array to a float64 array, you have a kernel which knows how to add int32 to float64; for many other operators there might be overflow checking or other logic implemented on a per-type basis. Then you have operators, which know what inputs they take and can tell you what the output type is going to be.
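Purely as an illustration of the kernel-and-operator idea (this is invented sketch code, not the actual Arrow C++ machinery; the registry, the NumPy-based kernels, and the `add_operator` function are all hypothetical), dispatching on the combination of input types might look like this:

```python
import numpy as np

# Hypothetical kernel registry: one concrete implementation per combination
# of input types. A real engine would also dispatch on hardware (SIMD, GPU).
ADD_KERNELS = {
    ("int32", "int32"): lambda a, b: np.add(a, b, dtype=np.int64),    # widen to avoid overflow
    ("int32", "float64"): lambda a, b: np.add(a, b, dtype=np.float64),
    ("float64", "float64"): lambda a, b: np.add(a, b, dtype=np.float64),
}

def add_operator(a, b):
    """The 'add' operator: inspects input types, selects a kernel,
    and thereby determines the output type."""
    key = (a.dtype.name, b.dtype.name)
    kernel = ADD_KERNELS.get(key) or ADD_KERNELS.get((key[1], key[0]))
    if kernel is None:
        raise TypeError(f"no add kernel registered for {key}")
    return kernel(a, b)

a = np.arange(5, dtype="int32")
b = np.linspace(0, 1, 5)            # float64
print(add_operator(a, b).dtype)     # float64
```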
Based on the input types and the hardware characteristics, the operator will also select the appropriate kernel to use. So the add operator could decide: well, this machine I'm on has AVX2, so I'm going to use the AVX2 add for two doubles, or for an int32 and a float64.

So from kernels you have operators, which decide which kernel to use. Traditionally, if you're using pandas, computing A + B allocates a temporary array and writes the output of A + B into it, then calling log on the temporary creates a new array and writes the output of log into that output array. If you know the whole operator graph up front, and you know you have a whole bunch of threads and that these are both element-wise operators, you can say: first of all, I don't need that temporary array, because the temporary and the output array have the same type and the same size, so I'm just going to get rid of it; and I'm going to do add and log in the same thread, creating one task for each core on my processor. So you can divide the arrays into chunks and then perform add and log in a pipeline, and in practice, of course, it's faster because you avoid allocations.

In a lot of analytics, if you look at pandas, we aren't doing crazy big-O optimizations; we're largely optimizing one linear-time algorithm into another linear-time algorithm. These are largely memory-bound computations, so avoiding the extra memory bandwidth, together with the parallelism, is really what gives you better performance. There's a lot of fun stuff we can do here: basically we're starting to build a kind of embedded in-memory database, delivered as a library you can use in really any native code application, and you would like it to run on all of your cores and be able to use whatever hardware you have available.

You can eliminate temporary allocations and do inter-operator pipelining, and I think in the future, if we get some folks who really want to work on compilers and LLVM, we should be able to have plenty of fun with code generation: generating specialized operators for things that fuse well or that can be performed more efficiently by not moving data out of registers, and getting even more processing efficiency that way.
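As a rough sketch of the fused, chunked, parallel evaluation idea (again invented illustration code, not Arrow's actual execution engine; the helper names and chunking scheme are made up), using NumPy plus a thread pool:

```python
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from os import cpu_count

def fused_add_log(a, b, out):
    # One pipelined task: add then log over the same chunk, writing into a
    # preallocated output slice so no temporary array is materialized
    np.add(a, b, out=out)
    np.log(out, out=out)

def evaluate_log_a_plus_b(a, b, num_tasks=None):
    num_tasks = num_tasks or cpu_count()
    out = np.empty(len(a), dtype=np.float64)
    bounds = np.linspace(0, len(a), num_tasks + 1, dtype=int)
    with ThreadPoolExecutor(max_workers=num_tasks) as pool:
        futures = [
            pool.submit(fused_add_log, a[lo:hi], b[lo:hi], out[lo:hi])
            for lo, hi in zip(bounds[:-1], bounds[1:])
        ]
        for f in futures:
            f.result()
    return out

a = np.random.rand(10_000_000)
b = np.random.rand(10_000_000)
result = evaluate_log_a_plus_b(a, b)   # same result as np.log(a + b)
```

The point of the `out=` arguments is that `np.log(a + b)` would materialize a full-size temporary for `a + b`; fusing the two element-wise steps per chunk avoids that allocation and lets each core work on its own slice.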
So basically, in the long run, and who knows how many years it will take, think about what we've been building: we've spent the last year and a half hardening and standardizing the memory format and the metadata for Arrow, and we've been building data connectors, things like the Parquet format. I guess we need to start building some more data connectors, so if anyone wants to write an Arrow CSV reader, you'll be my friend forever and ever.

On top of that, we're going to begin building an execution engine which runs as an embedded C++ library, so that it can be deployed anywhere. If you want to run it on your phone, you'll be able to run Arrow on your phone and process data at the edge; if you want to run it on a big machine with a terabyte of RAM and 128 cores, it will run well there. On top of that we'll build a nice Python API, and luckily we already have one that we seem to like in the pandas library. I think the challenge really will be how we can have a more scalable, sane API for working with very large data sets, as I think pandas largely excels at working with small data sets.

We've been able to do a lot of things that work well with a gigabyte of data but maybe not so well with a terabyte, so for scalable operations there are some things we may not be able to preserve in pandas 2 land, and maybe that's okay. But I think we've learned a lot about API design and what makes for effective data analysis in Python, so to the extent that we can be backwards compatible in pandas 2 land in the future, I think we'd like to preserve as much as possible of the pandas API that we know and love, but with a more powerful engine inside that works with much larger data sets and gets as much juice as possible out of the hardware.