Pipeline API Reference

Auto-generated reference for ca_biositing.pipeline — the ETL pipeline components for data ingestion, transformation, and loading.

Pipeline Package

CA Biositing ETL Pipeline Package.

This package contains ETL (Extract, Transform, Load) pipelines and workflows for processing bioeconomy data for the CA Biositing project.

ETL Core

ETL task package for CA Biositing.

This package groups extract, transform, and load task modules used by Prefect flows. Keep imports in submodules to avoid import-time side effects.

Extract

Extract tasks for loading source datasets.

Modules in this package read external data sources (files, APIs, and sheets) and return pandas/geopandas data frames for downstream transforms.

Modules

autoclave

ETL Extract: Autoclave

Functions

basic_sample_info

ETL Extract: Basic Sample Info

Functions

billion_ton

Functions

extract(file_id: str = '11xLy_kPTHvoqciUMy3SYA3DLCDIjkOGa', file_name: str = 'billionton_23_agri_download.csv', mime_type: str = 'text/csv', project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts raw Billion Ton data from a file on Google Drive.

Parameters:

Name Type Description Default
file_id str

The Google Drive File ID.

'11xLy_kPTHvoqciUMy3SYA3DLCDIjkOGa'
file_name str

The local filename to save as.

'billionton_23_agri_download.csv'
mime_type str

The MIME type of the file.

'text/csv'
project_root Optional[str]

Optional root directory of the project.

None

Returns:

Type Description
Optional[DataFrame]

A pandas DataFrame containing the raw data, or None if an error occurs.

bioconversion_data

ETL Extract: BioConversion Data

Functions

bioconversion_methods

ETL Extract: BioConversion Methods

Functions

bioconversion_parameters

ETL Extract: BioConversion Parameters

Functions

bioconversion_setup

ETL Extract: BioConversion Setup

Functions

biodiesel_plants

Functions

extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts raw data from a .csv file.

This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.

Returns:

Type Description
Optional[DataFrame]

A pandas DataFrame containing the raw data, or None if an error occurs.

ca_proc_points

Functions

extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts raw data from a .zip file.

This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.

Returns:

Type Description
Optional[DataFrame]

A pandas DataFrame containing the raw data, or None if an error occurs.

calorimetry

ETL Extract: Calorimetry

Functions

cmpana

ETL Extract: CmpAna

Functions

compost

ETL Extract: Compost

Functions

decon_methods

ETL Extract: Decon Methods

Functions

decon_vessels

ETL Extract: Decon Vessels

Functions

enz_hydr_methods

ETL Extract: Enz Hydr Methods

Functions

experiments

ETL Extract: Experiments

Functions

factory

Extractor Factory for GSheet-based ETL tasks.

Functions

create_extractor(gsheet_name: str, worksheet_name: str, task_name: Optional[str] = None)

Creates a Prefect task for extracting data from a specific GSheet worksheet.
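The factory pattern here can be sketched as a closure that captures the sheet and worksheet names; the stub below omits Prefect and the real Sheets client, and the returned frame's contents are purely illustrative.

```python
from typing import Callable, Optional

import pandas as pd


def create_extractor(gsheet_name: str, worksheet_name: str,
                     task_name: Optional[str] = None) -> Callable[[], pd.DataFrame]:
    """Minimal sketch of a GSheet extractor factory (no Prefect, no real API)."""
    name = task_name or f"extract_{gsheet_name}_{worksheet_name}"

    def extractor() -> pd.DataFrame:
        # A real task would authenticate and read the worksheet here;
        # this stub just records which sheet it would have read.
        return pd.DataFrame({"source": [f"{gsheet_name}/{worksheet_name}"]})

    extractor.__name__ = name
    return extractor
```

Each generated task is a distinct callable with its own name, so many near-identical worksheet extractors can be registered without copy-pasting module files.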

icp

ETL Extract: ICP

Functions

landiq

ETL Extract for Land IQ Data.

This module provides functionality for extracting Land IQ geospatial data from shapefiles. Supports loading from a local path (Docker Compose / dev) or downloading from an HTTP URL at runtime (Cloud Run) when LANDIQ_SHAPEFILE_URL is set.

Functions

download_shapefile(url: str, logger) -> Optional[tuple[str, str]]

Download a shapefile (or zip archive containing one) from a URL.

Returns a tuple of (shp_path, tmp_dir) on success, or None on failure. The caller is responsible for cleanup of tmp_dir.
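Since the caller owns tmp_dir cleanup, a try/finally around the downstream work is the natural pattern. The sketch below substitutes a stub for the real download so it stays self-contained; the stub's behaviour is an assumption.

```python
import os
import shutil
import tempfile


def download_shapefile_stub(url: str):
    """Stand-in for download_shapefile: makes a tmp dir holding a fake .shp."""
    tmp_dir = tempfile.mkdtemp()
    shp_path = os.path.join(tmp_dir, "landiq.shp")
    open(shp_path, "wb").close()
    return shp_path, tmp_dir


def extract_with_cleanup(url: str):
    result = download_shapefile_stub(url)
    if result is None:
        return None
    shp_path, tmp_dir = result
    try:
        # Real code would call extract(shapefile_path=shp_path) here.
        return os.path.basename(shp_path)
    finally:
        # Caller is responsible for removing tmp_dir, success or failure.
        shutil.rmtree(tmp_dir, ignore_errors=True)
```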

extract(shapefile_path: Optional[str] = None) -> Optional[gpd.GeoDataFrame]

Extracts raw data from a Land IQ shapefile.

Resolution order:

1. shapefile_path argument (if provided and exists locally)
2. DEFAULT_SHAPEFILE_PATH (falls back to the volume-mounted file for Docker Compose)

URL download is handled by the flow before calling this task.

Parameters:

Name Type Description Default
shapefile_path Optional[str]

Path to the Land IQ shapefile. If None, uses default path.

None

Returns:

Type Description
Optional[GeoDataFrame]

A geopandas GeoDataFrame containing the raw data, or None if an error occurs.

petroleum_pipelines

Functions

extract(project_root: Optional[str] = None) -> Optional[gpd.GeoDataFrame]

Extracts raw data from a .geojson file.

This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.

Returns:

Type Description
Optional[GeoDataFrame]

A geopandas GeoDataFrame containing the raw data, or None if an error occurs.

preparation

ETL Extract: Preparation

Functions

pretreatment_data

ETL Extract: Pretreatment Data

Functions

pretreatment_setup

ETL Extract: Pretreatment Setup

Functions

provider_info

ETL Extract Template.

This module provides a template for extracting data from a Google Sheet.

To use this template:

1. Copy this file to the appropriate subdirectory in src/etl/extract/. For example: src/etl/extract/new_module/new_data.py
2. Update the configuration constants (GSHEET_NAME, WORKSHEET_NAME).
3. Ensure the CREDENTIALS_PATH is correct.

Functions

extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts raw data from the specified Google Sheet worksheet.

This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.

Returns:

Type Description
Optional[DataFrame]

A pandas DataFrame containing the raw data, or None if an error occurs.

proximate

ETL Extract Template.

This module provides a template for extracting data from a Google Sheet.

To use this template:

1. Copy this file to the appropriate subdirectory in src/etl/extract/. For example: src/etl/extract/new_module/new_data.py
2. Update the configuration constants (GSHEET_NAME, WORKSHEET_NAME).
3. Ensure the CREDENTIALS_PATH is correct.

Functions

extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts raw data from the specified Google Sheet worksheet.

This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.

Returns:

Type Description
Optional[DataFrame]

A pandas DataFrame containing the raw data, or None if an error occurs.

reaction_setup

ETL Extract: Reaction Setup

Functions

resources

ETL Extract: Resources

Functions

samplemetadata

ETL Extract: SampleMetadata

Functions

static_resource_info

ETL Extract Template.

This module provides a template for extracting data from a Google Sheet.

To use this template:

1. Copy this file to the appropriate subdirectory in src/etl/extract/. For example: src/etl/extract/new_module/new_data.py
2. Update the configuration constants (GSHEET_NAME, WORKSHEET_NAME).
3. Ensure the CREDENTIALS_PATH is correct.

Functions

extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts raw data from the specified Google Sheet worksheet.

This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.

Returns:

Type Description
Optional[DataFrame]

A pandas DataFrame containing the raw data, or None if an error occurs.

thermochem_data

ETL Extract: Thermochemical Conversion Data

Functions

ultimate

ETL Extract Template.

This module provides a template for extracting data from a Google Sheet.

To use this template:

1. Copy this file to the appropriate subdirectory in src/etl/extract/. For example: src/etl/extract/new_module/new_data.py
2. Update the configuration constants (GSHEET_NAME, WORKSHEET_NAME).
3. Ensure the CREDENTIALS_PATH is correct.

Functions

extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts raw data from the specified Google Sheet worksheet.

This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.

Returns:

Type Description
Optional[DataFrame]

A pandas DataFrame containing the raw data, or None if an error occurs.

usda_census_survey

USDA Census and Survey Data Extraction.

This module extracts agricultural census and survey data from the USDA NASS Quick Stats API for California. Data includes:

- Census data (every 5 years): complete agricultural census
- Survey data (annual): preliminary and final agricultural estimates

The USDA API provides access to decades of historical data across many commodities and regions. For more information: https://quickstats.nass.usda.gov/api

Functions

extract() -> Optional[pd.DataFrame]

Extracts USDA data only for commodities mapped in resource_usda_commodity_map and for priority counties (North San Joaquin Valley). This allows new crops to be added by updating the database; no code changes are needed.

xrd

ETL Extract: XRD

Functions

xrf

ETL Extract: XRF

Functions

Transform

Transform tasks for standardizing extracted data.

Modules in this package clean, validate, and reshape raw extracts into schema-aligned tables ready for load tasks.

Modules

billion_ton

ETL Transform for Billion Ton 2023 Agricultural Dataset

Functions

transform(data_sources: Dict[str, pd.DataFrame], etl_run_id: str | None = None, lineage_group_id: str | None = None) -> Optional[pd.DataFrame]

Transforms raw Billion Ton data into the BillionTon2023Record format.

Modules

landiq

Modules

landiq_record

ETL Transform for Land IQ Data.

This module provides functionality for transforming Land IQ GeoDataFrames into the LandiqRecord table format.

Functions

transform_landiq_record(gdf: gpd.GeoDataFrame, etl_run_id: str = None, lineage_group_id: int = None) -> pd.DataFrame

Transforms Land IQ GeoDataFrame into the LandiqRecord table format.

Parameters:

Name Type Description Default
gdf GeoDataFrame

Raw GeoDataFrame from Land IQ shapefile.

required
etl_run_id str

ID of the current ETL run.

None
lineage_group_id int

ID of the lineage group.

None

Returns:

Type Description
DataFrame

A pandas DataFrame formatted for the landiq_record table.

Modules

prepared_sample

ETL Transform for Prepared Sample.

This module transforms raw preparation data into the prepared_sample table format.

Functions

transform(data_sources: Dict[str, pd.DataFrame], etl_run_id: str | None = None, lineage_group_id: str | None = None) -> Optional[pd.DataFrame]

Transforms raw preparation data into a structured format for the prepared_sample table.

Parameters:

Name Type Description Default
data_sources Dict[str, DataFrame]

Dictionary where keys are source names and values are DataFrames.

required
etl_run_id str | None

ID of the current ETL run.

None
lineage_group_id str | None

ID of the lineage group.

None

Modules

resource

ETL Transform Template.

This module provides a template for transforming raw data from multiple sources. It includes standard cleaning, coercion, and normalization patterns used in the pipeline.

Functions

transform(data_sources: Dict[str, pd.DataFrame], etl_run_id: str | None = None, lineage_group_id: str | None = None) -> Optional[pd.DataFrame]

Transforms raw data from multiple sources into a structured format.

Parameters:

Name Type Description Default
data_sources Dict[str, DataFrame]

Dictionary where keys are source names and values are DataFrames.

required
etl_run_id str | None

ID of the current ETL run.

None
lineage_group_id str | None

ID of the lineage group.

None

Modules

Load

Load tasks for persisting transformed data.

Modules in this package write transformed frames to PostgreSQL using the shared data models and ETL lineage metadata.

Modules

billion_ton

Functions

load(df: pd.DataFrame)

Loads transformed Billion Ton data into the billion_ton2023_record table. Ensures that Place records exist before loading.

field_sample

Functions

load_field_sample(df: pd.DataFrame)

Upserts FieldSample records into the database. Links sampling_location_id based on preserved location metadata.

landiq

Functions

bulk_insert_polygons_ignore(session: Session, geoms: list[str], etl_run_id: str = None, lineage_group_id: str = None, dataset_id: int = None)

Inserts polygons in bulk, ignoring duplicates based on geom.

fetch_polygon_ids_by_geoms(session: Session, geoms: list[str]) -> dict[str, int]

Fetches polygon IDs for a list of geometries.

bulk_upsert_landiq_records(session: Session, records: list[dict]) -> int

Upserts LandiqRecords in bulk using ON CONFLICT (record_id) DO UPDATE. Returns the number of records processed.
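The ON CONFLICT (record_id) DO UPDATE pattern can be shown with SQLite's dialect of the same syntax (the real task targets PostgreSQL via the shared models, and the record_id/crop columns here are illustrative):

```python
import sqlite3


def bulk_upsert_records(conn: sqlite3.Connection, records: list[dict]) -> int:
    """Sketch of a bulk upsert keyed on record_id."""
    sql = """
        INSERT INTO landiq_record (record_id, crop)
        VALUES (:record_id, :crop)
        ON CONFLICT(record_id) DO UPDATE SET crop = excluded.crop
    """
    conn.executemany(sql, records)
    conn.commit()
    return len(records)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE landiq_record (record_id TEXT PRIMARY KEY, crop TEXT)")
bulk_upsert_records(conn, [{"record_id": "a1", "crop": "almond"}])
bulk_upsert_records(conn, [{"record_id": "a1", "crop": "walnut"}])  # updates in place
```

Re-running a load therefore converges on the latest values instead of raising duplicate-key errors.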

load_landiq_record(df: pd.DataFrame)

Upserts Land IQ records into the database using optimized bulk operations.

location_address

Functions

load_location_address(df: pd.DataFrame)

Upserts LocationAddress records into the database. Maps generic location names (like counties) to geography_ids during load.

prepared_sample

Functions

load_prepared_sample(df: pd.DataFrame)

Upserts PreparedSample records into the database based on the 'name' column.

resource

Functions

load_resource(df: pd.DataFrame)

Upserts resource records into the database.

static_resource_info

Functions

load_landiq_resource_mapping(df: pd.DataFrame)

Upserts LandiqResourceMapping records.

load_resource_availability(df: pd.DataFrame)

Upserts ResourceAvailability records.

Utilities

Shared ETL utility package.

Includes reusable helpers used across extract, transform, and load modules. See subpackages for cleaning/coercion and lookup-specific functions.

Cleaning Functions

Cleaning helpers package.

Expose commonly used cleaning and coercion helpers for the ETL pipeline. This package is intentionally small and documented; individual modules contain the implementation so unit tests can target them directly.

Functions

clean_names_df(df: pd.DataFrame) -> pd.DataFrame

Return a copy of df with cleaned column names using janitor.clean_names().

If df is not a DataFrame, the original value is returned and an error is logged.
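The behaviour can be approximated without pyjanitor; the snake_casing below is a rough stand-in for janitor.clean_names() and may differ from it on edge cases:

```python
import re

import pandas as pd


def clean_names_df(df: pd.DataFrame) -> pd.DataFrame:
    """Approximate janitor.clean_names(): lowercase, snake_case column names."""
    if not isinstance(df, pd.DataFrame):
        return df  # mirror the documented pass-through behaviour
    out = df.copy()
    out.columns = [
        re.sub(r"[^0-9a-z]+", "_", str(c).strip().lower()).strip("_")
        for c in out.columns
    ]
    return out
```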

replace_empty_with_na(df: pd.DataFrame, columns: Optional[Iterable[str]] = None, regex: str = '^\\s*$') -> pd.DataFrame

Replace empty/whitespace-only strings with np.nan.

Parameters:

Name Type Description Default
df DataFrame

input DataFrame

required
columns Optional[Iterable[str]]

optional iterable of column names to process; if None, operates on the whole frame

None
regex str

regex used to identify empty/whitespace strings

'^\\s*$'

Returns:

Type Description
DataFrame

A new DataFrame with replacements applied.
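A minimal sketch of this helper, assuming it is a thin wrapper over DataFrame.replace with the documented default regex:

```python
import numpy as np
import pandas as pd


def replace_empty_with_na(df, columns=None, regex=r"^\s*$"):
    """Sketch: replace empty/whitespace-only strings with np.nan."""
    out = df.copy()
    target = list(columns) if columns is not None else list(out.columns)
    out[target] = out[target].replace(regex, np.nan, regex=True)
    return out
```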

to_lowercase_df(df: pd.DataFrame, columns: Optional[Iterable[str]] = None) -> pd.DataFrame

Lowercase string columns.

Converts selected columns (or all string-like columns) to pandas string dtype, then applies .str.lower(). Missing values are preserved.

standard_clean(df: pd.DataFrame, lowercase: bool = True, replace_empty: bool = True) -> Optional[pd.DataFrame]

Run a composed standard cleaning pipeline and return a cleaned DataFrame.

Steps
  1. clean_names_df
  2. replace_empty_with_na (optional)
  3. to_lowercase_df (optional)
  4. convert_dtypes() to allow pandas to pick improved nullable dtypes
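The composed pipeline might look like the inlined sketch below (a self-contained approximation; the real steps delegate to the helpers above and to janitor):

```python
import re

import numpy as np
import pandas as pd


def standard_clean(df, lowercase: bool = True, replace_empty: bool = True):
    """Sketch of the composed pipeline: names -> empties -> lowercase -> dtypes."""
    if not isinstance(df, pd.DataFrame):
        return None
    out = df.copy()
    # 1. clean column names (rough stand-in for janitor.clean_names)
    out.columns = [re.sub(r"[^0-9a-z]+", "_", str(c).lower()).strip("_")
                   for c in out.columns]
    # 2. empty/whitespace-only strings become NaN
    if replace_empty:
        out = out.replace(r"^\s*$", np.nan, regex=True)
    # 3. lowercase string-like columns, preserving missing values
    if lowercase:
        for col in out.select_dtypes(include="object"):
            out[col] = out[col].astype("string").str.lower()
    # 4. let pandas pick improved nullable dtypes
    return out.convert_dtypes()
```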

coerce_columns(df: pd.DataFrame, int_cols: Optional[Iterable[str]] = None, float_cols: Optional[Iterable[str]] = None, datetime_cols: Optional[Iterable[str]] = None, bool_cols: Optional[Iterable[str]] = None, category_cols: Optional[Iterable[str]] = None, geometry_cols: Optional[Iterable[str]] = None, dtype_map: Optional[dict] = None, float_dtype=np.float64, geometry_format: str = 'wkt') -> pd.DataFrame

Coerce groups of columns to target types.

dtype_map may be provided as an alternative mapping with keys like 'int', 'float', 'datetime', 'bool', 'category', 'geometry'. Explicit keyword lists take precedence over dtype_map entries.

geometry_format controls how geometry columns are coerced:

- 'wkt': parse WKT strings using shapely (default)
- 'geodataframe': skip coercion (columns are already GeoSeries from geopandas)
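The grouped coercion idea can be sketched for the numeric and datetime groups (error-tolerant conversion is assumed; the real helper also handles bool, category, and geometry groups plus dtype_map):

```python
import pandas as pd


def coerce_columns(df, int_cols=None, float_cols=None, datetime_cols=None):
    """Sketch of grouped dtype coercion; bad values become missing, not errors."""
    out = df.copy()
    for col in int_cols or []:
        out[col] = pd.to_numeric(out[col], errors="coerce").astype("Int64")
    for col in float_cols or []:
        out[col] = pd.to_numeric(out[col], errors="coerce")
    for col in datetime_cols or []:
        out[col] = pd.to_datetime(out[col], errors="coerce")
    return out
```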

coerce_columns_list(dfs: Iterable[pd.DataFrame], **coerce_kwargs) -> list

Apply coerce_columns to each DataFrame in dfs and return a list of results.

Non-DataFrame items are preserved with a warning.

detect_latlon_columns(df: pd.DataFrame) -> Dict[str, list]

Auto-detect latitude and longitude columns in a DataFrame.

Searches for common naming patterns such as:

- latitude/longitude, lat/lon, desc_lat/desc_lon
- sampling_lat/sampling_lon, prod_lat/prod_lon, etc.
- Combined columns: latlong, lat_lon, latlng, location, coordinates

Returns:

Type Description
Dict[str, list]

Dict with keys:
  • 'latitude': list of detected latitude columns
  • 'longitude': list of detected longitude columns
  • 'combined': list of combined lat/lon columns
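A sketch of pattern-based detection; the regexes below are guesses at the documented naming patterns, not the library's actual rules:

```python
import re

import pandas as pd

LAT_PAT = re.compile(r"(^|_)lat(itude)?$")
LON_PAT = re.compile(r"(^|_)lon(g|gitude)?$")
COMBINED_PAT = re.compile(r"^(lat_?long?|latlng|coordinates|location)$")


def detect_latlon_columns(df: pd.DataFrame) -> dict:
    """Sketch: classify columns by name into latitude/longitude/combined."""
    found = {"latitude": [], "longitude": [], "combined": []}
    for col in df.columns:
        name = str(col).lower()
        if LAT_PAT.search(name):
            found["latitude"].append(col)
        elif LON_PAT.search(name):
            found["longitude"].append(col)
        elif COMBINED_PAT.match(name):
            found["combined"].append(col)
    return found
```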

split_combined_latlon(df: pd.DataFrame, col: str, sep: Optional[str] = None, lat_col: str = 'desc_lat', lon_col: str = 'desc_lon', keep_original: bool = False) -> pd.DataFrame

Split a combined lat/lon column into two separate columns.

Handles multiple separators: comma, space, semicolon, pipe, tab. Auto-detects delimiter if not specified.

Parameters:

Name Type Description Default
df DataFrame

input DataFrame

required
col str

name of combined lat/lon column

required
sep Optional[str]

delimiter (e.g., ',', ';'); if None, auto-detects

None
lat_col str

name for output latitude column

'desc_lat'
lon_col str

name for output longitude column

'desc_lon'
keep_original bool

if True, keep the original combined column

False

Returns:

Type Description
DataFrame

DataFrame with new lat/lon columns
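The splitting behaviour can be sketched with a single regex covering the documented separators (auto-detection here is simplified to "split on the first run of any separator"):

```python
import re

import pandas as pd


def split_combined_latlon(df, col, sep=None, lat_col="desc_lat",
                          lon_col="desc_lon", keep_original=False):
    """Sketch: split '34.05, -118.24'-style values into two float columns."""
    out = df.copy()
    pattern = re.escape(sep) if sep else r"[,;|\t ]+"  # comma/space/semicolon/pipe/tab
    parts = out[col].astype("string").str.split(pattern, n=1, expand=True, regex=True)
    out[lat_col] = pd.to_numeric(parts[0], errors="coerce")
    out[lon_col] = pd.to_numeric(parts[1], errors="coerce")
    if not keep_original:
        out = out.drop(columns=[col])
    return out
```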

standardize_latlon(df: pd.DataFrame, lat_cols: Optional[Iterable[str]] = None, lon_cols: Optional[Iterable[str]] = None, combined_cols: Optional[Iterable[str]] = None, auto_detect: bool = True, output_lat: str = 'desc_lat', output_lon: str = 'desc_lon', sep: Optional[str] = None, coerce_to_float: bool = True) -> pd.DataFrame

Standardize latitude/longitude columns in a DataFrame.

Workflow:

1. Auto-detect lat/lon columns if enabled
2. Split any combined lat/lon columns
3. Rename detected separate columns to output names
4. Optionally coerce to float with error handling

Parameters:

Name Type Description Default
df DataFrame

input DataFrame

required
lat_cols Optional[Iterable[str]]

explicit list of latitude columns to process

None
lon_cols Optional[Iterable[str]]

explicit list of longitude columns to process

None
combined_cols Optional[Iterable[str]]

explicit list of combined lat/lon columns to split

None
auto_detect bool

if True, automatically detect columns by name pattern

True
output_lat str

name for standardized latitude column

'desc_lat'
output_lon str

name for standardized longitude column

'desc_lon'
sep Optional[str]

delimiter for parsing combined columns

None
coerce_to_float bool

if True, coerce to float64

True

Returns:

Type Description
DataFrame

DataFrame with standardized lat/lon columns
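Steps 3 and 4 of the workflow (rename, then coerce) can be sketched in isolation; detection and splitting are omitted here, and the explicit-column path is assumed:

```python
import pandas as pd


def standardize_latlon(df, lat_cols=None, lon_cols=None,
                       output_lat="desc_lat", output_lon="desc_lon",
                       coerce_to_float=True):
    """Sketch of the rename-and-coerce tail of the standardization workflow."""
    out = df.copy()
    for col in lat_cols or []:
        out = out.rename(columns={col: output_lat})
    for col in lon_cols or []:
        out = out.rename(columns={col: output_lon})
    if coerce_to_float:
        for col in (output_lat, output_lon):
            if col in out.columns:
                out[col] = pd.to_numeric(out[col], errors="coerce")
    return out
```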