4  Generate Parquet Files

Load the necessary packages into your environment.

library(DBI)
library(dm)
library(duckdb)
library(glue)
library(nexus) # Custom R-package available here: https://github.com/BWAM/nexus

Establish where the files will be stored. 1. build_dir: The directory where tables exported from the WQMA Oracle Database will be saved as parquet files. These tables will be used to build the tables in the analytical_dir. 2. analytical_dir: The directory where analytical tables will be saved as parquet files. Analytical tables will represent many of the WQMA tables joined together in a form that is readily available for analytical queries or analysis.

# Root folder on the shared L-drive under which all parquet output is written.
base_dir <- file.path("L:", "DOW", "BWAM Share", "data", "parquet")

# Raw table exports land in build_dir; joined analytical tables land in
# analytical_dir.
build_dir <- file.path(base_dir, "build_tables")
analytical_dir <- file.path(base_dir, "analytical_table_store")

Connect to the WQMA Oracle database and establish the data model.

# Open the database connection. "your-user-name" looks like a placeholder to
# be replaced with your own account name; the keyring entry used is "wqmap".
con <- get_connected(
  username = "your-user-name",
  keyring = "wqmap"
)

# Retrieve the data model through the open connection. Its names are used
# below as the list of tables to export.
data_model <- get_data_model(con = con)

5 Export Tables from WQMA

Query each of the tables from the WQMA Oracle database and save them as a parquet file to the L-drive.

# Make sure the destination folder exists before any file is written into it;
# this is a no-op when the folder is already there.
dir.create(build_dir, recursive = TRUE, showWarnings = FALSE)

# Export every table named in the data model to <build_dir>/<table>.parquet,
# reporting the elapsed time for each table.
purrr::walk(
  names(data_model),
  \(table_name) {
    print(paste("Querying:", table_name))
    tictoc::tic()
    # n = 10000 is handed to get_big_table(); presumably the number of rows
    # fetched per chunk -- TODO confirm against the nexus documentation.
    table_df <- get_big_table(
      con = con,
      table = table_name,
      n = 10000
    )

    arrow::write_parquet(
      x = table_df,
      sink = file.path(build_dir, paste0(table_name, ".parquet"))
    )
    # toc() prints the elapsed-time message itself, so the returned message is
    # not printed a second time.
    tictoc::toc()
    # Free the memory held by the exported table before the next query.
    gc()
  }
)

# All tables are exported; the Oracle connection is no longer needed.
DBI::dbDisconnect(con)

6 Join and Export Tables as Parquet

6.1 duckdb

duckdb is used to provide better memory management than would be possible if each table were loaded into R. The duckdb database created is only temporary, as indicated by dbdir = ":memory:". This means that once the tasks are done, the duckdb database is deleted.

# Open a connection to a temporary, in-memory duckdb database.
duckdb_con <- DBI::dbConnect(
  drv = duckdb::duckdb(),
  dbdir = ":memory:"
)

Add all of the parquet files in the build directory (build_dir) to the temporary duckdb database.

# Table names are the parquet file names without their extension. Restricting
# the listing to *.parquet keeps stray files in build_dir from being turned
# into CREATE TABLE statements.
file_list <- list.files(build_dir, pattern = "\\.parquet$") |>
  tools::file_path_sans_ext()

# Create (or replace) one duckdb table per parquet file. dbExecute() is used
# because these are statements with no result set to fetch; dbSendQuery()
# would leave a result object open for every table.
purrr::walk(
  file_list,
  \(table_name) {
    create_sql <- glue(
      "CREATE OR REPLACE TABLE {table_name} AS
      SELECT * FROM '{build_dir}/{table_name}.parquet'"
    )
    dbExecute(conn = duckdb_con, statement = create_sql)
  },
  .progress = TRUE
)

Starting from the EVENT table, join the primary parent tables (PROJECT, SITE, BASIN, and WATERBODY) and the SAMPLE table, and save the result as “SAMPLE_OBT”. This table is not exported as a parquet file. It is just used as the foundation for creating subsequent tables to be exported as parquet files.

# Build SAMPLE_OBT inside duckdb: start from EVENT and left-join PROJECT,
# SITE, BASIN, WATERBODY and SAMPLE on their shared key columns.
dbExecute(
  duckdb_con,
  "CREATE OR REPLACE TABLE SAMPLE_OBT AS
          SELECT * FROM EVENT
          LEFT JOIN PROJECT USING (PROJECT)
          LEFT JOIN SITE USING (SITE_ID)
          LEFT JOIN BASIN USING (BASIN)
          LEFT JOIN WATERBODY USING (WATERBODY_CODE)
          LEFT JOIN SAMPLE USING (EVENT_ID)"
)

6.1.1 obt_result

