Pipeline API Reference
Auto-generated reference for ca_biositing.pipeline — the ETL pipeline
components for data ingestion, transformation, and loading.
Pipeline Package
CA Biositing ETL Pipeline Package.
This package contains ETL (Extract, Transform, Load) pipelines and workflows for processing bioeconomy data for the CA Biositing project.
ETL Core
ETL task package for CA Biositing.
This package groups extract, transform, and load task modules used by Prefect flows. Keep imports in submodules to avoid import-time side effects.
Extract
Extract tasks for loading source datasets.
Modules in this package read external data sources (files, APIs, and sheets) and return pandas/geopandas data frames for downstream transforms.
Modules
autoclave
ETL Extract: Autoclave
Functions
basic_sample_info
ETL Extract: Basic Sample Info
Functions
billion_ton
Functions
extract(file_id: str = '11xLy_kPTHvoqciUMy3SYA3DLCDIjkOGa', file_name: str = 'billionton_23_agri_download.csv', mime_type: str = 'text/csv', project_root: Optional[str] = None) -> Optional[pd.DataFrame]
Extracts raw Billion Ton data from a file on Google Drive.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| file_id | str | The Google Drive file ID. | '11xLy_kPTHvoqciUMy3SYA3DLCDIjkOGa' |
| file_name | str | The local filename to save as. | 'billionton_23_agri_download.csv' |
| mime_type | str | The MIME type of the file. | 'text/csv' |
| project_root | Optional[str] | Optional root directory of the project. | None |

Returns:

| Type | Description |
|---|---|
| Optional[DataFrame] | A pandas DataFrame containing the raw data, or None if an error occurs. |
bioconversion_data
ETL Extract: BioConversion Data
Functions
bioconversion_methods
ETL Extract: BioConversion Methods
Functions
bioconversion_parameters
ETL Extract: BioConv Parameters
Functions
bioconversion_setup
ETL Extract: BioConversion Setup
Functions
biodiesel_plants
Functions
extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]
Extracts raw data from a .csv file.
This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.
Returns:

| Type | Description |
|---|---|
| Optional[DataFrame] | A pandas DataFrame containing the raw data, or None if an error occurs. |
ca_proc_points
Functions
extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]
Extracts raw data from a .zip file.
This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.
Returns:

| Type | Description |
|---|---|
| Optional[DataFrame] | A pandas DataFrame containing the raw data, or None if an error occurs. |
calorimetry
ETL Extract: Calorimetry
Functions
cmpana
ETL Extract: CmpAna
Functions
compost
ETL Extract: Compost
Functions
decon_methods
ETL Extract: Decon Methods
Functions
decon_vessels
ETL Extract: Decon Vessels
Functions
enz_hydr_methods
ETL Extract: Enz Hydr Methods
Functions
experiments
ETL Extract: Experiments
Functions
factory
Extractor Factory for GSheet-based ETL tasks.
Functions
create_extractor(gsheet_name: str, worksheet_name: str, task_name: Optional[str] = None)
Creates a Prefect task for extracting data from a specific GSheet worksheet.
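A minimal sketch of the factory pattern this module implements. The real version wraps the returned callable with Prefect's `@task` decorator and reads the worksheet via the Google Sheets client; both are stubbed out here, so the body and default task name are assumptions.

```python
from typing import Callable, Optional

import pandas as pd


def create_extractor(gsheet_name: str, worksheet_name: str,
                     task_name: Optional[str] = None) -> Callable[[], pd.DataFrame]:
    """Return an extract function bound to one GSheet worksheet.

    In the real module the returned callable is a Prefect task; the
    actual worksheet read is replaced by a placeholder frame here.
    """
    name = task_name or f"extract_{gsheet_name}_{worksheet_name}"

    def extract() -> pd.DataFrame:
        # Placeholder for the real gspread read of the worksheet.
        return pd.DataFrame({"_source": [f"{gsheet_name}/{worksheet_name}"]})

    extract.__name__ = name
    return extract
```

The factory lets each worksheet get its own named task without a separate module per sheet, e.g. `samples_task = create_extractor("lab_data", "samples")`.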
icp
ETL Extract: ICP
Functions
landiq
ETL Extract for Land IQ Data.
This module provides functionality for extracting Land IQ geospatial data from shapefiles. Supports loading from a local path (Docker Compose / dev) or downloading from an HTTP URL at runtime (Cloud Run) when LANDIQ_SHAPEFILE_URL is set.
Functions
download_shapefile(url: str, logger) -> Optional[tuple[str, str]]
Download a shapefile (or zip archive containing one) from a URL.
Returns a tuple of (shp_path, tmp_dir) on success, or None on failure. The caller is responsible for cleanup of tmp_dir.
extract(shapefile_path: Optional[str] = None) -> Optional[gpd.GeoDataFrame]
Extracts raw data from a Land IQ shapefile.
Resolution order:
1. shapefile_path argument (if provided and exists locally)
2. DEFAULT_SHAPEFILE_PATH (falls back to volume-mounted file for Docker Compose)
URL download is handled by the flow before calling this task.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| shapefile_path | Optional[str] | Path to the Land IQ shapefile. If None, uses default path. | None |

Returns:

| Type | Description |
|---|---|
| Optional[GeoDataFrame] | A geopandas GeoDataFrame containing the raw data, or None if an error occurs. |
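The resolution order above can be sketched as a small path-resolution helper. The default path shown is an assumption; the real module defines its own `DEFAULT_SHAPEFILE_PATH`.

```python
import os
from typing import Optional

# Assumed default; the real module defines its own DEFAULT_SHAPEFILE_PATH.
DEFAULT_SHAPEFILE_PATH = "/data/landiq/landiq.shp"


def resolve_shapefile_path(shapefile_path: Optional[str] = None) -> Optional[str]:
    """Apply the documented resolution order for the Land IQ shapefile.

    1. Explicit argument, if it exists locally.
    2. DEFAULT_SHAPEFILE_PATH (volume-mounted file for Docker Compose).
    Returns None when neither path exists, mirroring extract()'s
    None-on-failure convention.
    """
    if shapefile_path and os.path.exists(shapefile_path):
        return shapefile_path
    if os.path.exists(DEFAULT_SHAPEFILE_PATH):
        return DEFAULT_SHAPEFILE_PATH
    return None
```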
petroleum_pipelines
Functions
extract(project_root: Optional[str] = None) -> Optional[gpd.GeoDataFrame]
Extracts raw data from a .geojson file.
This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.
Returns:

| Type | Description |
|---|---|
| Optional[GeoDataFrame] | A geopandas GeoDataFrame containing the raw data, or None if an error occurs. |
preparation
ETL Extract: Preparation
Functions
pretreatment_data
ETL Extract: Pretreatment Data
Functions
pretreatment_setup
ETL Extract: Pretreatment Setup
Functions
provider_info
ETL Extract Template.
This module provides a template for extracting data from a Google Sheet.
To use this template:
1. Copy this file to the appropriate subdirectory in src/etl/extract/.
For example: src/etl/extract/new_module/new_data.py
2. Update the configuration constants (GSHEET_NAME, WORKSHEET_NAME).
3. Ensure the CREDENTIALS_PATH is correct.
Functions
extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]
Extracts raw data from the specified Google Sheet worksheet.
This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.
Returns:

| Type | Description |
|---|---|
| Optional[DataFrame] | A pandas DataFrame containing the raw data, or None if an error occurs. |
proximate
ETL Extract Template.
This module provides a template for extracting data from a Google Sheet.
To use this template:
1. Copy this file to the appropriate subdirectory in src/etl/extract/.
For example: src/etl/extract/new_module/new_data.py
2. Update the configuration constants (GSHEET_NAME, WORKSHEET_NAME).
3. Ensure the CREDENTIALS_PATH is correct.
Functions
extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]
Extracts raw data from the specified Google Sheet worksheet.
This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.
Returns:

| Type | Description |
|---|---|
| Optional[DataFrame] | A pandas DataFrame containing the raw data, or None if an error occurs. |
reaction_setup
ETL Extract: Reaction Setup
Functions
resources
ETL Extract: Resources
Functions
samplemetadata
ETL Extract: SampleMetadata
Functions
static_resource_info
ETL Extract Template.
This module provides a template for extracting data from a Google Sheet.
To use this template:
1. Copy this file to the appropriate subdirectory in src/etl/extract/.
For example: src/etl/extract/new_module/new_data.py
2. Update the configuration constants (GSHEET_NAME, WORKSHEET_NAME).
3. Ensure the CREDENTIALS_PATH is correct.
Functions
extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]
Extracts raw data from the specified Google Sheet worksheet.
This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.
Returns:

| Type | Description |
|---|---|
| Optional[DataFrame] | A pandas DataFrame containing the raw data, or None if an error occurs. |
thermochem_data
ETL Extract: Thermochemical Conversion Data
Functions
ultimate
ETL Extract Template.
This module provides a template for extracting data from a Google Sheet.
To use this template:
1. Copy this file to the appropriate subdirectory in src/etl/extract/.
For example: src/etl/extract/new_module/new_data.py
2. Update the configuration constants (GSHEET_NAME, WORKSHEET_NAME).
3. Ensure the CREDENTIALS_PATH is correct.
Functions
extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]
Extracts raw data from the specified Google Sheet worksheet.
This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.
Returns:

| Type | Description |
|---|---|
| Optional[DataFrame] | A pandas DataFrame containing the raw data, or None if an error occurs. |
usda_census_survey
USDA Census and Survey Data Extraction.
This module extracts agricultural census and survey data from the USDA NASS Quick Stats API for California. Data includes:

- Census data (every 5 years): complete agricultural census
- Survey data (annual): preliminary and final agricultural estimates

The USDA API provides access to decades of historical data across many commodities and regions. For more information: https://quickstats.nass.usda.gov/api
Functions
extract() -> Optional[pd.DataFrame]
Extracts USDA data only for commodities mapped in resource_usda_commodity_map and for priority counties (North San Joaquin Valley). New crops can be added by updating the database; no code changes are needed.
xrd
ETL Extract: XRD
Functions
xrf
ETL Extract: XRF
Functions
Transform
Transform tasks for standardizing extracted data.
Modules in this package clean, validate, and reshape raw extracts into schema-aligned tables ready for load tasks.
Modules
billion_ton
ETL Transform for Billion Ton 2023 Agricultural Dataset
Functions
transform(data_sources: Dict[str, pd.DataFrame], etl_run_id: str | None = None, lineage_group_id: str | None = None) -> Optional[pd.DataFrame]
Transforms raw Billion Ton data into the BillionTon2023Record format.
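The general shape shared by the transform tasks in this package can be sketched as follows: take the `data_sources` dict, clean the relevant frame, and stamp lineage metadata on every row. The source key, cleaning steps, and column names here are illustrative assumptions, not the actual Billion Ton logic.

```python
from typing import Dict, Optional

import pandas as pd


def transform(data_sources: Dict[str, pd.DataFrame],
              etl_run_id: Optional[str] = None,
              lineage_group_id: Optional[str] = None) -> Optional[pd.DataFrame]:
    """Generic transform-task shape: pick the raw frame, clean it,
    and stamp ETL lineage metadata on the output."""
    raw = data_sources.get("billion_ton")  # source key is an assumption
    if raw is None or raw.empty:
        return None
    out = raw.copy()
    # Minimal stand-in for the real cleaning/standardization steps.
    out.columns = [str(c).strip().lower() for c in out.columns]
    out["etl_run_id"] = etl_run_id
    out["lineage_group_id"] = lineage_group_id
    return out
```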
Modules
landiq
Modules
landiq_record
ETL Transform for Land IQ Data.
This module provides functionality for transforming Land IQ GeoDataFrames into the LandiqRecord table format.
Functions
transform_landiq_record(gdf: gpd.GeoDataFrame, etl_run_id: str = None, lineage_group_id: int = None) -> pd.DataFrame
Transforms Land IQ GeoDataFrame into the LandiqRecord table format.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| gdf | GeoDataFrame | Raw GeoDataFrame from Land IQ shapefile. | required |
| etl_run_id | str | ID of the current ETL run. | None |
| lineage_group_id | int | ID of the lineage group. | None |

Returns:

| Type | Description |
|---|---|
| DataFrame | A pandas DataFrame formatted for the landiq_record table. |
Modules
prepared_sample
ETL Transform for Prepared Sample.
This module transforms raw preparation data into the prepared_sample table format.
Functions
transform(data_sources: Dict[str, pd.DataFrame], etl_run_id: str | None = None, lineage_group_id: str | None = None) -> Optional[pd.DataFrame]
Transforms raw preparation data into a structured format for the prepared_sample table.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| data_sources | Dict[str, DataFrame] | Dictionary where keys are source names and values are DataFrames. | required |
| etl_run_id | str \| None | ID of the current ETL run. | None |
| lineage_group_id | str \| None | ID of the lineage group. | None |
Modules
resource
ETL Transform Template.
This module provides a template for transforming raw data from multiple sources. It includes standard cleaning, coercion, and normalization patterns used in the pipeline.
Functions
transform(data_sources: Dict[str, pd.DataFrame], etl_run_id: str | None = None, lineage_group_id: str | None = None) -> Optional[pd.DataFrame]
Transforms raw data from multiple sources into a structured format.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| data_sources | Dict[str, DataFrame] | Dictionary where keys are source names and values are DataFrames. | required |
| etl_run_id | str \| None | ID of the current ETL run. | None |
| lineage_group_id | str \| None | ID of the lineage group. | None |
Modules
Load
Load tasks for persisting transformed data.
Modules in this package write transformed frames to PostgreSQL using the shared data models and ETL lineage metadata.
Modules
billion_ton
Functions
load(df: pd.DataFrame)
Loads transformed Billion Ton data into the billion_ton2023_record table. Ensures that Place records exist before loading.
field_sample
Functions
load_field_sample(df: pd.DataFrame)
Upserts FieldSample records into the database. Links sampling_location_id based on preserved location metadata.
landiq
Functions
bulk_insert_polygons_ignore(session: Session, geoms: list[str], etl_run_id: str = None, lineage_group_id: str = None, dataset_id: int = None)
Inserts polygons in bulk, ignoring duplicates based on geom.
fetch_polygon_ids_by_geoms(session: Session, geoms: list[str]) -> dict[str, int]
Fetches polygon IDs for a list of geometries.
bulk_upsert_landiq_records(session: Session, records: list[dict]) -> int
Upserts LandiqRecords in bulk using ON CONFLICT (record_id) DO UPDATE. Returns the number of records processed.
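A runnable sketch of the `ON CONFLICT (record_id) DO UPDATE` upsert pattern. The production code targets PostgreSQL through SQLAlchemy; SQLite (3.24+) shares the same UPSERT syntax, so the stdlib `sqlite3` module is used here to keep the example self-contained. Table and column names are simplified assumptions.

```python
import sqlite3

# In-memory stand-in for the PostgreSQL landiq_record table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE landiq_record (record_id TEXT PRIMARY KEY, crop TEXT)")


def bulk_upsert(records: list[dict]) -> int:
    """Upsert rows keyed on record_id; return the number processed."""
    conn.executemany(
        """
        INSERT INTO landiq_record (record_id, crop) VALUES (:record_id, :crop)
        ON CONFLICT (record_id) DO UPDATE SET crop = excluded.crop
        """,
        records,
    )
    return len(records)


bulk_upsert([{"record_id": "a1", "crop": "almond"}])
bulk_upsert([{"record_id": "a1", "crop": "walnut"}])  # same key: row is updated
```

The `excluded` pseudo-table refers to the row that would have been inserted, which is what makes a single statement serve as both insert and update.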
load_landiq_record(df: pd.DataFrame)
Upserts Land IQ records into the database using optimized bulk operations.
location_address
Functions
load_location_address(df: pd.DataFrame)
Upserts LocationAddress records into the database. Maps generic location names (like counties) to geography_ids during load.
prepared_sample
Functions
load_prepared_sample(df: pd.DataFrame)
Upserts PreparedSample records into the database based on the 'name' column.
resource
Functions
load_resource(df: pd.DataFrame)
Upserts resource records into the database.
static_resource_info
Functions
load_landiq_resource_mapping(df: pd.DataFrame)
Upserts LandiqResourceMapping records.
load_resource_availability(df: pd.DataFrame)
Upserts ResourceAvailability records.
Utilities
Shared ETL utility package.
Includes reusable helpers used across extract, transform, and load modules. See subpackages for cleaning/coercion and lookup-specific functions.
Cleaning Functions
Cleaning helpers package.
Expose commonly used cleaning and coercion helpers for the ETL pipeline. This package is intentionally small and documented; individual modules contain the implementation so unit tests can target them directly.
Functions
clean_names_df(df: pd.DataFrame) -> pd.DataFrame
Return a copy of df with cleaned column names using janitor.clean_names().
If df is not a DataFrame, the original value is returned and an error is logged.
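A simplified, pandas-only approximation of this helper. The real implementation delegates to pyjanitor's `clean_names()`; the snake_casing regex below is an assumption that captures the same idea (lowercase, strip, replace non-alphanumeric runs with underscores).

```python
import re

import pandas as pd


def clean_names_df(df: pd.DataFrame) -> pd.DataFrame:
    """Simplified stand-in for janitor.clean_names(): lowercase,
    strip, and snake_case the column labels on a copy."""
    if not isinstance(df, pd.DataFrame):
        # The real helper logs an error here and returns the input as-is.
        return df
    out = df.copy()
    out.columns = [
        re.sub(r"[^0-9a-z]+", "_", str(c).strip().lower()).strip("_")
        for c in out.columns
    ]
    return out
```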
replace_empty_with_na(df: pd.DataFrame, columns: Optional[Iterable[str]] = None, regex: str = '^\\s*$') -> pd.DataFrame
Replace empty/whitespace-only strings with np.nan.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | input DataFrame | required |
| columns | Optional[Iterable[str]] | optional iterable of column names to process; if None, operate on the whole frame | None |
| regex | str | regex used to identify empty/whitespace strings | '^\\s*$' |

Returns:

| Type | Description |
|---|---|
| DataFrame | A new DataFrame with replacements applied. |
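A minimal sketch of this helper using pandas' regex-aware `replace`, matching the documented signature; the column-selection behavior is the only logic added around it.

```python
from typing import Iterable, Optional

import numpy as np
import pandas as pd


def replace_empty_with_na(df: pd.DataFrame,
                          columns: Optional[Iterable[str]] = None,
                          regex: str = r"^\s*$") -> pd.DataFrame:
    """Replace empty/whitespace-only strings with np.nan on a copy."""
    out = df.copy()
    # Restrict to the requested columns, or process the whole frame.
    targets = list(columns) if columns is not None else list(out.columns)
    out[targets] = out[targets].replace(regex, np.nan, regex=True)
    return out
```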
to_lowercase_df(df: pd.DataFrame, columns: Optional[Iterable[str]] = None) -> pd.DataFrame
Lowercase string columns.
Converts selected columns (or all string-like columns) to pandas string dtype, then applies
.str.lower(). Missing values are preserved.
standard_clean(df: pd.DataFrame, lowercase: bool = True, replace_empty: bool = True) -> Optional[pd.DataFrame]
Run a composed standard cleaning pipeline and return a cleaned DataFrame.
Steps:

1. clean_names_df
2. replace_empty_with_na (optional)
3. to_lowercase_df (optional)
4. convert_dtypes() to allow pandas to pick improved nullable dtypes
coerce_columns(df: pd.DataFrame, int_cols: Optional[Iterable[str]] = None, float_cols: Optional[Iterable[str]] = None, datetime_cols: Optional[Iterable[str]] = None, bool_cols: Optional[Iterable[str]] = None, category_cols: Optional[Iterable[str]] = None, geometry_cols: Optional[Iterable[str]] = None, dtype_map: Optional[dict] = None, float_dtype=np.float64, geometry_format: str = 'wkt') -> pd.DataFrame
Coerce groups of columns to target types.
dtype_map may be provided as an alternative mapping with keys like 'int','float','datetime','bool','category','geometry'.
Explicit keyword lists take precedence over dtype_map entries.
geometry_format controls how geometry columns are coerced:
- 'wkt': parse WKT strings using shapely (default)
- 'geodataframe': skip coercion (columns already GeoSeries from geopandas)
coerce_columns_list(dfs: Iterable[pd.DataFrame], **coerce_kwargs) -> list
Apply coerce_columns to each DataFrame in dfs and return a list of results.
Non-DataFrame items are preserved with a warning.
detect_latlon_columns(df: pd.DataFrame) -> Dict[str, list]
Auto-detect latitude and longitude columns in a DataFrame.
Searches for common naming patterns such as:

- latitude/longitude, lat/lon, desc_lat/desc_lon
- sampling_lat/sampling_lon, prod_lat/prod_lon, etc.
- Combined columns: latlong, lat_lon, latlng, location, coordinates
Returns:

| Type | Description |
|---|---|
| Dict[str, list] | Dict with keys for the detected column categories, each mapping to a list of matching column names. |
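A simplified detector for the naming patterns listed above. The regexes and the returned key names (`lat`, `lon`, `combined`) are assumptions for this sketch; the real module may use different patterns and keys.

```python
import re
from typing import Dict, List

import pandas as pd


def detect_latlon_columns(df: pd.DataFrame) -> Dict[str, List[str]]:
    """Classify columns by name into lat, lon, and combined lat/lon."""
    lat_pat = re.compile(r"(^|_)lat(itude)?$")
    lon_pat = re.compile(r"(^|_)lon(gitude)?$|(^|_)lng$")
    combined_pat = re.compile(r"lat.?lo?n|coordinates?$|^location$")
    found: Dict[str, List[str]] = {"lat": [], "lon": [], "combined": []}
    for col in df.columns:
        name = str(col).lower()
        # Combined patterns win, so "latlong" is not misread as a lat column.
        if combined_pat.search(name):
            found["combined"].append(col)
        elif lat_pat.search(name):
            found["lat"].append(col)
        elif lon_pat.search(name):
            found["lon"].append(col)
    return found
```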
split_combined_latlon(df: pd.DataFrame, col: str, sep: Optional[str] = None, lat_col: str = 'desc_lat', lon_col: str = 'desc_lon', keep_original: bool = False) -> pd.DataFrame
Split a combined lat/lon column into two separate columns.
Handles multiple separators: comma, space, semicolon, pipe, tab. Auto-detects delimiter if not specified.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | input DataFrame | required |
| col | str | name of combined lat/lon column | required |
| sep | Optional[str] | delimiter (e.g., ',', ';'); if None, auto-detects | None |
| lat_col | str | name for output latitude column | 'desc_lat' |
| lon_col | str | name for output longitude column | 'desc_lon' |
| keep_original | bool | if True, keep the original combined column | False |

Returns:

| Type | Description |
|---|---|
| DataFrame | DataFrame with new lat/lon columns |
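A sketch of this splitter following the documented signature. The auto-detection here simply takes the first separator character found in a sample value; the real helper's detection may be more robust.

```python
import re
from typing import Optional

import pandas as pd


def split_combined_latlon(df: pd.DataFrame, col: str,
                          sep: Optional[str] = None,
                          lat_col: str = "desc_lat",
                          lon_col: str = "desc_lon",
                          keep_original: bool = False) -> pd.DataFrame:
    """Split 'lat<sep>lon' strings into two separate columns."""
    out = df.copy()
    if sep is None:
        # Naive auto-detection: first comma/semicolon/pipe/tab/space
        # found in the first non-null value.
        sample = str(out[col].dropna().iloc[0])
        match = re.search(r"[,;|\t ]", sample)
        sep = match.group(0) if match else ","
    parts = out[col].astype("string").str.split(sep, n=1, expand=True)
    out[lat_col] = parts[0].str.strip()
    out[lon_col] = parts[1].str.strip()
    if not keep_original:
        out = out.drop(columns=[col])
    return out
```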
standardize_latlon(df: pd.DataFrame, lat_cols: Optional[Iterable[str]] = None, lon_cols: Optional[Iterable[str]] = None, combined_cols: Optional[Iterable[str]] = None, auto_detect: bool = True, output_lat: str = 'desc_lat', output_lon: str = 'desc_lon', sep: Optional[str] = None, coerce_to_float: bool = True) -> pd.DataFrame
Standardize latitude/longitude columns in a DataFrame.
Workflow:

1. Auto-detect lat/lon columns if enabled
2. Split any combined lat/lon columns
3. Rename detected separate columns to output names
4. Optionally coerce to float with error handling
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | input DataFrame | required |
| lat_cols | Optional[Iterable[str]] | explicit list of latitude columns to process | None |
| lon_cols | Optional[Iterable[str]] | explicit list of longitude columns to process | None |
| combined_cols | Optional[Iterable[str]] | explicit list of combined lat/lon columns to split | None |
| auto_detect | bool | if True, automatically detect columns by name pattern | True |
| output_lat | str | name for standardized latitude column | 'desc_lat' |
| output_lon | str | name for standardized longitude column | 'desc_lon' |
| sep | Optional[str] | delimiter for parsing combined columns | None |
| coerce_to_float | bool | if True, coerce to float64 | True |

Returns:

| Type | Description |
|---|---|
| DataFrame | DataFrame with standardized lat/lon columns |