historian

library(conflicted)
library(tidyverse)
## ── Attaching core tidyverse packages ──────────────────────── tidyverse 2.0.0 ──
## ✔ dplyr     1.1.4     ✔ readr     2.1.5
## ✔ forcats   1.0.0     ✔ stringr   1.5.1
## ✔ ggplot2   3.5.0     ✔ tibble    3.2.1
## ✔ lubridate 1.9.3     ✔ tidyr     1.3.1
## ✔ purrr     1.0.2
conflicts_prefer(dplyr::filter)
## [conflicted] Will prefer dplyr::filter over any other package.

Problem

We want to keep track of the state of a table at different points in time. The table has a primary key id and a column x that we want to keep track of. The id column is essential to identify rows across different points in time, and the x column is a proxy for arbitrary payload data. In this example, V1 is the initial state of the table, V2 is the state of the table after adding a row, V3 is the state of the table after modifying a row, and V4 is the state of the table after deleting a row.

V1 <- tibble(id = 1L, x = letters[1])
V1
## # A tibble: 1 × 2
##      id x    
##   <int> <chr>
## 1     1 a
# Adding a row
V2 <- tibble(id = 1:2, x = letters[1:2])
V2
## # A tibble: 2 × 2
##      id x    
##   <int> <chr>
## 1     1 a    
## 2     2 b
# Modifying a row
V3 <- tibble(id = 1:2, x = letters[3:2])
V3
## # A tibble: 2 × 2
##      id x    
##   <int> <chr>
## 1     1 c    
## 2     2 b
# Deleting a row
V4 <- tibble(id = 2L, x = letters[2])
V4
## # A tibble: 1 × 2
##      id x    
##   <int> <chr>
## 1     2 b

History (temporal) table

At each point in time, there is a table H that contains the history of the table V at that point in time. The table H has columns from and to that define the time interval for which the row is valid. The table H also contains the details from table V at that point in time.

H0 <- tibble(from = integer(), to = integer(), V1[integer(), ])
H1 <- tibble(from = 1L, to = NA_integer_, V1)
H2 <- tibble(from = 1:2, to = NA_integer_, V2)
H3 <- tibble(
  from = 1:3,
  to = c(3L, NA_integer_, NA_integer_),
  bind_rows(V1[1, ], V3[2:1, ])
)
H4 <- tibble(
  from = 1:3,
  to = c(3L, NA_integer_, 4L),
  bind_rows(V1[1, ], V3[2:1, ])
)

H4 is smaller than V1, V2, V3, and V4 combined because we do not store the same data multiple times:

nrow(H4)
## [1] 3
nrow(V1) + nrow(V2) + nrow(V3) + nrow(V4)
## [1] 6

With that, we can define a function at_time() that takes a history table and a point in time, and returns the state of the table V at that point in time.

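at_time <- function(H, time) {
  H |>
    # Keep rows whose validity interval [from, to) contains `time`;
    # a missing bound counts as open-ended
    filter(coalesce(from <= !!time, TRUE), coalesce(to > !!time, TRUE)) |>
    select(-from, -to) |>
    arrange(id)
}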

H1 |>
  at_time(1) |>
  waldo::compare(V1)
## ✔ No differences
H2 |>
  at_time(2) |>
  waldo::compare(V2)
## ✔ No differences
H2 |>
  at_time(1) |>
  waldo::compare(V1)
## ✔ No differences
H3 |>
  at_time(3) |>
  waldo::compare(V3)
## ✔ No differences
H3 |>
  at_time(2) |>
  waldo::compare(V2)
## ✔ No differences
H3 |>
  at_time(1) |>
  waldo::compare(V1)
## ✔ No differences
H4 |>
  at_time(4) |>
  waldo::compare(V4)
## ✔ No differences
H4 |>
  at_time(3) |>
  waldo::compare(V3)
## ✔ No differences
H4 |>
  at_time(2) |>
  waldo::compare(V2)
## ✔ No differences
H4 |>
  at_time(1) |>
  waldo::compare(V1)
## ✔ No differences
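The checks above rely on each row being valid on the half-open interval from `from` (inclusive) to `to` (exclusive), with a missing `to` meaning "still valid". The following minimal sketch illustrates that boundary behavior on a small, hypothetical history table `H`; the definition of at_time() is repeated only so that the block runs on its own.

```r
library(dplyr)

# Same logic as at_time() above, repeated so that this block is self-contained
at_time <- function(H, time) {
  H |>
    filter(coalesce(from <= !!time, TRUE), coalesce(to > !!time, TRUE)) |>
    select(-from, -to) |>
    arrange(id)
}

# Hypothetical history: the payload of id 1 changes from "a" to "c" at time 3
H <- tibble(
  from = c(1L, 3L),
  to = c(3L, NA_integer_),
  id = c(1L, 1L),
  x = c("a", "c")
)

before_change <- at_time(H, 2)$x  # old row, valid for times 1 and 2
at_change <- at_time(H, 3)$x      # new row, valid from time 3 onward
before_first <- at_time(H, 0)$x   # no row is valid before the first delivery
```

Because `to` is exclusive, the row that is closed at time 3 and the row that is opened at time 3 never overlap.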

Decomposition

The history tables can be decomposed into two tables: O (observation) and D (difference). The observation table contains the details from the history table at the point in time, and is identical to the data at that point in time, save for the from and to columns. The difference table contains the changes that happened compared to the prior point in time.

Because we want to avoid storing the same data multiple times, we omit rows in a difference table that are identical to rows found in previous difference tables.

O1 <- H1
O1 |>
  select(-from, -to) |>
  waldo::compare(V1)
## ✔ No differences
D1 <- H2[0, ]

O2 <- H2
O2 |>
  select(-from, -to) |>
  waldo::compare(V2)
## ✔ No differences
D2 <- H3[1, ]

O3 <- H3[3:2, ]
O3 |>
  select(-from, -to) |>
  waldo::compare(V3)
## ✔ No differences
# This does not contain H4[1, ], on purpose:
D3 <- H4[3, ]

O4 <- H4[2, ]

O4 |>
  select(-from, -to) |>
  waldo::compare(V4)
## ✔ No differences

Binding an observation table with the past difference tables gives exactly the history table at that point in time.

bind_rows(O1) |>
  arrange(from, id) |>
  waldo::compare(H1)
## ✔ No differences
bind_rows(O2, D1) |>
  arrange(from, id) |>
  waldo::compare(H2)
## ✔ No differences
bind_rows(O3, D2, D1) |>
  arrange(from, id) |>
  waldo::compare(H3)
## ✔ No differences
bind_rows(O4, D3, D2, D1) |>
  arrange(from, id) |>
  waldo::compare(H4)
## ✔ No differences

Therefore, the at_time() function also works when combining an observation table with difference tables.

O4 |>
  at_time(4)
## # A tibble: 1 × 2
##      id x    
##   <int> <chr>
## 1     2 b
bind_rows(O4, D3) |>
  at_time(3)
## # A tibble: 2 × 2
##      id x    
##   <int> <chr>
## 1     1 c    
## 2     2 b
bind_rows(O4, D3, D2) |>
  at_time(2)
## # A tibble: 2 × 2
##      id x    
##   <int> <chr>
## 1     1 a    
## 2     2 b
bind_rows(O4, D3, D2, D1) |>
  at_time(1)
## # A tibble: 1 × 2
##      id x    
##   <int> <chr>
## 1     1 a

Generalization

Because an observation table combined with the most recent difference tables contains every history row that is valid during the period they cover, combining, e.g., one observation table and two difference tables allows reconstructing the original data for three points in time: the current one and the two before it.

bind_rows(O4, D3, D2, D1) |>
  at_time(1) |>
  waldo::compare(V1)
## ✔ No differences
bind_rows(O4, D3, D2, D1) |>
  at_time(2) |>
  waldo::compare(V2)
## ✔ No differences
bind_rows(O4, D3, D2) |>
  at_time(2) |>
  waldo::compare(V2)
## ✔ No differences
bind_rows(O4, D3, D2) |>
  at_time(3) |>
  waldo::compare(V3)
## ✔ No differences
bind_rows(O4, D3) |>
  at_time(3) |>
  waldo::compare(V3)
## ✔ No differences
bind_rows(O4, D3) |>
  at_time(4) |>
  waldo::compare(V4)
## ✔ No differences

Updating observation and difference tables

How can we construct O4 and D3 from O3, D2, D1, and V4? Likewise, how can we construct O3 and D2 from O2, D1, and V3, or O2 and D1 from O1 and V2? And, for the initialization, how can we construct O1 from V1?

We know that we can reconstruct the history table from the observation and difference tables. This then boils down to the question of how to construct O4 and D3 from O3, H3, and V4.

O4
## # A tibble: 1 × 4
##    from    to    id x    
##   <int> <int> <int> <chr>
## 1     2    NA     2 b
D3
## # A tibble: 1 × 4
##    from    to    id x    
##   <int> <int> <int> <chr>
## 1     3     4     1 c
O3
## # A tibble: 2 × 4
##    from    to    id x    
##   <int> <int> <int> <chr>
## 1     3    NA     1 c    
## 2     2    NA     2 b
H3
## # A tibble: 3 × 4
##    from    to    id x    
##   <int> <int> <int> <chr>
## 1     1     3     1 a    
## 2     2    NA     2 b    
## 3     3    NA     1 c
V4
## # A tibble: 1 × 2
##      id x    
##   <int> <chr>
## 1     2 b

We know how to extract V3 from O3:

O3 |>
  at_time(3) |>
  waldo::compare(V3)
## ✔ No differences

We can then compute the new or updated rows and the deleted rows. We also define V0 and O0 as the empty tables.

V0 <- V1[0, ]
O0 <- O1[0, ]

compute_diff <- function(old, new, time) {
  # Contains both new and updated rows
  P <-
    new |>
    anti_join(old, by = names(new)) |>
    mutate(from = as.integer(!!time), to = NA_integer_, .before = 1)

  # The id values of the deleted rows
  M <-
    old |>
    anti_join(new, by = "id") |>
    select(id)

  # The id values of the changed (new, updated, or deleted) rows
  PM <-
    P |>
    select(id) |>
    bind_rows(M)

  list(P = P, M = M, PM = PM)
}

X4 <- compute_diff(H3, V4, 4)
X4
## $P
## # A tibble: 0 × 4
## # ℹ 4 variables: from <int>, to <int>, id <int>, x <chr>
## 
## $M
## # A tibble: 2 × 1
##      id
##   <int>
## 1     1
## 2     1
## 
## $PM
## # A tibble: 2 × 1
##      id
##   <int>
## 1     1
## 2     1
X3 <- compute_diff(H2, V3, 3)
X3
## $P
## # A tibble: 1 × 4
##    from    to    id x    
##   <int> <int> <int> <chr>
## 1     3    NA     1 c    
## 
## $M
## # A tibble: 0 × 1
## # ℹ 1 variable: id <int>
## 
## $PM
## # A tibble: 1 × 1
##      id
##   <int>
## 1     1
X2 <- compute_diff(H1, V2, 2)
X2
## $P
## # A tibble: 1 × 4
##    from    to    id x    
##   <int> <int> <int> <chr>
## 1     2    NA     2 b    
## 
## $M
## # A tibble: 0 × 1
## # ℹ 1 variable: id <int>
## 
## $PM
## # A tibble: 1 × 1
##      id
##   <int>
## 1     2
X1 <- compute_diff(H0, V1, 1)
X1
## $P
## # A tibble: 1 × 4
##    from    to    id x    
##   <int> <int> <int> <chr>
## 1     1    NA     1 a    
## 
## $M
## # A tibble: 0 × 1
## # ℹ 1 variable: id <int>
## 
## $PM
## # A tibble: 1 × 1
##      id
##   <int>
## 1     1

The observation table is the same as the new table with from and to set to the relevant points in time. For new and updated rows, from is set to the current point in time; otherwise, the point in time from the old observation table is used. The to column is always set to missing. Deleted rows must be removed from the observation table.

X4$P |>
  bind_rows(O3) |>
  distinct(id, .keep_all = TRUE) |>
  anti_join(X4$M, by = "id") |>
  arrange(id) |>
  waldo::compare(O4)
## ✔ No differences
X3$P |>
  bind_rows(O2) |>
  distinct(id, .keep_all = TRUE) |>
  anti_join(X3$M, by = "id") |>
  arrange(id) |>
  waldo::compare(O3)
## ✔ No differences
X2$P |>
  bind_rows(O1) |>
  distinct(id, .keep_all = TRUE) |>
  anti_join(X2$M, by = "id") |>
  arrange(id) |>
  waldo::compare(O2)
## ✔ No differences
X1$P |>
  bind_rows(O0) |>
  distinct(id, .keep_all = TRUE) |>
  anti_join(X1$M, by = "id") |>
  arrange(id) |>
  waldo::compare(O1)
## ✔ No differences

The new difference table consists of the most recent history row for each changed id, with to set to the current point in time.

H3 |>
  semi_join(X4$PM, by = "id") |>
  filter(.by = id, row_number(from) == n()) |>
  mutate(to = 4L) |>
  waldo::compare(D3)
## ✔ No differences
H2 |>
  semi_join(X3$PM, by = "id") |>
  filter(.by = id, row_number(from) == n()) |>
  mutate(to = 3L) |>
  waldo::compare(D2)
## ✔ No differences
H1 |>
  semi_join(X2$PM, by = "id") |>
  filter(.by = id, row_number(from) == n()) |>
  mutate(to = 2L) |>
  waldo::compare(D1)
## ✔ No differences

The first observation table is the same as the first table with from set to the first point in time and to set to missing.

V1 |>
  mutate(from = 1L, to = NA_integer_, .before = 1) |>
  waldo::compare(O1)
## ✔ No differences

This defines a process for efficiently maintaining the observation and difference tables as new data arrives.
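The steps above can be collected into a single update function. The following is a minimal sketch, not the author's implementation: `update_flat()` is a hypothetical helper name, and it assumes that the previous observation table alone is sufficient as input, because it already holds every row that is currently valid (all rows with a missing `to`). Under that assumption the full history table does not need to be read.

```r
library(dplyr)

# Hypothetical helper: one update step for the flat-file layout.
# Takes the previous observation table and the new delivery,
# returns the new observation table O and the new difference table D.
update_flat <- function(O_old, new, time) {
  time <- as.integer(time)

  # New and updated rows: in `new`, but not identically in `O_old`
  P <-
    new |>
    anti_join(O_old, by = names(new)) |>
    mutate(from = !!time, to = NA_integer_, .before = 1)

  # Deleted rows: ids in `O_old` that are no longer in `new`
  M <-
    O_old |>
    anti_join(new, by = "id") |>
    select(id)

  # All changed ids (new, updated, or deleted)
  PM <- bind_rows(select(P, id), M)

  # New observation table: changed rows take precedence, deleted rows drop out
  O <-
    P |>
    bind_rows(O_old) |>
    distinct(id, .keep_all = TRUE) |>
    anti_join(M, by = "id") |>
    arrange(id)

  # New difference table: superseded rows, closed at the current time
  D <-
    O_old |>
    semi_join(PM, by = "id") |>
    mutate(to = !!time)

  list(O = O, D = D)
}

V1 <- tibble(id = 1L, x = "a")
V2 <- tibble(id = 1:2, x = c("a", "b"))
V3 <- tibble(id = 1:2, x = c("c", "b"))
V4 <- tibble(id = 2L, x = "b")
O0 <- tibble(from = integer(), to = integer(), id = integer(), x = character())

S1 <- update_flat(O0, V1, 1)
S2 <- update_flat(S1$O, V2, 2)
S3 <- update_flat(S2$O, V3, 3)
S4 <- update_flat(S3$O, V4, 4)
```

In this sketch, `S4$O` and `S4$D` play the roles of O4 and D3, and each step only touches the previous observation table and the new delivery.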

Maintaining an inline history table

The approach above is useful if the data is stored in multiple flat files. Given H3 and V4, how can we update H3 most efficiently so that it becomes H4? Can we use a variant of compute_diff() and a combination of rows_append(), rows_update(), rows_upsert() and/or rows_delete() for this task?

compute_diff_history <- function(old, new, time) {
  # Contains both new and updated rows
  P <-
    new |>
    anti_join(old, by = names(new)) |>
    mutate(from = as.integer(!!time), to = NA_integer_, .before = 1)

  last <-
    old |>
    select(id, from) |>
    arrange(from) |>
    filter(.by = id, row_number(from) == n())

  deleted <- anti_join(last, new, by = "id")

  changed <- semi_join(last, P, by = "id")

  # The id and from values of the rows to be closed (deleted or updated),
  # together with the new value for `to`
  C <-
    bind_rows(deleted, changed) |>
    mutate(to = as.integer(!!time))

  list(P = P, C = C)
}

Y4 <- compute_diff_history(H3, V4, 4)
Y4
## $P
## # A tibble: 0 × 4
## # ℹ 4 variables: from <int>, to <int>, id <int>, x <chr>
## 
## $C
## # A tibble: 1 × 3
##      id  from    to
##   <int> <int> <int>
## 1     1     3     4
H3 |>
  rows_patch(Y4$C, by = c("id", "from")) |>
  rows_append(Y4$P) |>
  waldo::compare(H4)
## ✔ No differences
Y3 <- compute_diff_history(H2, V3, 3)
Y3
## $P
## # A tibble: 1 × 4
##    from    to    id x    
##   <int> <int> <int> <chr>
## 1     3    NA     1 c    
## 
## $C
## # A tibble: 1 × 3
##      id  from    to
##   <int> <int> <int>
## 1     1     1     3
H2 |>
  rows_patch(Y3$C, by = c("id", "from")) |>
  rows_append(Y3$P) |>
  waldo::compare(H3)
## ✔ No differences
Y2 <- compute_diff_history(H1, V2, 2)
Y2
## $P
## # A tibble: 1 × 4
##    from    to    id x    
##   <int> <int> <int> <chr>
## 1     2    NA     2 b    
## 
## $C
## # A tibble: 0 × 3
## # ℹ 3 variables: id <int>, from <int>, to <int>
H1 |>
  rows_patch(Y2$C, by = c("id", "from")) |>
  rows_append(Y2$P) |>
  waldo::compare(H2)
## ✔ No differences
Y1 <- compute_diff_history(H0, V1, 1)
Y1
## $P
## # A tibble: 1 × 4
##    from    to    id x    
##   <int> <int> <int> <chr>
## 1     1    NA     1 a    
## 
## $C
## # A tibble: 0 × 3
## # ℹ 3 variables: id <int>, from <int>, to <int>
H0 |>
  rows_patch(Y1$C, by = c("id", "from")) |>
  rows_append(Y1$P) |>
  waldo::compare(H1)
## ✔ No differences
Y0 <- compute_diff_history(H0, V0, 0)

Because rows_patch() and rows_append() work on data frames and databases alike, and can persist the changes to a database with in_place = TRUE, the approach above defines a process for efficiently maintaining the history table as new data arrives. Using a single rows_upsert() call is possible but less attractive, because the payload of old rows would then be overwritten even though it has not changed.
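The patch-then-append step can be wrapped into one function and applied delivery by delivery. The following is a minimal sketch on in-memory data frames; `update_history()` is a hypothetical name. As a labeled deviation from compute_diff_history(), this sketch compares the new delivery only against rows that are still open (missing `to`), on the assumption that a payload reverting to an earlier value should still be recorded as a change.

```r
library(dplyr)

# Hypothetical wrapper: bring a history table up to date with a new delivery
update_history <- function(H, new, time) {
  time <- as.integer(time)

  # Assumption of this sketch: only currently valid rows are compared
  open <- filter(H, is.na(to))

  # New and updated rows, to be appended
  P <-
    new |>
    anti_join(open, by = names(new)) |>
    mutate(from = !!time, to = NA_integer_, .before = 1)

  # Open rows that were deleted or superseded, to be closed at `time`
  C <-
    bind_rows(
      anti_join(open, new, by = "id"),
      semi_join(open, P, by = "id")
    ) |>
    select(id, from) |>
    mutate(to = !!time)

  H |>
    rows_patch(C, by = c("id", "from")) |>
    rows_append(P)
}

versions <- list(
  tibble(id = 1L, x = "a"),
  tibble(id = 1:2, x = c("a", "b")),
  tibble(id = 1:2, x = c("c", "b")),
  tibble(id = 2L, x = "b")
)

H <- tibble(from = integer(), to = integer(), id = integer(), x = character())
for (time in seq_along(versions)) {
  H <- update_history(H, versions[[time]], time)
}
```

After the loop, `H` holds the same three rows as H4. Only the `to` column of existing rows is patched, so the payload of old rows is never rewritten.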

Data that is changing for each data delivery

The example above assumes that only a few rows change with each data delivery. In real-world datasets, situations can occur where a few columns change with each data delivery across the entire dataset. In this case, no compression can be achieved by storing only the changed rows. A viable solution is to store the ever-changing columns in a separate table and join them with the history table when needed.
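One possible reading of this split is sketched below. The column name `checked`, the variable `volatile_cols`, and the layout of the separate table (one row per id and delivery time) are assumptions made for illustration only.

```r
library(dplyr)

# Hypothetical delivery: `x` rarely changes, `checked` changes every time
V <- tibble(id = 1:2, x = c("a", "b"), checked = c(101L, 102L))

volatile_cols <- "checked"

# Stable part: this is what would be tracked in the history table
stable <- select(V, -all_of(volatile_cols))

# Volatile part: stored in full for each delivery, keyed by time and id
volatile <-
  V |>
  select(id, all_of(volatile_cols)) |>
  mutate(time = 1L, .before = 1)

# Joining both parts for a given delivery restores the full table
restored <-
  stable |>
  left_join(
    volatile |> filter(time == 1L) |> select(-time),
    by = "id"
  )
```

With this layout, only the narrow volatile table grows with every delivery, while the history table still benefits from storing unchanged rows once.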

Conclusion

The naive approach to maintaining different versions of a table is to store the entire table for each version (V# in our example). This is inefficient in terms of storage but offers the best performance for querying.

A good compromise is to maintain a history or temporal table (H# in our example). This requires each row to be identified by a unique identifier (the id column). The id column can be an integer, a GUID, or any other unique identifier. Composite keys are also possible. A temporal table contains two extra columns, from and to, that define the time period during which a row is valid. These columns can be of any ordered type, such as integers, dates, or timestamps. The at_time() function provides a way to query such a table at a specific point in time.
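To illustrate that from and to need not be integers, the following minimal sketch queries a hypothetical history table whose bounds are dates. `state_on()` is a copy of at_time() under a different name, included only so that the block runs on its own; the table `H_dates` and its values are made up for this example.

```r
library(dplyr)

# Hypothetical history table with Date bounds instead of integer time points
H_dates <- tibble(
  from = as.Date(c("2024-01-01", "2024-02-01")),
  to = as.Date(c("2024-02-01", NA)),
  id = c(1L, 1L),
  x = c("a", "c")
)

# Same filter logic as at_time(); nothing in it depends on the bound type
state_on <- function(H, when) {
  H |>
    filter(coalesce(from <= !!when, TRUE), coalesce(to > !!when, TRUE)) |>
    select(-from, -to) |>
    arrange(id)
}

state_jan <- state_on(H_dates, as.Date("2024-01-15"))
state_feb <- state_on(H_dates, as.Date("2024-02-01"))
```

Here `state_jan` contains the payload "a" and `state_feb` contains "c", because the only requirement on the bound type is that `<=` and `>` are defined for it.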

The maintenance of a temporal table as new data arrives differs slightly depending on the storage medium, because storage media have different trade-offs. Flat files are easy to work with but require the entire table to be read and written. To maintain efficiency, the history table can be split into observation (O#) and difference (D#) tables. In contrast, a database table can be changed in-place but requires the changesets to be specified in bulk for efficiency.

For flat files, the compute_diff() function provides a way to efficiently maintain the observation and difference tables as new data arrives. For each new data delivery, only the most recent observation table must be replaced with a new difference table, and the new delivery essentially becomes the new observation table.

The compute_diff_history() function provides a way to efficiently maintain a history table in a database as new data arrives. It specifies precisely the rows to be updated and appended. For updated rows, the payload x is never touched.