Join RESULT table and its parent tables (i.e., RESULT_QUALIFIER, PARAMETER, and PARAMETER_NAME) to the SAMPLE_OBT table. Exclude any rows where:

  1. SAMPLE_TYPE is ‘macroinvertebrate_abundance’. Macroinvertebrate abundance results do not fit cleanly into this table. These results are stored separately (see Section 6.1.2).
  2. SAMPLE_TYPE is NULL. SQL represents empty cells as NULL, while in R these same cells would be represented as NA. It appears that there are sampling events with no sample or result data in the database. This is being investigated further. See the following issue for more details: https://github.com/BWAM/bwam_analytics/issues/7
  3. RESULT_QUALIFIER is “R”. “R” represents rejected data. These records were accidentally inserted into the database. A ticket is open with OITS to remove these records from the database. Once this is complete, this condition can be removed. See the following issue for more details: https://github.com/BWAM/data_warehouse_prep/issues/6
# Build OBT_RESULT: SAMPLE_OBT left-joined to RESULT, RESULT_QUALIFIER,
# PARAMETER and PARAMETER_NAME, minus macroinvertebrate abundance rows, rows
# with no SAMPLE_TYPE, and rejected ('R') results.
#
# The qualifier filter keeps NULL explicitly. In SQL, NULL != 'R' evaluates to
# NULL rather than TRUE, so a bare `RESULT_QUALIFIER != 'R'` would also drop
# every row that has no qualifier at all, not just the rejected ones.
dbExecute(
  conn = duckdb_con,
  statement = "CREATE OR REPLACE TABLE OBT_RESULT AS
          SELECT * FROM SAMPLE_OBT
          LEFT JOIN RESULT USING (SAMPLE_ID)
          LEFT JOIN RESULT_QUALIFIER USING (RESULT_QUALIFIER)
          LEFT JOIN PARAMETER USING (PARAMETER_ID)
          LEFT JOIN PARAMETER_NAME USING (PARAMETER_NAME)
          WHERE SAMPLE_TYPE != 'macroinvertebrate_abundance' AND
          SAMPLE_TYPE IS NOT NULL AND
          (RESULT_QUALIFIER IS NULL OR RESULT_QUALIFIER != 'R')"
)

Write the OBT_RESULT table as a parquet file, obt_result.parquet, in the analytical directory.

# Have duckdb write OBT_RESULT straight to
# <analytical_dir>/obt_result.parquet; glue() fills in the directory path.
dbExecute(
  duckdb_con,
  glue("COPY
          (SELECT * FROM OBT_RESULT)
          TO '{analytical_dir}/obt_result.parquet'
          (FORMAT 'parquet')")
)

6.1.2 obt_taxa_abundance

Join TAXONOMIC_ABUNDANCE table and its parent tables (i.e., TAXONOMY, TAXONOMIC_TRAIT) to the SAMPLE_OBT table. Exclude any rows where:

  1. SAMPLE_TYPE is not ‘macroinvertebrate_abundance’. This table only represents macroinvertebrate abundance results – we do not need rows representing other sample types in this table.
  2. SAMPLE_TYPE is NULL. SQL represents empty cells as NULL, while in R these same cells would be represented as NA. It appears that there are sampling events with no sample or result data in the database. This is being investigated further. See the following issue for more details: https://github.com/BWAM/bwam_analytics/issues/7
# Build OBT_TAXA_ABUNDANCE: SAMPLE_OBT left-joined to TAXONOMIC_ABUNDANCE,
# TAXONOMY and TAXONOMIC_TRAIT, keeping only rows whose SAMPLE_TYPE is
# 'macroinvertebrate_abundance'.
dbExecute(
  duckdb_con,
  "CREATE OR REPLACE TABLE OBT_TAXA_ABUNDANCE AS
          SELECT * FROM SAMPLE_OBT
          LEFT JOIN TAXONOMIC_ABUNDANCE USING (SAMPLE_ID)
          LEFT JOIN TAXONOMY USING (TAXON_ID)
          LEFT JOIN TAXONOMIC_TRAIT USING (TAXON_ID)
          WHERE SAMPLE_TYPE = 'macroinvertebrate_abundance' AND
          SAMPLE_TYPE IS NOT NULL"
)

Write the OBT_TAXA_ABUNDANCE table as a parquet file, obt_taxa_abundance.parquet, in the analytical directory.

# Have duckdb write OBT_TAXA_ABUNDANCE straight to
# <analytical_dir>/obt_taxa_abundance.parquet; glue() fills in the directory
# path.
dbExecute(
  duckdb_con,
  glue("COPY
          (SELECT * FROM OBT_TAXA_ABUNDANCE)
          TO '{analytical_dir}/obt_taxa_abundance.parquet'
          (FORMAT 'parquet')")
)

7 Test Queries

# Read each exported analytical parquet file back into R as a check that the
# files can be opened and collected.
abundance <- dplyr::collect(
  arrow::open_dataset(
    file.path(analytical_dir, "obt_taxa_abundance.parquet")
  )
)

result <- dplyr::collect(
  arrow::open_dataset(
    file.path(analytical_dir, "obt_result.parquet")
  )
)