---
title: "Part 3: Data Storage"
---
```{r, message=FALSE, echo=FALSE}
library(arrow)
library(dplyr)
library(dbplyr)
library(duckdb)
library(stringr)
library(lubridate)
library(palmerpenguins)
library(tictoc)
library(scales)
library(janitor)
library(fs)
library(ggplot2)
library(ggrepel)
library(sf)
```
```{r, include=FALSE}
# save the built-in output hook
hook_output <- knitr::knit_hooks$get("output")
# set a new output hook to truncate text output
knitr::knit_hooks$set(output = function(x, options) {
  if (!is.null(n <- options$out.lines)) {
    x <- xfun::split_lines(x)
    if (length(x) > n) {
      # truncate the output
      x <- c(head(x, n), "...\n")
    }
    x <- paste(x, collapse = "\n")
  }
  hook_output(x, options)
})
```
In this session we'll talk about reading and writing large data sets. There are a few interrelated topics that arise here, so it's really helpful to understand that -- in terms of the read/write capabilities of the **arrow** package -- we're focusing almost entirely on the highlighted path in the diagram below:
```{r}
#| echo: false
#| out-width: 100%
knitr::include_graphics("img/arrow-read-write-dataset.svg")
```
Why this specific pathway?
- Arrow Datasets are the only data object suitable for larger-than-memory data. Arrow Tables and R data frames both attempt to represent the entire data set in-memory, which simply won't work here
- Apache Parquet is a modern data format optimized to make life easier when you're working with big data (as opposed to CSV and Feather, which both have their uses but aren't ideal for this situation)
- Local storage because it's easier. Let's not complicate the story by talking about S3 buckets!
## Parquet files
If you work with large data sets already, you may have encountered parquet files before and none of this will be new material for you. But the format is less well known to folks new to this world, so we'll talk a little about the [Apache Parquet project](https://parquet.apache.org/). Although they are both associated with the Apache Software Foundation, and the **arrow** package is (as far as we know!) the easiest way to work with parquet files, Apache Parquet is an entirely different project from Apache Arrow.
Parquet files structure a tabular data set in a format that is "row-chunked, column-arranged, and paged". Here's what we mean by that. First, we take a table and partition it row-wise into a set of distinct "chunks", as shown below:
```{r}
#| echo: false
#| out-width: 100%
knitr::include_graphics("img/parquet-chunking.svg")
```
Then, for every chunk and for every column in each chunk, that column is split into a collection of "pages":
```{r}
#| echo: false
#| out-width: 100%
knitr::include_graphics("img/parquet-paging.svg")
```
When the table is written to disk, we start writing from chunk 1, column 1, writing each page in order. We then repeat this for every column in this chunk; and then move to the next chunk. This continues until every page has been written:
```{r}
#| echo: false
#| out-width: 100%
knitr::include_graphics("img/parquet-organisation.svg")
```
Importantly for our purposes, each chunk, column, and page is preceded by relevant metadata. In addition, a metadata block is written at the end of the file, containing information about the table and the locations of its various constituent parts.
There are two key features to this format that make it desirable when working with large data sets:
- Parquet file readers can use metadata to scan the file intelligently: if we know in advance that only some subset of the table is needed for a query, the reader can skip to the relevant pages
- Data are stored in a compressed binary format, which reduces file size relative to an uncompressed text format such as CSV.
That being said, you probably won't be surprised to hear that we're glossing over a lot of details here. You wouldn't be able to code a parquet reader on the basis of this simplified description! However, that's not our goal here: the purpose of this description is to give you enough of a sense of how a parquet file is organized that you can understand *why* they're handy when working with large data! You can find a lot more information about these details in the Apache Parquet documentation.
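If you'd like to see the row chunks for yourself, here's a minimal sketch. It assumes that the `chunk_size` argument to `write_parquet()` sets the number of rows per chunk (a "row group" in Parquet terminology), and it uses the lower-level `ParquetFileReader` class from **arrow** to look at the file metadata. The toy data frame and the temporary file are invented purely for illustration:

```{r}
library(arrow)

# Toy data (not part of the workshop data): 1000 rows, written to disk
# in chunks (row groups) of 250 rows each
toy_data <- data.frame(id = 1:1000, value = rnorm(1000))
toy_file <- tempfile(fileext = ".parquet")
write_parquet(toy_data, toy_file, chunk_size = 250)

# Open the file and inspect its metadata without loading the table
reader <- ParquetFileReader$create(toy_file)
reader$num_rows        # total number of rows in the file
reader$num_columns     # number of columns
reader$num_row_groups  # number of row chunks

# Read a single chunk; row groups are indexed from 0
first_chunk <- reader$ReadRowGroup(0)
nrow(first_chunk)
```

Most of the time you won't need to work at this level, because `read_parquet()` and `open_dataset()` handle the metadata for you, but it can be a handy way to check how a file has been chunked.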
A single parquet file generally contains a quantity of data small enough to load into memory. For example, let's say I want to load the NYC taxi data for September 2019. This subset of the data can be stored in a 122MB parquet file, and I can load the whole thing into R as a conventional data frame. It's about 6.5 million rows, but that's not too much of a challenge:
```{r basic-parquet-read, message=FALSE}
parquet_file <- "~/Datasets/nyc-taxi/year=2019/month=9/part-0.parquet"
nyc_taxi_2019_09 <- read_parquet(parquet_file)
nyc_taxi_2019_09
```
One thing to highlight here is that the columnar structure of parquet files makes it possible to load only a subset of the columns:
```{r selective-parquet-read}
parquet_file |>
read_parquet(col_select = matches("pickup"))
```
Better yet, the file reader is faster when only a subset of the columns is needed:
```{r timing-parquet-reads}
tic()
parquet_file |>
read_parquet() |>
invisible() # suppress printing
toc()
tic()
parquet_file |>
read_parquet(col_select = matches("pickup")) |>
invisible()
toc()
```
This property is handy when dealing with larger-than-memory data: because we can't load the whole thing into memory, we're going to have to iteratively read small pieces of the data set. In the next section we'll talk about how large data sets are typically distributed over many parquet files, but the key thing right now is that whenever we're loading one of those pieces from a parquet file, an intelligently designed reader will be able to speed things up by reading only the relevant subset of each parquet file.
::: {.callout-tip #exercise-parquet}
## Exercises
::: {.panel-tabset}
## Problems
1. Let's start with some baseline data. Take the data currently stored in `nyc_taxi_2019_09` and write it to a CSV file using the `write_csv_arrow()` function supplied by the **arrow** package. Using the `tic()` and `toc()` functions from the **tictoc** package, record how long it took to write the file. Similarly, using the `file_size()` function from the **fs** package, see how large the file is.
2. Repeat the previous exercise, but this time write the data to a parquet file using `write_parquet()`. For folks who are new to parquet files: use `.parquet` as the file extension. How does this compare to the previous exercise?
3. Try reading both files into R using `read_csv_arrow()` and `read_parquet()`, and compare load times. As a bonus, try the same thing with `as_data_frame = FALSE` for both files: that way the data will be read into Arrow memory rather than R memory. Is there a difference in elapsed time?
## Solution 1
```{r csv-write, cache=TRUE}
tic()
write_csv_arrow(nyc_taxi_2019_09, "data/nyc_taxi_2019_09.csv")
toc()
file_size("data/nyc_taxi_2019_09.csv")
```
## Solution 2
```{r parquet-write, cache=TRUE}
tic()
write_parquet(nyc_taxi_2019_09, "data/nyc_taxi_2019_09.parquet")
toc()
file_size("data/nyc_taxi_2019_09.parquet")
```
Writing data from R to a parquet file is faster than writing a CSV file. The end result is much smaller too. The difference in file size is mostly because parquet files are a binary compressed format, whereas CSV files are stored as uncompressed plaintext.
## Solution 3
Recall that in exercises 1 and 2 I saved the data to `"data/nyc_taxi_2019_09.csv"` and `"data/nyc_taxi_2019_09.parquet"`. You may have chosen different file paths!
The first part of the problem asks us to read the CSV file and the parquet file into R, and record the time taken:
```{r timing-reads-exercise-1, cache=TRUE}
tic()
"data/nyc_taxi_2019_09.csv" |>
read_csv_arrow() |>
invisible()
toc()
tic()
"data/nyc_taxi_2019_09.parquet" |>
read_parquet() |>
invisible()
toc()
```
The parquet file is substantially faster to read.
The second part of the problem asks us to repeat the exercise, loading the data as an Arrow Table rather than an R data frame. Here's how we do that:
```{r timing-reads-exercise-2, cache=TRUE}
tic()
"data/nyc_taxi_2019_09.csv" |>
read_csv_arrow(as_data_frame = FALSE) |>
invisible()
toc()
tic()
"data/nyc_taxi_2019_09.parquet" |>
read_parquet(as_data_frame = FALSE) |>
invisible()
toc()
```
Read times for Arrow are fairly similar to those for R. Again, the parquet file is faster than the CSV file.
:::
:::
## Multi-file data sets
In our hands-on exercises we've been working with the [NYC Taxi data](packages-and-data.html), a single tabular data set that is split across 158 distinct parquet files. It's time to take a closer look at how this works. Let's start by opening the data:
```{r, out.lines = 5}
nyc_taxi <- open_dataset("~/Datasets/nyc-taxi/")
nyc_taxi
```
The `...` in the output indicates truncation: I'm only showing the first few lines of output because it's the first line that's important. The `nyc_taxi` object is an Arrow Dataset represented by 158 files. We can inspect the `files` field of this object to find the paths to these files:
```{r, out.lines = 5}
nyc_taxi$files
```
Notice that the filenames are structured. They're organised by year and month and -- as you might expect -- the data for September 2016 can be found in the `year=2016/month=9/` folder. Not only that, the folder names correspond to actual field-value pairings in the data. The `nyc_taxi` data set has variables named `year` and `month`, and those variables can take values `2016` and `9` respectively. This convention, in which folders are named using the relevant field-value pairs, is referred to as "Hive partitioning", based on the [Apache Hive project](https://hive.apache.org/).
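To see the naming convention on a smaller scale, here's a minimal sketch that writes a toy data set with `year` and `month` columns to a temporary folder and lists the files it produces. The `toy_rides` data frame and its values are made up for illustration and have nothing to do with the real taxi data:

```{r}
library(arrow)

# Toy data (invented for illustration): two years by three months,
# with five rides in each year/month combination
toy_rides <- data.frame(
  year  = rep(c(2018L, 2019L), each = 15),
  month = rep(rep(1:3, each = 5), times = 2),
  fare  = round(runif(30, min = 5, max = 50), 2)
)

# Write a Hive-partitioned data set to a temporary folder
toy_path <- tempfile("toy-rides")
write_dataset(toy_rides, toy_path, partitioning = c("year", "month"))

# Folder names are key=value pairs, one folder per year/month combination
toy_files <- list.files(toy_path, recursive = TRUE)
toy_files

# When the data set is reopened, year and month are recovered from the
# folder names and appear in the schema alongside the other columns
toy_dataset <- open_dataset(toy_path)
toy_dataset$schema$names
```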
Partitioning the data set in a thoughtful way is one of those tricks used to make large data sets manageable. If I want to compute some quantity based only on the rides that took place in September 2019, I can ask the operating system to open the one file containing that data. My query never has to touch the other 157 files. To give a sense of how much of a difference this makes, let's compare two different queries. The first one extracts a subset of about 10 million rows, based on the partitioning variables:
```{r taxi-subset-a}
nyc_taxi |>
filter(year == 2016, month == 9) |>
nrow()
```
The second one extracts a subset of about the same size, again about 10 million rows, based on the pickup location zone:
```{r taxi-subset-b}
nyc_taxi |>
filter(pickup_location_id == 138) |>
nrow()
```
Neither of these queries does very much with the data: they're just inspecting the metadata to count the number of rows. However, the first query only needs to look at the metadata in one file, whereas the second one has to extract and aggregate data from all 158 files. The difference in compute time is striking:
```{r taxi-subset-filtering-times}
tic()
nyc_taxi |>
filter(year == 2016, month == 9) |>
nrow() |>
invisible()
toc()
tic()
nyc_taxi |>
filter(pickup_location_id == 138) |>
nrow() |>
invisible()
toc()
```
Admittedly, this is a bit of a contrived example, but the core point is still important: partitioning the data set on variables that you're most likely to query on tends to speed things up.
This leads to a natural question: how many variables should we partition on? The `nyc_taxi` data set is partitioned on `year` and `month`, but there's nothing stopping us from defining a `weekday` variable that takes on values of Sunday, Monday, etc, and using that to define a third level of partitioning. Or we could have chosen to drop `month` entirely and partition only on `year`. Which approach is best?
The answer, boringly, is that it depends. As a general rule, if you break the data up into too many small data sets, the operating system has to do too much work searching for the files you want; too few, and you end up with some very large and unwieldy files that are hard to move around and search. So there's often a sweet spot where you partition based on a small number of variables (usually those that are used most often in queries) and end up with a manageable number of files of a manageable size. These rough guidelines can help avoid some known worst cases:
- Avoid files smaller than 20MB or larger than 2GB.
- Avoid partitioning layouts with more than 10,000 distinct partitions.
As an aside, you can apply the same guidelines when thinking about how to structure groups within file types such as parquet that have a notion of row chunks etc, because the same tradeoffs exist there.
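One way to check an existing data set against these guidelines is to look at the sizes of the files behind it. The sketch below is only an illustration: `summarise_layout()` is a hypothetical helper (not part of **arrow**), the thresholds are the rough 20MB and 2GB figures quoted above, and the toy data set is far too small to be realistic:

```{r}
library(arrow)

# Hypothetical helper: summarise the files backing a local Dataset so they
# can be compared against the rough guidelines above
summarise_layout <- function(dataset) {
  sizes <- file.size(dataset$files)          # size of each file, in bytes
  list(
    n_files     = length(sizes),
    n_too_small = sum(sizes < 20 * 1024^2),  # files under 20MB
    n_too_large = sum(sizes > 2 * 1024^3)    # files over 2GB
  )
}

# Toy data set (invented for illustration), one file per value of `group`
toy_path <- tempfile("toy-layout")
toy_data <- data.frame(group = rep(c("a", "b", "c"), each = 10), x = 1:30)
write_dataset(toy_data, toy_path, partitioning = "group")

toy_layout <- summarise_layout(open_dataset(toy_path))
toy_layout
```

Every file in the toy data set is flagged as too small, which is exactly what you'd expect when 30 rows are spread across three files. For a real data set, the same summary gives a quick sense of whether the partitioning is too fine or too coarse.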
## An example
Okay, enough theory. Let's actually do this. We've already seen the "read" functionality in action, but I'm going to do it again with some additional arguments specified. This time around, I'm going to open a smaller subset, corresponding only to the 2016 data:
```{r, out.lines=5}
nyc_taxi_2016a <- open_dataset(
  sources = "~/Datasets/nyc-taxi/year=2016/",
  format = "parquet",
  unify_schemas = TRUE
)
```
In this version of the command I've explicitly stated that I'm looking for parquet files, though I didn't really need to do this because `format = "parquet"` is the default. I've also set `unify_schemas` to `TRUE` rather than the default value of `FALSE`. What this argument refers to is the way `open_dataset()` aggregates the data files. When `unify_schemas = TRUE`, it examines every data file to find names and data types for each column (i.e., the schema for the data set), and then seeks to aggregate those into a coherent whole. This can be time consuming, and is usually unnecessary because the data are written in the exact same format in every file. As a consequence, when `unify_schemas = FALSE` (the default), the scanner will just look at the first file and assume that every data file has the same schema.
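To get a feel for what `unify_schemas` does, here's a minimal sketch using two toy parquet files whose columns don't quite match. The file names and contents are invented for illustration. Which file counts as "first" when schemas aren't unified may depend on the order in which the files are discovered, so the sketch doesn't rely on it:

```{r}
library(arrow)

# Two toy parquet files in one folder: both have `x`, but only the first
# has `y` and only the second has `z`
toy_path <- tempfile("toy-schemas")
dir.create(toy_path)
write_parquet(
  data.frame(x = 1:3, y = c("a", "b", "c")),
  file.path(toy_path, "part-a.parquet")
)
write_parquet(
  data.frame(x = 4:6, z = c(TRUE, FALSE, TRUE)),
  file.path(toy_path, "part-b.parquet")
)

# Default behaviour: the schema is taken from a single file
schema_single <- open_dataset(toy_path)$schema$names
schema_single

# With unify_schemas = TRUE, every file is inspected and the schemas merged
schema_unified <- open_dataset(toy_path, unify_schemas = TRUE)$schema$names
schema_unified
```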
Okay, so let's have a look at the data:
```{r, out.lines=5}
nyc_taxi_2016a
```
As expected, this is a multi-file Dataset object constructed from 12 files.
Next, let's imagine that we're about to write an application whose primary function is to look at the different vendors who provide the raw data on a monthly basis. That's a highly specialized use of this data set, and it may be advantageous to partition by `month` and `vendor_name` because those are the variables we're likely to be querying on later. Because the philosophy of the **arrow** package is to try to preserve **dplyr** logic to the greatest possible extent, the default behaviour of `write_dataset()` is to inspect the grouping variables for the data and use those to construct a Hive-style partition. So if I want to write this Dataset to file using month and vendor as my partitioning variables I would do this:
```{r writing-dataset-a, cache=TRUE}
tic()
nyc_taxi_2016a |>
group_by(month, vendor_name) |>
write_dataset("~/Datasets/nyc-taxi_2016")
toc()
```
As you can see, this write operation does take a little while to finish, but half a minute isn't too bad.
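If you'd like to convince yourself that the grouping variables really do determine the partitioning, here's a minimal sketch on a toy data frame. It compares the grouped write against a call that names the partitioning variables explicitly via the `partitioning` argument of `write_dataset()`. The toy values are invented for illustration:

```{r}
library(arrow)
library(dplyr)

# Toy data (invented for illustration): two months by two vendors
toy_data <- data.frame(
  month = rep(1:2, each = 4),
  vendor_name = rep(c("CMT", "VTS"), times = 4),
  trip_distance = c(1.2, 3.4, 0.8, 5.1, 2.2, 4.0, 1.9, 0.5)
)

path_grouped <- tempfile("toy-grouped")
path_explicit <- tempfile("toy-explicit")

# Partitioning inferred from the grouping variables
toy_data |>
  group_by(month, vendor_name) |>
  write_dataset(path_grouped)

# Partitioning stated explicitly
write_dataset(toy_data, path_explicit, partitioning = c("month", "vendor_name"))

# Both approaches should produce the same folder structure
files_grouped <- list.files(path_grouped, recursive = TRUE)
files_explicit <- list.files(path_explicit, recursive = TRUE)
files_grouped
identical(files_grouped, files_explicit)
```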
In any case, I can open the new data set the same way as before:
```{r, out.lines=5}
nyc_taxi_2016b <- open_dataset("~/Datasets/nyc-taxi_2016")
nyc_taxi_2016b
```
Notice the difference between `nyc_taxi_2016a` and `nyc_taxi_2016b`. They both refer to the same data conceptually (i.e., all the taxi rides from 2016), but they're linked to different files and they carve up the dataset in different ways:
```{r, out.lines=5}
nyc_taxi_2016a$files
```
```{r, out.lines=5}
nyc_taxi_2016b$files
```
To give you a sense of the difference between the two, here's an example of a (somewhat) realistic query, computed on the `nyc_taxi_2016b` data:
```{r}
nyc_taxi_2016b |>
filter(vendor_name == "CMT") |>
group_by(month) |>
summarize(distance = sum(trip_distance, na.rm = TRUE)) |>
collect()
```
Here's the time taken for this query:
```{r querying-subset-b, echo=FALSE}
tic()
nyc_taxi_2016b |>
filter(vendor_name == "CMT") |>
group_by(month) |>
summarize(distance = sum(trip_distance, na.rm = TRUE)) |>
collect() |>
invisible()
toc()
```
and for the same query performed on the `nyc_taxi_2016a` data:
```{r querying-subset-a, echo=FALSE}
tic()
nyc_taxi_2016a |>
filter(vendor_name == "CMT") |>
group_by(month) |>
summarize(distance = sum(trip_distance, na.rm = TRUE)) |>
collect() |>
invisible()
toc()
```
The difference is not quite as extreme as the contrived example earlier, but it's still quite substantial: using your domain expertise to choose relevant variables to partition on can make a real difference in how your queries perform!
::: {.callout-tip #exercise-dataset}
## Exercises
::: {.panel-tabset}
## Problems
1. (Preliminary) Write a query that picks out the 2019 NYC Taxi data and -- in addition to the `month` and `year` columns already existing in the data -- adds columns for `monthday` and `yearday` specifying the day of the month and the day of the year on which the pickup took place (note: **lubridate** functions `day()` and `yday()` are both supported). Check that your query works by selecting the `pickup_datetime` column and your newly-created `monthday` and `yearday` columns and then collecting the first few rows.
2. Using this query, write the 2019 NYC Taxi data to a multi-file dataset -- twice. The first time you do it, partition by `month` and `monthday`. The second time you do it, partition by `yearday`. Notice that both of these produce 365 files, each of which contains the exact same subset of data!
3. Using *only* the datasets that you have just written to disk (i.e., you'll have to reopen them using `open_dataset()`), calculate the total amount of money charged (as measured by the `total_amount` variable) each day, for the 81st through 90th day of the year (using the `yearday` variable). Do this for *both* versions of the dataset that you just created, and record how long it took to finish in both cases. What do you notice?
## Solution 1
The query:
```{r}
nyc_taxi_2019_days <- nyc_taxi |>
  filter(year == 2019) |>
  mutate(
    monthday = day(pickup_datetime),
    yearday = yday(pickup_datetime)
  )
```
The check:
```{r}
nyc_taxi_2019_days |>
select(pickup_datetime, monthday, yearday) |>
head() |>
collect()
```
## Solution 2
```{r write-2019-data, cache=TRUE}
tic()
nyc_taxi_2019_days |>
group_by(month, monthday) |>
write_dataset("data/nyc_taxi_2019_monthday")
toc()
tic()
nyc_taxi_2019_days |>
group_by(yearday) |>
write_dataset("data/nyc_taxi_2019_yearday")
toc()
```
## Solution 3
First, we'll open the two datasets:
```{r}
nyc_taxi_2019_monthday <- open_dataset("data/nyc_taxi_2019_monthday")
nyc_taxi_2019_yearday <- open_dataset("data/nyc_taxi_2019_yearday")
```
Here's the solution for the month/day version:
```{r monthday-query, cache=TRUE}
tic()
nyc_taxi_2019_monthday |>
filter(yearday %in% 81:90) |>
group_by(yearday) |>
summarize(gross = sum(total_amount)) |>
collect()
toc()
```
Repeating the same exercise for the yearday version:
```{r yearday-query, cache=TRUE}
tic()
nyc_taxi_2019_yearday |>
filter(yearday %in% 81:90) |>
group_by(yearday) |>
summarize(gross = sum(total_amount)) |>
collect()
toc()
```
The difference is... not subtle.
:::
:::