3  Analytical Data Store

Author

Zachary M. Smith

The goal of the Analytical Data Store is to provide a simple and fast way to access BWAM water quality data.

Data from the BWAM Oracle Data Warehouse, Water Quality Monitoring and Assessment (WQMA), have been extracted, collapsed into two large tables, and saved as parquet files. The two tables are:

  1. obt_taxa_abundance.parquet: All the taxonomic count data and associated sample information.

  2. obt_result.parquet: All results (e.g., chemistry, habitat, survey questions) and associated sample information.

We will use the arrow package to connect to the parquet files and the dplyr package to help us query the data.

Let’s load the necessary packages and establish the file paths to the parquet files. The parquet files are stored on the L-drive and should be accessible to all DOW staff.


library(arrow)
library(dplyr)

Attaching package: 'arrow'
The following object is masked from 'package:utils':

    timestamp

Attaching package: 'dplyr'
The following objects are masked from 'package:stats':

    filter, lag
The following objects are masked from 'package:base':

    intersect, setdiff, setequal, union
# Directories
obt_result_dir <- file.path(
  "L:",
  "DOW",
  "BWAM Share",
  "data",
  "parquet",
  "analytical_table_store",
  "obt_result.parquet"
)

obt_taxa_dir <- file.path(
  "L:",
  "DOW",
  "BWAM Share",
  "data",
  "parquet",
  "analytical_table_store",
  "obt_taxa_abundance.parquet"
)

3.0.1 Just Give Me the Data

Some example queries are provided below to get you started quickly:

# All data
all_df <- open_dataset(obt_result_dir) |> 
  collect()

# All lake data
lake_df <- open_dataset(obt_result_dir) |> 
  filter(WATERBODY_TYPE %in% "lake") |> 
  collect() 

# All TP results
tp_df <- open_dataset(obt_result_dir) |> 
  filter(PARAMETER_NAME %in% "phosphorus",
         FRACTION %in% "total") |>
  select(SITE_CODE,
         EVENT_ID,
         EVENT_DATETIME,
         FRACTION,
         PARAMETER_NAME,
         RESULT_VALUE,
         UNIT,
         RESULT_QUALIFIER) |> 
  distinct() |> 
  collect()

# Count of the number of sites with chloride data by waterbody type
cl_count_df <- open_dataset(obt_result_dir) |> 
  filter(PARAMETER_NAME %in% "chloride") |>
  distinct(WATERBODY_TYPE, SITE_CODE) |> 
  count(WATERBODY_TYPE) |> 
  collect()
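
The taxa abundance table can be queried in the same way; for example, to pull the entire table into R:

# All taxonomic count data
taxa_df <- open_dataset(obt_taxa_dir) |> 
  collect()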

3.1 Parquet and Arrow Introduction

Apache Parquet is an open source, column-oriented data file format designed for efficient data storage and retrieval. It provides high performance compression and encoding schemes to handle complex data in bulk and is supported in many programming languages and analytics tools.

-Parquet (apache.org)

As described above, parquet files are designed for efficient data retrieval. The data are stored in a columnar format, which enables rapid retrieval. Using Apache Arrow, analytics can be performed in-memory (using RAM) on parquet files without the need to pull all of the data into R or Python. In general, this means queries can be performed very rapidly and you only load the data you need into R or Python.
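
For example, you can check the dimensions of the data store without pulling the data into R:

# Connect to the data store; no data are loaded into R yet
obt_connection <- open_dataset(obt_result_dir)
nrow(obt_connection) # number of rows in the data store
ncol(obt_connection) # number of columns in the data store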

Additionally, parquet files contain metadata that describe the column types (character, numeric, date, etc.). Unlike comma-separated values (CSV) or Excel (XLSX) files, there is no need to tell R or Python what type of data is stored in each column. This enables users to query the data and begin working with it immediately, without the overhead of specifying column type details with every new analysis.
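
You can inspect the stored column types directly from the connection:

# Print the schema (column names and types) stored in the file
open_dataset(obt_result_dir)$schema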

Note

For more information about Apache Parquet or Arrow, please visit:

  • Parquet (apache.org)

  • Apache Arrow | Apache Arrow

3.2 Query Overview

Parquet files can be queried using the Python library PyArrow or the R package arrow. The remainder of this chapter will focus on the R package arrow.

Querying data is very similar to reading CSV files with read.csv(), but it is a two-step process:

  1. Connect to the parquet file using the arrow function, open_dataset()

  2. Read data into R, using the dplyr function, collect()

obt <- open_dataset(obt_result_dir) |> # 1
  collect() # 2
Tip

Although it is not recommended for these data stores, you can use the arrow function read_parquet() to perform the above query in a single step. This is not recommended because, in almost all instances, you do not need all of the data in a data store for your analysis.
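
For reference, the single-step version looks like this:

# Reads the entire parquet file into R in one step (avoid for
# large data stores)
obt <- read_parquet(obt_result_dir)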

The open_dataset() function can be followed by dplyr functions to query and perform analytics on the data store before bringing the data into R.

Tip

arrow supports many dplyr functions, but not all. If a dplyr function is not supported, do as much of the work as you can with arrow, collect() the data into R, and then use the dplyr function of interest.
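
For example, assuming slice_max() is among the functions your version of arrow does not translate, reduce the data with arrow first and apply it after collect():

# Reduce the data with arrow, bring it into R, then apply the
# dplyr-only function (slice_max() here, as an example)
top_tp_df <- open_dataset(obt_result_dir) |> 
  filter(PARAMETER_NAME %in% "phosphorus",
         FRACTION %in% "total") |> 
  distinct(SITE_CODE, EVENT_ID, RESULT_VALUE) |> 
  collect() |> 
  slice_max(RESULT_VALUE, n = 10)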

Let’s say we are interested in the major drainage basin names for basin numbers "01" and "02". We can get this information with the following query, and we only need to load a 2x2 table into R as opposed to a 2,148,847x88 table.

  1. Connect to the result data store.

  2. Limit the data to only the columns BASIN and BASIN_NAME and remove duplicates.

  3. Subset the rows to only those rows where BASIN is either “01” or “02.”

  4. Load the queried data into R.

open_dataset(obt_result_dir) |> # 1
  distinct(BASIN, BASIN_NAME) |> # 2
  filter(BASIN %in% c("01", "02")) |> # 3
  collect() # 4
# A tibble: 2 × 2
  BASIN BASIN_NAME                   
  <chr> <chr>                        
1 01    Lake Erie-Niagara River Basin
2 02    Allegheny River Basin        
Important

When performing analytics, you must be cognizant of duplicates. These data sets have been denormalized – meaning some data have been made redundant due to a one-to-many join.

Use the dplyr functions, select() and distinct(), to help you target only the columns of interest and remove redundant rows before performing analytics.
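
For example, comparing row counts before and after distinct() shows how much redundancy the denormalization introduced (a quick sketch; the exact counts depend on the data store):

cl_df <- open_dataset(obt_result_dir) |> 
  filter(PARAMETER_NAME %in% "chloride") |> 
  select(SITE_CODE, EVENT_ID, RESULT_VALUE) |> 
  collect()

nrow(cl_df)           # rows including duplicates
nrow(distinct(cl_df)) # rows after duplicates are removed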

You can also perform analytical summaries of the data store. Let’s say you are interested in the number of sites (SITE_CODE) that have total phosphorus data by waterbody type (WATERBODY_TYPE). The following query limits the data to the rows and columns of interest and counts the number of sites in each waterbody type.

  1. Connect to the result data store.

  2. Subset the rows to only the rows where:

    • PARAMETER_NAME matches “phosphorus”

    • FRACTION matches “total”

  3. Keep only the columns WATERBODY_TYPE and SITE_CODE, and remove any duplicate rows.

  4. Aggregate by WATERBODY_TYPE and count the number of rows.

  5. Load the queried data into R.

open_dataset(obt_result_dir) |> # 1
  filter(PARAMETER_NAME %in% "phosphorus", # 2
         FRACTION %in% "total") |>
  distinct(WATERBODY_TYPE, SITE_CODE) |> # 3
  count(WATERBODY_TYPE) |> # 4
  collect() # 5
# A tibble: 4 × 2
  WATERBODY_TYPE     n
  <chr>          <int>
1 lake            1349
2 river_stream    1376
3 other              1
4 outfall            2

Or maybe you are interested in finding the average and standard deviation of total phosphorus observations in each waterbody type:

  1. Connect to the result data store.

  2. Subset the rows to only the rows where:

    • PARAMETER_NAME matches “phosphorus”

    • FRACTION matches “total”

  3. Keep only the columns WATERBODY_TYPE, SITE_CODE, EVENT_ID, and RESULT_VALUE and remove any duplicate rows.

  4. Aggregate by WATERBODY_TYPE.

  5. Perform summary statistics removing NA results (see warning note below).

  6. Remove the aggregate/grouping from the data.

  7. Load the queried data into R.

open_dataset(obt_result_dir) |> # 1
  filter(PARAMETER_NAME %in% "phosphorus", # 2
         FRACTION %in% "total") |> 
  distinct(WATERBODY_TYPE, SITE_CODE, EVENT_ID, RESULT_VALUE) |> # 3
  group_by(WATERBODY_TYPE) |> # 4
  summarize( # 5
    MEAN = mean(RESULT_VALUE, na.rm = TRUE),
    SD = sd(RESULT_VALUE, na.rm = TRUE)
  ) |> 
  ungroup() |>  # 6
  collect() # 7
# A tibble: 4 × 3
  WATERBODY_TYPE   MEAN     SD
  <chr>           <dbl>  <dbl>
1 lake           0.0391  0.173
2 river_stream   0.0601  0.149
3 other          0.59   NA    
4 outfall        1.39    1.46 
Important

Please note that the query above excludes non-detects using the na.rm = TRUE argument in the mean() and sd() calls. Therefore, the results are biased positively. If you are interested in a similar query, you need to decide how to handle non-detects and implement the decision prior to performing the summary statistics.
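
As a hypothetical sketch only, one simplistic convention substitutes half the reporting limit for non-detects before summarizing. The qualifier code "U" and the column REPORTING_LIMIT below are illustrative assumptions, not confirmed names in this data store:

open_dataset(obt_result_dir) |> 
  filter(PARAMETER_NAME %in% "phosphorus",
         FRACTION %in% "total") |> 
  mutate(
    # Assumed convention: "U"-qualified rows are non-detects and
    # REPORTING_LIMIT holds the limit; verify both before use
    RESULT_ADJ = if_else(RESULT_QUALIFIER %in% "U",
                         0.5 * REPORTING_LIMIT,
                         RESULT_VALUE)
  ) |> 
  distinct(WATERBODY_TYPE, SITE_CODE, EVENT_ID, RESULT_ADJ) |> 
  group_by(WATERBODY_TYPE) |> 
  summarize(
    MEAN = mean(RESULT_ADJ, na.rm = TRUE),
    SD = sd(RESULT_ADJ, na.rm = TRUE)
  ) |> 
  ungroup() |> 
  collect()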

3.3 Common Queries

3.3.1 Waterbody Data from a Specific Time Period

What data were collected in Otsego Lake from 2022 to 2023?

  1. Connect to the result data store.

  2. Subset the rows to only the rows where:

    1. The waterbody name is “Otsego Lake”

    2. The sampling date is between 2022 and 2023

  3. Load the queried data into R.

otsego_df <- open_dataset(obt_result_dir) |> # 1
  filter(WATERBODY_NAME %in% "Otsego Lake", # 2.1
         between(EVENT_DATETIME, # 2.2
                 left = lubridate::ymd("2022-01-01"),
                 right = lubridate::ymd("2023-12-31"))
  ) |> 
  collect() # 3
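
Alternatively, assuming your version of arrow translates lubridate's year(), you can filter on the event year directly:

otsego_df <- open_dataset(obt_result_dir) |> 
  filter(WATERBODY_NAME %in% "Otsego Lake",
         lubridate::year(EVENT_DATETIME) %in% c(2022, 2023)) |> 
  collect()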