class CDS(Connector):
"""
Attributes:
connector_type (str): Name of connector
collections (list): A list of available collections.
collections_details (list): Detailed information about the collections.
"""
def __init__(self):
"""
Initialize climate_data_store with collections and configuration.
"""
self.connector_type: str = "climate_data_store"
self.CDSAPI_URL: str = "https://cds.climate.copernicus.eu/api"
self.stac_url: str = "https://cds.climate.copernicus.eu/api/catalogue/v1/"
self.collections: list[Any] = load_and_list_collections(
connector_type=self.connector_type
)
self.collections_details: list[Any] = load_and_list_collections(
as_json=True, connector_type=self.connector_type
)
self.metadata_dir = Path(__file__).parent / "cds_utils"
# Load CORDEX domains
self.cordex_domains = CORDEX_DOMAINS
# ERA5 variable name to stepType mapping
# This lookup table allows inferring stepType when it's not in the filename
self.VARIABLE_STEPTYPE_MAP: dict[str, str] = {
# Instantaneous parameters
"t2m": "instant",
"2m_temperature": "instant",
"u10": "instant",
"10m_u_component_of_wind": "instant",
"v10": "instant",
"10m_v_component_of_wind": "instant",
"msl": "instant",
"mean_sea_level_pressure": "instant",
"d2m": "instant",
"2m_dewpoint_temperature": "instant",
"sp": "instant",
"surface_pressure": "instant",
"skt": "instant",
"skin_temperature": "instant",
"tcc": "instant",
"total_cloud_cover": "instant",
"tcwv": "instant",
"total_column_water_vapour": "instant",
# Accumulated parameters
"tp": "accum",
"total_precipitation": "accum",
"ssr": "accum",
"surface_net_solar_radiation": "accum",
"str": "accum",
"surface_net_thermal_radiation": "accum",
"e": "accum",
"evaporation": "accum",
"ro": "accum",
"runoff": "accum",
"sf": "accum",
"snowfall": "accum",
"ssrd": "accum",
"surface_solar_radiation_downwards": "accum",
"strd": "accum",
"surface_thermal_radiation_downwards": "accum",
# Mean rate parameters
"avg_tprate": "avg",
"mean_total_precipitation_rate": "avg",
# Min/Max parameters
"mx2t": "max",
"maximum_2m_temperature_since_previous_post_processing": "max",
"mn2t": "min",
"minimum_2m_temperature_since_previous_post_processing": "min",
"fg10": "max",
"10m_wind_gust_since_previous_post_processing": "max",
}
def _is_cordex_collection(self, collection_name: str) -> bool:
"""Check if collection is a CORDEX dataset."""
return "cordex" in collection_name.lower()
def _download_and_extract_month(
self,
month_info: tuple[int, str, str],
total_months: int,
data_collection_name: str,
bbox: list[Any],
bands: list[Any],
query_params: Dict[str, Any],
working_dir: str,
extract_dir: Path,
) -> tuple[int, str, str]:
"""Download and extract a single month's data."""
idx, month_start, month_end = month_info
logger.info(
f"Downloading chunk {idx}/{total_months}: {month_start} to {month_end}"
)
zip_path = self._download_from_cds(
data_collection_name,
month_start,
month_end,
bbox,
bands,
query_params,
working_dir,
)
with zipfile.ZipFile(zip_path, "r") as zip_ref:
zip_ref.extractall(extract_dir)
Path(zip_path).unlink()
return idx, month_start, month_end
def _get_cordex_year_blocks(
self,
collection_name: str,
domain: str,
experiment: str,
horizontal_resolution: str,
temporal_resolution: str,
gcm_model: str,
rcm_model: str,
ensemble_member: str,
variable: str,
start_year: int,
end_year: int,
) -> list[tuple[int, int]]:
"""
Get the year blocks for a CORDEX request based on constraints.
For CORDEX data with fixed blocks, this returns the list of year blocks
that cover the requested time range. For flexible ranges, it returns
the full requested range as a single block.
Args:
collection_name: Name of the CORDEX collection
domain: CORDEX domain
experiment: Experiment type
horizontal_resolution: Grid resolution
temporal_resolution: Temporal resolution
gcm_model: Global Climate Model
rcm_model: Regional Climate Model
ensemble_member: Ensemble member
variable: Variable name
start_year: Start year for data request
end_year: End year for data request
Returns:
List of tuples (block_start_year, block_end_year) representing year blocks
"""
# Load constraints_variables file
constraints_list = self._load_cordex_constraints_variables(collection_name)
# Collect all matching year blocks across all constraint entries
# The constraints file may have separate entries for each year block
all_year_blocks = []
for combo in constraints_list:
if (
domain in combo.get("domain", [])
and experiment in combo.get("experiment", [])
and horizontal_resolution in combo.get("horizontal_resolution", [])
and temporal_resolution in combo.get("temporal_resolution", [])
and gcm_model in combo.get("gcm_model", [])
and rcm_model in combo.get("rcm_model", [])
and ensemble_member in combo.get("ensemble_member", [])
and variable in combo.get("variable", [])
):
combo_start_years = combo.get("start_year", [])
combo_end_years = combo.get("end_year", [])
if combo_start_years and combo_end_years:
# Add all year blocks from this combo
for sy, ey in zip(combo_start_years, combo_end_years):
block_start = int(sy)
block_end = int(ey)
# Include block if it overlaps with requested range
if block_start <= end_year and block_end >= start_year:
all_year_blocks.append((block_start, block_end))
if all_year_blocks:
# Remove duplicates and sort
unique_blocks = sorted(set(all_year_blocks))
return unique_blocks
# If no matching combination found, return the requested range as a single block
# (validation will catch any issues)
return [(start_year, end_year)]
def _download_and_extract_cordex_block(
self,
block_info: tuple[int, int, int],
total_blocks: int,
data_collection_name: str,
bbox: list[Any],
bands: list[Any],
query_params: Dict[str, Any],
working_dir: str,
extract_dir: Path,
) -> tuple[int, int, int]:
"""
Download and extract a single CORDEX year block.
Args:
block_info: Tuple of (index, block_start_year, block_end_year)
total_blocks: Total number of blocks being downloaded
data_collection_name: Name of the data collection
bbox: Bounding box
bands: List of bands
query_params: Query parameters
working_dir: Working directory
extract_dir: Directory to extract files to
Returns:
Tuple of (index, block_start_year, block_end_year)
"""
idx, block_start_year, block_end_year = block_info
# Convert year block to date strings
block_start_str = f"{block_start_year}-01-01"
block_end_str = f"{block_end_year}-12-31"
logger.info(
f"Downloading CORDEX block {idx}/{total_blocks}: {block_start_year}-{block_end_year}"
)
zip_path = self._download_from_cds(
data_collection_name,
block_start_str,
block_end_str,
bbox,
bands,
query_params,
working_dir,
)
with zipfile.ZipFile(zip_path, "r") as zip_ref:
zip_ref.extractall(extract_dir)
Path(zip_path).unlink()
return idx, block_start_year, block_end_year
def _get_cordex_domain_from_bbox(self, bbox: list) -> str:
"""
Map user bbox to appropriate CORDEX domain code.
Args:
bbox: User's bounding box [min_lon, min_lat, max_lon, max_lat]
Returns:
str: CORDEX domain code (e.g., 'EUR-11')
Raises:
TerrakitValidationError: If no matching domain found
"""
matching_domains = find_matching_domains(bbox)
if not matching_domains:
raise TerrakitValidationError(
message=f"Bbox {bbox} does not intersect with any CORDEX domain. "
f"Use list_cordex_domains() to see available domains."
)
if len(matching_domains) == 1:
return matching_domains[0]
# Multiple matches - return best match based on overlap
return self._find_best_cordex_match(bbox, matching_domains)
def _cordex_code_to_api_domain(self, domain_code: str) -> str:
"""
Convert CORDEX domain code to CDS API domain name.
Args:
domain_code: CORDEX domain code (e.g., 'EUR-11', 'AFR-44')
Returns:
str: CDS API domain name (e.g., 'europe', 'africa')
Raises:
TerrakitValidationError: If domain code is invalid
"""
# Mapping from domain code prefixes to CDS API domain names
domain_mapping = {
"AFR": "africa",
"ANT": "antarctic",
"ARC": "arctic",
"AUS": "australasia",
"CAM": "central_america",
"CAS": "central_asia",
"EAS": "east_asia",
"EUR": "europe",
"MED": "mediterranean",
"MNA": "middle_east_and_north_africa",
"NAM": "north_america",
"SAM": "south_america",
"SAS": "south_asia",
"SEA": "south_east_asia",
"WAS": "west_asia",
}
# Extract prefix from domain code (e.g., 'EUR' from 'EUR-11')
prefix = domain_code.split("-")[0] if "-" in domain_code else domain_code
if prefix not in domain_mapping:
raise TerrakitValidationError(
message=f"Unknown CORDEX domain code: {domain_code}"
)
return domain_mapping[prefix]
def _find_best_cordex_match(self, bbox: list, domain_codes: list) -> str:
"""
Find CORDEX domain with maximum overlap with user bbox.
Args:
bbox: User's bounding box
domain_codes: List of candidate domain codes
Returns:
str: Best matching domain code
"""
user_box = box(bbox[0], bbox[1], bbox[2], bbox[3])
best_domain: str = domain_codes[0] # Initialize with first domain
max_overlap = 0
for domain_code in domain_codes:
domain_bbox = self.cordex_domains[domain_code]["bbox"]
domain_box = box(
domain_bbox[0], domain_bbox[1], domain_bbox[2], domain_bbox[3]
)
overlap_area = user_box.intersection(domain_box).area
if overlap_area > max_overlap:
max_overlap = overlap_area
best_domain = domain_code
logger.info(
f"Multiple CORDEX domains match bbox. Selected {best_domain} with largest overlap."
)
return best_domain
def _infer_steptype(
self, filename: str, variable_name: str, collection_name: str = ""
) -> str:
"""
Infer stepType from filename or variable name.
Uses a three-tier approach:
1. Extract from filename if present (stepType-xxx)
2. Look up variable name in VARIABLE_STEPTYPE_MAP
3. Fall back to "unknown"
Note: stepType is only relevant for ERA5 data (derived from GRIB format).
For CORDEX data, stepType is not applicable and will be set to "unknown"
without generating a warning.
Parameters
----------
filename : str
NetCDF filename
variable_name : str
Variable name from the dataset
collection_name : str, optional
Name of the data collection (used to determine if stepType is relevant)
Returns
-------
str
stepType: 'instant', 'accum', 'avg', 'max', 'min', or 'unknown'
"""
# Method 1: Try extracting from filename for variables consolidated by stepType
if "stepType-" in filename:
step_type = filename.split("stepType-")[1].split(".")[0]
logger.debug(f"Extracted stepType '{step_type}' from filename: {filename}")
return step_type
# Method 2: Look up variable name in mapping
if variable_name in self.VARIABLE_STEPTYPE_MAP:
step_type = self.VARIABLE_STEPTYPE_MAP[variable_name]
logger.debug(
f"Inferred stepType '{step_type}' from variable name: {variable_name}"
)
return step_type
# Method 3: Fall back to unknown
# For CORDEX collections, stepType is not applicable (it's an ERA5/GRIB concept)
# so we don't warn about it
is_cordex = self._is_cordex_collection(collection_name)
if not is_cordex:
logger.warning(
f"Could not determine stepType for variable '{variable_name}' "
f"in file '{filename}'. Marking as 'unknown'. "
f"Consider adding this variable to VARIABLE_STEPTYPE_MAP."
)
else:
logger.debug(
f"stepType not applicable for CORDEX data. "
f"Setting to 'unknown' for variable '{variable_name}' in file '{filename}'."
)
return "unknown"
def _estimate_request_size(
self,
collection_name: str,
date_start: str,
date_end: str,
bbox: list,
bands: list,
) -> dict:
"""
Estimate the size and duration of a CDS request.
Returns:
dict with keys: 'num_days', 'num_variables', 'area_km2',
'estimated_mb', 'estimated_minutes'
"""
# Calculate number of days
start = datetime.strptime(date_start, "%Y-%m-%d")
end = datetime.strptime(date_end, "%Y-%m-%d")
num_days = (end - start).days + 1
# Calculate area in km²
# Approximate conversion: 1 degree ≈ 111 km at equator
lon_range = bbox[2] - bbox[0]
lat_range = bbox[3] - bbox[1]
avg_lat = (bbox[1] + bbox[3]) / 2
# Adjust longitude distance by latitude (cosine correction)
lon_km = lon_range * 111 * math.cos(math.radians(avg_lat))
lat_km = lat_range * 111
area_km2 = lon_km * lat_km
# Number of variables
num_variables = len(bands) if bands else 1
# Estimate file size (rough approximations based on CDS data)
if self._is_cordex_collection(collection_name):
# CORDEX: ~0.5 MB per day per variable for typical domain
mb_per_day_per_var = 0.5
else:
# ERA5: depends on resolution and area
# ~0.1 MB per day per variable per 10,000 km²
mb_per_day_per_var = (area_km2 / 10000) * 0.1
estimated_mb = num_days * num_variables * mb_per_day_per_var
# Estimate download time
# CDS queue time: 1-5 minutes (average 2)
# Download speed: ~5 MB/min (conservative estimate)
queue_time_min = 2
download_time_min = estimated_mb / 5
estimated_minutes = queue_time_min + download_time_min
return {
"num_days": num_days,
"num_variables": num_variables,
"area_km2": round(area_km2, 2),
"estimated_mb": round(estimated_mb, 2),
"estimated_minutes": round(estimated_minutes, 1),
}
def _download_from_cds(
self,
collection_name: str,
date_start: str,
date_end: str,
bbox: list,
bands: list = [],
query_params: dict = {},
working_dir: str = ".",
) -> str:
"""
Download data from CDS API with size and time estimates.
Args:
collection_name: CDS dataset name
date_start: Start date (YYYY-MM-DD)
date_end: End date (YYYY-MM-DD)
bbox: Bounding box [min_lon, min_lat, max_lon, max_lat]
bands: List of variables/bands to download
query_params: Additional collection-specific request parameters
working_dir: Directory to save the downloaded zip file
Returns:
Path to downloaded zip file in working_dir
"""
# Ensure working_dir exists
Path(working_dir).mkdir(parents=True, exist_ok=True)
# Estimate request size
estimate = self._estimate_request_size(
collection_name, date_start, date_end, bbox, bands
)
# Log detailed information
logger.info(f"Submitting CDS request for {collection_name}")
logger.info(
f"Date range: {date_start} to {date_end} ({estimate['num_days']} days)"
)
logger.info(f"Area: {estimate['area_km2']} km²")
logger.info(f"Variables: {estimate['num_variables']}")
logger.info(f"Estimated size: ~{estimate['estimated_mb']} MB")
logger.info(f"Estimated time: ~{estimate['estimated_minutes']} minutes")
# Connect and build request
client = self._connect_to_cds()
request_params = self._build_request_params(
collection_name,
date_start,
date_end,
bbox,
bands,
self._load_constraints(collection_name),
query_params,
)
# Log request parameters for debugging
logger.debug("CDS Request Parameters:")
logger.debug(json.dumps(request_params, indent=2))
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_filename = f"cds_{collection_name}_{timestamp}.zip"
output_zip = str(Path(working_dir) / output_filename)
logger.info("Request submitted to CDS queue. Please wait...")
try:
start_time = datetime.now()
client.retrieve(collection_name, request_params, output_zip)
# Log success
actual_time = (datetime.now() - start_time).total_seconds() / 60
logger.info(f"✓ Download complete: {output_zip}")
logger.info(f"Actual time: {actual_time:.1f} minutes")
return output_zip
except requests.HTTPError as e:
# Parse CDS-specific error messages
error_details = self._parse_cds_error(e)
logger.error("=" * 70)
logger.error("CLIMATE DATA STORE REQUEST FAILED")
logger.error("=" * 70)
logger.error(f"Collection: {collection_name}")
logger.error(f"Error Type: {error_details['type']}")
logger.error(f"Error Message: {error_details['message']}")
logger.error("")
logger.error("Request Parameters:")
logger.error(json.dumps(request_params, indent=2))
logger.error("")
logger.error("Possible causes:")
for cause in error_details["possible_causes"]:
logger.error(f" - {cause}")
logger.error("=" * 70)
raise TerrakitValidationError(
message=f"CLIMATE DATA STORE REQUEST FAILED: {error_details['message']}\n"
f"Collection: {collection_name}\n"
f"Error type: {error_details['type']}\n"
f"See logs for full request parameters and troubleshooting tips."
)
except Exception as e:
logger.error("=" * 70)
logger.error("UNEXPECTED ERROR DURING CDS DOWNLOAD")
logger.error("=" * 70)
logger.error(f"Collection: {collection_name}")
logger.error(f"Error: {str(e)}")
logger.error("")
logger.error("Request Parameters:")
logger.error(json.dumps(request_params, indent=2))
logger.error("=" * 70)
raise TerrakitValidationError(
message=f"Failed to download from CDS: {str(e)}\n"
f"Collection: {collection_name}\n"
f"See logs for full request parameters."
)
def _parse_cds_error(self, error: requests.HTTPError) -> dict:
"""
Parse CDS API error and provide helpful troubleshooting information.
Returns:
dict with keys: 'type', 'message', 'possible_causes'
"""
error_str = str(error)
# Common CDS error patterns
if "ValueError" in error_str:
return {
"type": "ValueError",
"message": "Invalid parameter value in request",
"possible_causes": [
"Variable/band name not valid for this collection",
"Date outside collection temporal range",
"Invalid area/bbox coordinates",
"Missing required parameters",
"Check CDS documentation for valid parameter values",
],
}
elif "400" in error_str or "Bad Request" in error_str:
return {
"type": "Bad Request (400)",
"message": "CDS rejected the request parameters",
"possible_causes": [
"Invalid parameter format",
"Required parameter missing",
"Parameter value out of range",
"Check parameter names match CDS API expectations",
],
}
elif "401" in error_str or "Unauthorized" in error_str:
return {
"type": "Unauthorized (401)",
"message": "Authentication failed",
"possible_causes": [
"Invalid or missing CDS API key",
"API key not set in environment (CDSAPI_KEY)",
"Account not activated or suspended",
],
}
elif "403" in error_str or "Forbidden" in error_str:
return {
"type": "Forbidden (403)",
"message": "Access denied to this dataset",
"possible_causes": [
"Dataset license not accepted",
"Visit CDS website to accept terms and conditions",
"Account lacks permissions for this dataset",
],
}
else:
return {
"type": "Unknown Error",
"message": error_str,
"possible_causes": [
"Check CDS service status",
"Verify request parameters",
"Review CDS API documentation",
],
}
def _build_request_params(
self,
collection_name: str,
date_start: str,
date_end: str,
bbox: list,
bands: list,
constraints: dict,
query_params: dict = {},
) -> Dict[str, Any]:
"""
Build CDS API request parameters based on collection type.
Args:
collection_name: CDS dataset name
date_start: Start date (YYYY-MM-DD)
date_end: End date (YYYY-MM-DD)
bbox: Bounding box [min_lon, min_lat, max_lon, max_lat]
bands: List of variables/bands to download
constraints: Collection constraints from metadata
query_params: Additional collection-specific parameters (e.g., daily_statistic, frequency)
Returns:
Dictionary of request parameters for CDS API
"""
params: Dict[str, Any] = {}
# Handle different collection types
if self._is_cordex_collection(collection_name):
# CORDEX collections need domain instead of bbox
domain_code = self._get_cordex_domain_from_bbox(bbox)
# Convert domain code (e.g., 'EUR-11') to API domain name (e.g., 'europe')
api_domain = self._cordex_code_to_api_domain(domain_code)
params["domain"] = api_domain
# Set default parameters for CORDEX collections
# These can be overridden by query_params
params["experiment"] = "historical"
params["horizontal_resolution"] = "0_44_degree_x_0_44_degree"
params["temporal_resolution"] = "daily_mean"
params["ensemble_member"] = "r1i1p1"
params["gcm_model"] = "ichec_ec_earth" # Default GCM model
params["rcm_model"] = "knmi_racmo22t" # Default RCM model
params["data_format"] = "netcdf"
# Add start_year and end_year based on date range
start_date = datetime.strptime(date_start, "%Y-%m-%d")
end_date = datetime.strptime(date_end, "%Y-%m-%d")
params["start_year"] = [str(start_date.year)]
params["end_year"] = [str(end_date.year)]
# CORDEX collections use start_year/end_year only
# The API does not support month/day filtering - it returns all data for the year range
# Temporal filtering will be done after download if needed
else:
# ERA5 and other collections use bbox directly
# CDS API expects area as [North, West, South, East]
# Input bbox is [min_lon, min_lat, max_lon, max_lat] = [West, South, East, North]
# ERA5 uses -180 to 180° longitude convention (NOT 0-360°)
# Do NOT convert longitudes - use them as-is
params["area"] = [
bbox[3], # North (max_lat)
bbox[0], # West (min_lon) - keep in -180/180 system
bbox[1], # South (min_lat)
bbox[2], # East (max_lon) - keep in -180/180 system
]
# Set default parameters for ERA5 collections
# These can be overridden by query_params
params["product_type"] = "reanalysis"
params["data_format"] = "netcdf"
params["daily_statistic"] = "daily_mean"
params["frequency"] = "6_hourly"
params["time_zone"] = "utc+00:00"
# Add temporal parameters for ERA5 collections
# Note: Multi-year requests are split at a higher level (in get_data) to avoid
# invalid date combinations from Cartesian products
params["year"] = self._get_years_list(date_start, date_end)
params["month"] = self._get_months_list(date_start, date_end)
params["day"] = self._get_days_list(date_start, date_end)
# Add variables/bands
if bands:
params["variable"] = bands
elif "variable" in constraints:
# Use first available variable if none specified
params["variable"] = [constraints["variable"][0]]
# Merge query_params - these override any defaults set above
# This allows users to specify collection-specific parameters like:
# - daily_statistic: "daily_mean", "daily_maximum", "daily_minimum", "daily_standard_deviation"
# - frequency: "1hr", "3hr", "6hr", "day", "mon", "sem", "fx"
# - product_type: override default "reanalysis"
# - time_zone: override default "utc+00:00"
# Filter out internal parameters that should not be sent to CDS API
internal_params = {"max_workers"}
filtered_query_params = {
k: v for k, v in query_params.items() if k not in internal_params
}
params.update(filtered_query_params)
return params
def _get_years_list(self, date_start: str, date_end: str) -> list[str]:
"""Get list of years between start and end dates."""
start = datetime.strptime(date_start, "%Y-%m-%d")
end = datetime.strptime(date_end, "%Y-%m-%d")
return [str(year) for year in range(start.year, end.year + 1)]
def _get_months_list(self, date_start: str, date_end: str) -> list[str]:
"""Get list of months between start and end dates."""
start = datetime.strptime(date_start, "%Y-%m-%d")
end = datetime.strptime(date_end, "%Y-%m-%d")
months = set()
# Step through the range one calendar month at a time, starting from the
# first day of the start month. Stepping from an arbitrary day with
# day-clamping (e.g. Jan 31 -> Feb 28 -> Mar 28) can overshoot an end date
# that falls early in a month and skip that month entirely.
current = start.replace(day=1)
while current <= end:
months.add(f"{current.month:02d}")
if current.month == 12:
current = current.replace(year=current.year + 1, month=1)
else:
current = current.replace(month=current.month + 1)
return sorted(months)
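# Example: _get_months_list("2024-11-15", "2025-02-03")
# -> ["01", "02", "11", "12"]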
def _get_days_list(self, date_start: str, date_end: str) -> list[str]:
"""Get list of days between start and end dates."""
start = datetime.strptime(date_start, "%Y-%m-%d")
end = datetime.strptime(date_end, "%Y-%m-%d")
days = set()
current = start
while current <= end:
days.add(f"{current.day:02d}")
current += timedelta(days=1)
return sorted(days)
def _get_constraint_value(
self, constraints: dict, *keys: str, collection_name: str = ""
):
"""
Safely extract nested values from constraints with clear error messages.
Args:
constraints: The constraints dictionary
*keys: Sequence of keys to traverse (e.g., 'extent', 'temporal', 'interval')
collection_name: Optional collection name for better error messages
Returns:
The value at the specified path
Raises:
TerrakitValidationError: If any key in the path is missing
"""
if not constraints:
raise TerrakitValidationError(
message=f"No constraints metadata available{f' for {collection_name}' if collection_name else ''}"
)
value = constraints
path = []
for key in keys:
path.append(key)
if not isinstance(value, dict) or key not in value:
path_str = " -> ".join(path)
raise TerrakitValidationError(
message=f"Collection constraints missing required field: '{path_str}'"
f"{f' for {collection_name}' if collection_name else ''}"
)
value = value[key]
if value is None:
path_str = " -> ".join(path)
raise TerrakitValidationError(
message=f"Collection constraints field is null: '{path_str}'"
f"{f' for {collection_name}' if collection_name else ''}"
)
return value
def _validate_temporal(
self,
date_start: str,
date_end: str,
constraints: dict,
collection_name: str = "",
):
"""Validate dates against collection constraints."""
# Check dates are valid
check_start_end_date_in_correct_order(date_start, date_end)
check_date_format(date_start, start_or_end="start")
check_date_format(date_start, start_or_end="end")
# Get temporal interval using helper
intervals = self._get_constraint_value(
constraints,
"extent",
"temporal",
"interval",
collection_name=collection_name,
)
if not intervals or not intervals[0] or len(intervals[0]) < 2:
raise TerrakitValidationError(
message=f"Invalid temporal interval format in constraints"
f"{f' for {collection_name}' if collection_name else ''}"
)
try:
# Get allowed date range
allowed_start = datetime.fromisoformat(
intervals[0][0].replace("+00:00", "")
)
allowed_end = datetime.fromisoformat(intervals[0][1].replace("+00:00", ""))
logger.debug(f"Allowed temporal interval: {allowed_start} to {allowed_end}")
# Parse requested dates
req_start = datetime.strptime(date_start, "%Y-%m-%d")
req_end = datetime.strptime(date_end, "%Y-%m-%d")
# Validate start date
if req_start < allowed_start:
raise TerrakitValidationError(
message=f"Start date {date_start} is before allowed start date {allowed_start.date()}"
)
# Validate end date
if req_end > allowed_end:
raise TerrakitValidationError(
message=f"End date {date_end} is after allowed end date {allowed_end.date()}"
)
except ValueError as e:
raise TerrakitValidationError(message=f"Invalid date format: {e}")
def _validate_spatial(
self, bbox: list, constraints: dict, collection_name: str = ""
):
"""Validate bbox against collection constraints."""
basic_bbox_validation(bbox, self.connector_type)
# Check minimum bbox size for ERA5 collections (0.25° grid resolution)
if not self._is_cordex_collection(collection_name):
# ERA5 uses -180/180° system, so work directly with bbox values
min_lon = bbox[0] # West (min_lon in -180/180°)
min_lat = bbox[1] # South (min_lat)
max_lat = bbox[3] # North (max_lat)
max_lon = bbox[2] # East (max_lon in -180/180°)
lon_span = max_lon - min_lon
lat_span = max_lat - min_lat
# ERA5 has 0.25° resolution, require at least 0.25° in each dimension
MIN_RESOLUTION = 0.25
if lon_span < MIN_RESOLUTION or lat_span < MIN_RESOLUTION:
# Store original values for logging
orig_lon_span = lon_span
orig_lat_span = lat_span
# Calculate how much to expand in each dimension
lon_deficit = max(0, MIN_RESOLUTION - lon_span)
lat_deficit = max(0, MIN_RESOLUTION - lat_span)
# Expand equally on both sides to preserve center point
expand_lon = lon_deficit / 2
expand_lat = lat_deficit / 2
# Calculate new bounds in -180/180 system (original bbox system)
new_min_lon = bbox[0] - expand_lon
new_max_lon = bbox[2] + expand_lon
new_min_lat = bbox[1] - expand_lat
new_max_lat = bbox[3] + expand_lat
# Update bbox in place (keep in -180/180 system)
bbox[0] = new_min_lon # west
bbox[1] = new_min_lat # south
bbox[2] = new_max_lon # east
bbox[3] = new_max_lat # north
# Calculate final dimensions for logging
final_lon_span = new_max_lon - new_min_lon
final_lat_span = new_max_lat - new_min_lat
# Log warning to user
logger.warning(
f"Bounding box expanded to meet ERA5 minimum resolution requirement. "
f"Original size: {orig_lon_span:.4f}° × {orig_lat_span:.4f}°. "
f"Expanded to: {final_lon_span:.4f}° × {final_lat_span:.4f}°. "
f"New bbox: [{bbox[0]:.4f}, {bbox[1]:.4f}, {bbox[2]:.4f}, {bbox[3]:.4f}]"
)
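# Worked example (illustrative): a 0.02 deg x 0.02 deg bbox
# [-1.32, 51.06, -1.30, 51.08] is expanded by 0.115 deg on each side of
# both axes to [-1.435, 50.945, -1.185, 51.195], i.e. 0.25 deg x 0.25 deg
# centred on the original box.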
# For CORDEX collections, map bbox to domain
if self._is_cordex_collection(collection_name):
domain_code = self._get_cordex_domain_from_bbox(bbox)
logger.info(f"Mapped bbox to CORDEX domain: {domain_code}")
# Store domain for later use in find_data
self._selected_cordex_domain = domain_code
else:
# Get spatial bbox using helper
bbox_list = self._get_constraint_value(
constraints,
"extent",
"spatial",
"bbox",
collection_name=collection_name,
)
if not bbox_list or not bbox_list[0] or len(bbox_list[0]) != 4:
raise TerrakitValidationError(
message=f"Invalid spatial bbox format in constraints"
f"{f' for {collection_name}' if collection_name else ''}"
)
allowed_bbox = bbox_list[0]
# ERA5 uses -180/180° system, but constraints file has 0-360° format
# Convert constraints bbox from 0-360° to -180/180° for validation
allowed_min_lon, allowed_min_lat, allowed_max_lon, allowed_max_lat = (
allowed_bbox
)
# Convert allowed longitude bounds from 0-360° to -180/180°
# 0° stays 0°, but 360° becomes 180° (not -180° to avoid wrap issues)
# For global coverage [0, 360] we want [-180, 180]
if allowed_min_lon == 0 and allowed_max_lon == 360:
# Global coverage case
allowed_min_lon = -180
allowed_max_lon = 180
else:
# Convert individual values
if allowed_min_lon > 180:
allowed_min_lon -= 360
if allowed_max_lon > 180:
allowed_max_lon -= 360
# User bbox is already in -180/180° system
min_lon = bbox[0]
max_lon = bbox[2]
min_lat = bbox[1]
max_lat = bbox[3]
# Validate each bound (using -180/180° for longitude)
errors = []
if min_lon < allowed_min_lon:
errors.append(f"min_lon {min_lon:.4f} < allowed {allowed_min_lon}")
if min_lat < allowed_min_lat:
errors.append(f"min_lat {min_lat} < allowed {allowed_min_lat}")
if max_lon > allowed_max_lon:
errors.append(f"max_lon {max_lon:.4f} > allowed {allowed_max_lon}")
if max_lat > allowed_max_lat:
errors.append(f"max_lat {max_lat} > allowed {allowed_max_lat}")
if errors:
raise TerrakitValidationError(
message=f"Bounding box out of range: {'; '.join(errors)}"
)
def _load_constraints(self, collection_name: str) -> dict:
"""Load constraints metadata from local file."""
constraints_file = self.metadata_dir / f"{collection_name}_constraints.json"
if not constraints_file.exists():
raise TerrakitValidationError(
message=f"No constraints file found for collection '{collection_name}'. "
f"Expected: {constraints_file}"
)
try:
with open(constraints_file, "r") as f:
constraints: Dict[str, Any] = json.load(f)
except json.JSONDecodeError as e:
raise TerrakitValidationError(
message=f"Invalid JSON in constraints file for '{collection_name}': {e}"
)
except Exception as e:
raise TerrakitValidationError(
message=f"Error loading constraints for '{collection_name}': {e}"
)
return constraints
def _load_cordex_constraints_variables(self, collection_name: str) -> list:
"""
Load CORDEX constraints_variables metadata from local file.
This file contains valid combinations of CORDEX parameters including:
domain, experiment, horizontal_resolution, temporal_resolution,
gcm_model, rcm_model, ensemble_member, variable, start_year, end_year.
Args:
collection_name: Name of the CORDEX collection
Returns:
List of valid parameter combinations
"""
constraints_file = (
self.metadata_dir / f"{collection_name}_constraints_variables.json"
)
if not constraints_file.exists():
raise TerrakitValidationError(
message=f"No constraints_variables file found for collection '{collection_name}'. "
f"Expected: {constraints_file}"
)
try:
with open(constraints_file, "r") as f:
constraints: list = json.load(f)
except json.JSONDecodeError as e:
raise TerrakitValidationError(
message=f"Invalid JSON in constraints_variables file for '{collection_name}': {e}"
)
except Exception as e:
raise TerrakitValidationError(
message=f"Error loading constraints_variables for '{collection_name}': {e}"
)
return constraints
def _is_fixed_block_constraint(self, combo: dict) -> bool:
"""
Determine if a constraint represents a fixed block vs flexible range.
A fixed block is when start_year and end_year arrays have matching lengths
and represent specific ranges that must be requested EXACTLY as defined (e.g.,
start_year: ["1950"], end_year: ["1955"] means you must request exactly 1950-1955,
not subsets like 1951-1953).
A flexible range is when the arrays represent a continuous range where any
subset is valid (e.g., start_year: ["1950", "1951", ...], end_year: ["2005"]).
Args:
combo: A constraint combination dictionary
Returns:
True if this is a fixed block, False if it's a flexible range
"""
start_years = combo.get("start_year", [])
end_years = combo.get("end_year", [])
# No year constraints means it's not a fixed block
if not start_years or not end_years:
return False
# If arrays have matching lengths > 1, it's likely fixed blocks
# Each pair represents a specific range that must be requested together
if len(start_years) == len(end_years) and len(start_years) > 1:
return True
# Single pair could be either, but we treat it as a fixed block
# to be more restrictive (safer approach)
if len(start_years) == 1 and len(end_years) == 1:
return True
return False
def _validate_cordex_constraints(
self,
collection_name: str,
domain: str,
experiment: str,
horizontal_resolution: str,
temporal_resolution: str,
gcm_model: str,
rcm_model: str,
ensemble_member: str,
variable: str,
start_year: Union[int, None],
end_year: Union[int, None],
date_start: Union[str, None] = None,
date_end: Union[str, None] = None,
) -> None:
"""
Validate CORDEX request parameters against constraints_variables file.
This performs preflight validation to check if the requested combination of
parameters is available in the CDS CORDEX dataset before attempting download.
The validation distinguishes between:
- Fixed blocks: Specific year ranges that must be requested exactly as defined
(e.g., 1950-1955, 1956-1960 as separate blocks)
- Flexible ranges: Continuous ranges where any subset is valid
(e.g., any years between 1950-2005)
For fixed blocks, the entire time period must be requested. Partial year requests
(e.g., requesting 3 days from a year-long block) are not allowed as the CDS API
will return the entire block regardless.
Args:
collection_name: Name of the CORDEX collection
domain: CORDEX domain (e.g., 'africa', 'europe')
experiment: Experiment type (e.g., 'historical', 'rcp_8_5')
horizontal_resolution: Grid resolution (e.g., '0_44_degree_x_0_44_degree')
temporal_resolution: Temporal resolution (e.g., 'daily_mean', 'fixed')
gcm_model: Global Climate Model (e.g., 'ichec_ec_earth')
rcm_model: Regional Climate Model (e.g., 'knmi_racmo22t')
ensemble_member: Ensemble member (e.g., 'r1i1p1')
variable: Variable name (e.g., '2m_air_temperature')
start_year: Start year for data request (None for 'fixed' temporal_resolution)
end_year: End year for data request (None for 'fixed' temporal_resolution)
date_start: Start date in 'YYYY-MM-DD' format (optional, for partial year validation)
date_end: End date in 'YYYY-MM-DD' format (optional, for partial year validation)
Raises:
TerrakitValidationError: If the combination is not available, with suggestions
for valid alternatives, or if a partial year is requested
for a fixed block
"""
# Load constraints_variables file
constraints_list = self._load_cordex_constraints_variables(collection_name)
# Find matching combinations
matching_combos = []
all_matching_blocks = [] # Collect all year blocks across all matching combos
for combo in constraints_list:
# Check if all parameters match
if (
domain in combo.get("domain", [])
and experiment in combo.get("experiment", [])
and horizontal_resolution in combo.get("horizontal_resolution", [])
and temporal_resolution in combo.get("temporal_resolution", [])
and gcm_model in combo.get("gcm_model", [])
and rcm_model in combo.get("rcm_model", [])
and ensemble_member in combo.get("ensemble_member", [])
and variable in combo.get("variable", [])
):
# For temporal_resolution != 'fixed', also check year range
if (
temporal_resolution != "fixed"
and start_year is not None
and end_year is not None
):
combo_start_years = combo.get("start_year", [])
combo_end_years = combo.get("end_year", [])
# Check if requested years are within available range
if combo_start_years and combo_end_years:
# Collect all year blocks from this combo
for sy, ey in zip(combo_start_years, combo_end_years):
block_start = int(sy)
block_end = int(ey)
all_matching_blocks.append((block_start, block_end))
matching_combos.append(combo)
else:
# For 'fixed' temporal_resolution, year range doesn't apply
matching_combos.append(combo)
# Now validate if the collected blocks can cover the requested range
if matching_combos and all_matching_blocks:
# Remove duplicates and sort
unique_blocks = sorted(set(all_matching_blocks))
# Check if any matching combo is a fixed block constraint
is_fixed_block = any(
self._is_fixed_block_constraint(combo) for combo in matching_combos
)
if is_fixed_block:
# For fixed blocks, the requested range must EXACTLY match one of the blocks
exact_match = (start_year, end_year) in unique_blocks
if not exact_match:
# No exact match - clear matching_combos to trigger error
matching_combos = []
else:
# For flexible ranges, check if blocks can cover the requested range
# Find blocks that overlap with requested range
if start_year is not None and end_year is not None:
overlapping_blocks = [
(bs, be)
for bs, be in unique_blocks
if bs <= end_year and be >= start_year
]
if overlapping_blocks:
# Check if overlapping blocks cover the entire requested range
min_block_start = min(bs for bs, be in overlapping_blocks)
max_block_end = max(be for bs, be in overlapping_blocks)
# Requested range must be within the coverage of available blocks
if min_block_start <= start_year and max_block_end >= end_year:
# Valid - blocks can cover the requested range
pass
else:
# Blocks don't cover the full range - clear matching_combos to trigger error
matching_combos = []
else:
# No overlapping blocks - clear matching_combos to trigger error
matching_combos = []
if matching_combos:
# Valid combination found at year level
# For fixed blocks, check if user is requesting a partial year
if (
date_start is not None
and date_end is not None
and start_year is not None
and end_year is not None
):
# Check if any matching combo is a fixed block
for combo in matching_combos:
if self._is_fixed_block_constraint(combo):
# This is a fixed block - check if requesting partial year
start_dt = datetime.strptime(date_start, "%Y-%m-%d")
end_dt = datetime.strptime(date_end, "%Y-%m-%d")
# The request must span the ENTIRE block: Jan 1 of start_year
# through Dec 31 of end_year (a single-year block is simply the
# start_year == end_year case).
block_start_dt = datetime(start_year, 1, 1)
block_end_dt = datetime(end_year, 12, 31)
if start_dt != block_start_dt or end_dt != block_end_dt:
error_msg = [
"CORDEX fixed block constraint detected for this combination.",
f"You requested: {date_start} to {date_end}",
"",
"For CORDEX data with fixed time blocks, you must request the ENTIRE time period.",
"The CDS API will return the full block regardless of the date range specified.",
"",
f"Required date range for this block: {block_start_dt.strftime('%Y-%m-%d')} to {block_end_dt.strftime('%Y-%m-%d')}",
"",
"Please update your request to:",
f"  date_start = '{block_start_dt.strftime('%Y-%m-%d')}'",
f"  date_end = '{block_end_dt.strftime('%Y-%m-%d')}'",
]
raise TerrakitValidationError(message="\n".join(error_msg))
# All validations passed
return
# No exact match - build helpful error message with alternatives
error_parts = [
"CORDEX data not available for the requested combination:",
f" Domain: {domain}",
f" Experiment: {experiment}",
f" Horizontal Resolution: {horizontal_resolution}",
f" Temporal Resolution: {temporal_resolution}",
f" GCM Model: {gcm_model}",
f" RCM Model: {rcm_model}",
f" Ensemble Member: {ensemble_member}",
f" Variable: {variable}",
]
if (
temporal_resolution != "fixed"
and start_year is not None
and end_year is not None
):
error_parts.append(f" Year Range: {start_year}-{end_year}")
# Find partial matches to suggest alternatives
# Try relaxing constraints one at a time to find what's available
# Find combinations matching domain, experiment, resolution, temporal_resolution
base_matches = [
combo
for combo in constraints_list
if (
domain in combo.get("domain", [])
and experiment in combo.get("experiment", [])
and horizontal_resolution in combo.get("horizontal_resolution", [])
and temporal_resolution in combo.get("temporal_resolution", [])
)
]
if base_matches:
# Extract valid options for the failing parameters
valid_gcm_models = sorted(
set(gcm for combo in base_matches for gcm in combo.get("gcm_model", []))
)
valid_rcm_models = sorted(
set(rcm for combo in base_matches for rcm in combo.get("rcm_model", []))
)
valid_ensemble_members = sorted(
set(
ens
for combo in base_matches
for ens in combo.get("ensemble_member", [])
)
)
valid_variables = sorted(
set(var for combo in base_matches for var in combo.get("variable", []))
)
error_parts.append(
"\nValid alternatives for this domain/experiment/resolution:"
)
if gcm_model not in valid_gcm_models:
error_parts.append(
f" Valid GCM Models: {', '.join(valid_gcm_models[:10])}"
)
if len(valid_gcm_models) > 10:
error_parts.append(f" ... and {len(valid_gcm_models) - 10} more")
if rcm_model not in valid_rcm_models:
error_parts.append(
f" Valid RCM Models: {', '.join(valid_rcm_models[:10])}"
)
if len(valid_rcm_models) > 10:
error_parts.append(f" ... and {len(valid_rcm_models) - 10} more")
if ensemble_member not in valid_ensemble_members:
error_parts.append(
f" Valid Ensemble Members: {', '.join(valid_ensemble_members[:10])}"
)
if len(valid_ensemble_members) > 10:
error_parts.append(
f" ... and {len(valid_ensemble_members) - 10} more"
)
if variable not in valid_variables:
error_parts.append(
f" Valid Variables: {', '.join(valid_variables[:10])}"
)
if len(valid_variables) > 10:
error_parts.append(f" ... and {len(valid_variables) - 10} more")
# Check year range if applicable
if (
temporal_resolution != "fixed"
and start_year is not None
and end_year is not None
):
# Find combinations matching all parameters except year range
year_matches = [
combo
for combo in base_matches
if (
gcm_model in combo.get("gcm_model", [])
and rcm_model in combo.get("rcm_model", [])
and ensemble_member in combo.get("ensemble_member", [])
and variable in combo.get("variable", [])
)
]
if year_matches:
available_years = set()
has_fixed_blocks = False
for combo in year_matches:
start_years = combo.get("start_year", [])
end_years = combo.get("end_year", [])
if start_years and end_years:
# Check if this combo represents fixed blocks
if self._is_fixed_block_constraint(combo):
has_fixed_blocks = True
for sy, ey in zip(start_years, end_years):
available_years.add((int(sy), int(ey)))
if available_years:
year_ranges = sorted(available_years)
if has_fixed_blocks:
error_parts.append(
f" Available Year Blocks (must request exact ranges): {', '.join(f'{sy}-{ey}' for sy, ey in year_ranges[:5])}"
)
else:
error_parts.append(
f" Available Year Ranges: {', '.join(f'{sy}-{ey}' for sy, ey in year_ranges[:5])}"
)
if len(year_ranges) > 5:
error_parts.append(
f" ... and {len(year_ranges) - 5} more ranges"
)
else:
error_parts.append(
"\nNo data available for this domain/experiment/resolution combination."
)
error_parts.append(
"Try different values for domain, experiment, or resolution."
)
raise TerrakitValidationError(message="\n".join(error_parts))
def _connect_to_cds(self) -> cdsapi.Client:
"""
Connect to climate data store.
"""
try:
client = cdsapi.Client(url=self.CDSAPI_URL, key=os.getenv("CDSAPI_KEY"))
except Exception as err:
error_msg = f"Unable to connect to Climate Data Store. {err}"
logger.error(error_msg)
raise TerrakitValidationError(message=error_msg)
return client
def list_cordex_domains(self) -> Dict[str, Any]:
"""
List all available CORDEX domains with their information.
Returns:
dict: Dictionary of domain codes and their information
"""
cordex_domains: Dict[str, Any] = self.cordex_domains
return cordex_domains
def get_cordex_domain_info(self, domain_code: str) -> dict:
"""
Get information for a specific CORDEX domain.
Args:
domain_code: CORDEX domain code (e.g., 'EUR-11')
Returns:
dict: Domain information including name, bbox, and resolution
Raises:
TerrakitValueError: If domain code not found
"""
return get_domain_info(domain_code)
def list_collections(self) -> list[Any]:
"""
Lists the available collections.
Returns:
list: A list of collection names.
"""
logger.info("Listing available collections")
return self.collections
def list_bands(self, data_collection_name: str) -> list[dict[str, Any]]:
"""
List available bands for a given collection.
Parameters:
data_collection_name (str): The name of the collection to get bands for.
Returns:
list[dict[str, Any]]: A list of band dictionaries containing band information.
Each dictionary contains keys like 'band_name', 'resolution', 'description', etc.
Raises:
TerrakitValidationError: If the collection is not found or has no band information.
Example:
```python
from terrakit import DataConnector
dc = DataConnector(connector_type="climate_data_store")
dc = DataConnector(connector_type='climate_data_store')
bands = dc.connector.list_bands(data_collection_name='derived-era5-single-levels-daily-statistics')
print(f'\nFound {len(bands)} bands for derived-era5-single-levels-daily-statistics')
print('\nFirst 3 bands:')
for band in bands[:3]:
print(f" - {band['band_name']}: {band.get('description', 'N/A')}")
```
"""
# Check if collection exists
check_collection_exists(data_collection_name, self.collections)
# Find the collection details
collection_details = None
for collection in self.collections_details:
if collection["collection_name"] == data_collection_name:
collection_details = collection
break
if collection_details is None or "bands" not in collection_details:
raise TerrakitValidationError(
message=f"No band information found for collection '{data_collection_name}'"
)
bands_list: list[dict[str, Any]] = collection_details["bands"]
logger.info(
f"Found {len(bands_list)} bands for collection '{data_collection_name}'"
)
return bands_list
def find_data(
self,
data_collection_name: str,
date_start: str,
date_end: str,
area_polygon=None,
bbox=None,
bands=[],
maxcc=100,
data_connector_spec=None,
) -> Union[tuple[list[Any], list[dict[str, Any]]], tuple[None, None]]:
"""
This function retrieves unique dates and corresponding data results from a specified Climate Data Store data collection.
Args:
data_collection_name (str): The name of the Climate Data Store data collection to search.
date_start (str): The start date for the time interval in 'YYYY-MM-DD' format.
date_end (str): The end date for the time interval in 'YYYY-MM-DD' format.
area_polygon (Polygon, optional): A polygon defining the area of interest.
bbox (tuple, optional): A bounding box defining the area of interest in the format (minx, miny, maxx, maxy).
bands (list, optional): A list of bands to retrieve. Defaults to [].
maxcc (int, optional): The maximum cloud cover percentage for the data. Default is 100 (no cloud cover filter).
data_connector_spec (dict, optional): A dictionary containing the data connector specification.
Returns:
tuple: A tuple containing a sorted list of unique dates and a list of data results.
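Example (illustrative; assumes a valid CDSAPI_KEY in the environment):
```python
from terrakit import DataConnector
dc = DataConnector(connector_type="climate_data_store")
dates, results = dc.connector.find_data(
    data_collection_name="derived-era5-single-levels-daily-statistics",
    date_start="2025-01-01",
    date_end="2025-01-03",
    bbox=[-1.32, 51.06, -1.30, 51.08],
)
print(dates)  # ['2025-01-01', '2025-01-02', '2025-01-03']
```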
"""
if "CDSAPI_KEY" not in os.environ:
raise TerrakitValidationError(
message="Error: Missing credentials 'CDSAPI_KEY'. Please update .env with correct credentials."
)
# Check data_collection_name exists in self.collections.
check_collection_exists(data_collection_name, self.collections)
# Load constraints
constraints = self._load_constraints(data_collection_name)
# Validate constraint parameters using collection name for better errors
self._validate_temporal(date_start, date_end, constraints, data_collection_name)
self._validate_spatial(bbox, constraints, data_collection_name)
# Generate dates
start = datetime.strptime(date_start, "%Y-%m-%d")
end = datetime.strptime(date_end, "%Y-%m-%d")
unique_dates = []
value = start
while value <= end:
unique_dates.append(value.strftime("%Y-%m-%d"))
value += timedelta(days=1)
results = [
{
"collection": data_collection_name,
"date_range": f"{date_start} to {date_end}",
"total_dates": len(unique_dates),
"temporal_extent": constraints.get("extent", {}).get("temporal"),
"spatial_extent": constraints.get("extent", {}).get("spatial"),
}
]
# TODO: filter by cloud cover
return unique_dates, results
def get_data( # type: ignore[override]
self,
data_collection_name,
date_start,
date_end,
area_polygon=None,
bbox=None,
bands=[],
maxcc=100,
data_connector_spec=None,
save_file=None,
working_dir=".",
query_params={},
) -> Union[xr.Dataset, None]:
"""
Fetches data from Climate Data Store for the specified collection, date range, area, and bands.
Args:
data_collection_name (str): Name of the data collection to fetch data from.
date_start (str): Start date for the data retrieval (inclusive), in 'YYYY-MM-DD' format.
date_end (str): End date for the data retrieval (inclusive), in 'YYYY-MM-DD' format.
area_polygon (list, optional): Polygon defining the area of interest. Defaults to None.
bbox (list, optional): Bounding box defining the area of interest. Defaults to None.
bands (list, optional): List of bands to retrieve. If empty, a single default variable for the collection is requested.
query_params (dict, optional): Additional query parameters. Defaults to {}.
Supported parameters:
- max_workers (int): Number of parallel workers for downloading monthly chunks.
Default is 4. Set to 1 for sequential downloads. Higher values (e.g., 8-10)
can speed up multi-year requests but may hit API rate limits.
- Other collection-specific parameters (e.g., daily_statistic, frequency, time_zone)
data_connector_spec (dict, optional): Data connector specification. Defaults to None.
save_file (str, optional): Path to save the output file as a single time-series NetCDF.
Climate data is saved as a continuous time series (not split by date) to facilitate
temporal analysis. The file will contain all requested variables across all time steps.
Example: 'output.nc' will save all data in one file. If None, no files are saved to disk.
Defaults to None.
working_dir (str, optional): Working directory for temporary files. Defaults to '.'.
Returns:
xarray.Dataset: An xarray Dataset containing all fetched data with variables as data variables.
Each variable has dimensions (time, latitude, longitude) and includes a 'stepType'
attribute indicating the parameter class ('instant', 'accum', 'avg', 'max', 'min').
Note:
This method now returns xarray.Dataset instead of xarray.DataArray to preserve
parameter class (stepType) information. To convert to the old format:
data_array = dataset.to_array(dim='band')
This allows accessing data as before:
temp = data_array.sel(band='2m_temperature')
Multi-year requests are automatically split into monthly chunks and downloaded in
parallel (default 4 workers) to handle CDS API size limits and improve performance.
Example:
```python
import terrakit
data_connector = "climate_data_store"
dc = terrakit.DataConnector(connector_type=data_connector)
# Basic usage with default parallel download (4 workers)
data = dc.connector.get_data(
data_collection_name="derived-era5-single-levels-daily-statistics",
date_start="2025-01-01",
date_end="2025-01-02",
bbox=[-1.32, 51.06, -1.30, 51.08],
bands=["2m_temperature", "total_precipitation"],
query_params={
"daily_statistic": "daily_minimum",
"frequency": "1hr",
"time_zone": "utc+03:00"
}
)
# Multi-year download with custom parallelization
data = dc.connector.get_data(
data_collection_name="derived-era5-single-levels-daily-statistics",
date_start="2020-01-01",
date_end="2023-12-31",
bbox=[-1.32, 51.06, -1.30, 51.08],
bands=["2m_temperature", "total_precipitation"],
query_params={
"max_workers": 8 # Use 8 parallel workers for faster download
},
save_file="./era5_multi_year.nc"
)
# Access variables
temperature = data['2m_temperature']
print(temperature.attrs['stepType']) # 'instant'
# Filter by stepType
instant_vars = [v for v in data.data_vars if data[v].attrs.get('stepType') == 'instant']
```
"""
if bbox is None:
raise TerrakitValidationError(message="bbox is required for CDS downloads")
if bands is None:
bands = []
# Load constraints and validate parameters
constraints = self._load_constraints(data_collection_name)
self._validate_temporal(date_start, date_end, constraints, data_collection_name)
self._validate_spatial(bbox, constraints, data_collection_name)
# For CORDEX collections, validate the joint combination of parameters
if self._is_cordex_collection(data_collection_name):
# Get domain from bbox
domain_code = self._get_cordex_domain_from_bbox(bbox)
api_domain = self._cordex_code_to_api_domain(domain_code)
# Extract parameters from query_params or use defaults
experiment = query_params.get("experiment", "historical")
horizontal_resolution = query_params.get(
"horizontal_resolution", "0_44_degree_x_0_44_degree"
)
temporal_resolution = query_params.get("temporal_resolution", "daily_mean")
gcm_model = query_params.get("gcm_model", "ichec_ec_earth")
rcm_model = query_params.get("rcm_model", "knmi_racmo22t")
ensemble_member = query_params.get("ensemble_member", "r1i1p1")
# Get year range from dates
start_date = datetime.strptime(date_start, "%Y-%m-%d")
end_date = datetime.strptime(date_end, "%Y-%m-%d")
start_year = start_date.year if temporal_resolution != "fixed" else None
end_year = end_date.year if temporal_resolution != "fixed" else None
# Validate each requested variable
variables_to_validate = (
bands if bands else ["2m_air_temperature"]
) # Default variable
for variable in variables_to_validate:
self._validate_cordex_constraints(
collection_name=data_collection_name,
domain=api_domain,
experiment=experiment,
horizontal_resolution=horizontal_resolution,
temporal_resolution=temporal_resolution,
gcm_model=gcm_model,
rcm_model=rcm_model,
ensemble_member=ensemble_member,
variable=variable,
start_year=start_year,
end_year=end_year,
date_start=date_start,
date_end=date_end,
)
extract_dir = Path(working_dir) / "temp_netcdf"
extract_dir.mkdir(parents=True, exist_ok=True)
if self._is_cordex_collection(data_collection_name):
# Get year blocks for CORDEX based on constraints
start_date = datetime.strptime(date_start, "%Y-%m-%d")
end_date = datetime.strptime(date_end, "%Y-%m-%d")
start_year = start_date.year
end_year = end_date.year
# Get domain from bbox
domain_code = self._get_cordex_domain_from_bbox(bbox)
api_domain = self._cordex_code_to_api_domain(domain_code)
# Extract parameters from query_params
experiment = query_params.get("experiment", "historical")
horizontal_resolution = query_params.get(
"horizontal_resolution", "0_44_degree_x_0_44_degree"
)
temporal_resolution = query_params.get("temporal_resolution", "daily_mean")
gcm_model = query_params.get("gcm_model", "ichec_ec_earth")
rcm_model = query_params.get("rcm_model", "knmi_racmo22t")
ensemble_member = query_params.get("ensemble_member", "r1i1p1")
# Get the first variable to determine year blocks (all variables should have same blocks)
first_variable = bands[0] if bands else "2m_air_temperature"
# Get year blocks based on constraints
year_blocks = self._get_cordex_year_blocks(
collection_name=data_collection_name,
domain=api_domain,
experiment=experiment,
horizontal_resolution=horizontal_resolution,
temporal_resolution=temporal_resolution,
gcm_model=gcm_model,
rcm_model=rcm_model,
ensemble_member=ensemble_member,
variable=first_variable,
start_year=start_year,
end_year=end_year,
)
# Check if we need to split into multiple blocks
if len(year_blocks) > 1:
# Multiple year blocks - download in parallel
max_workers = query_params.get("max_workers", 4)
logger.info(
f"Splitting CORDEX request into {len(year_blocks)} year block(s) for parallel download"
)
# Prepare block info tuples with index
block_info_list = [
(idx, block_start, block_end)
for idx, (block_start, block_end) in enumerate(year_blocks, 1)
]
# Download in parallel using ThreadPoolExecutor
if max_workers == 1:
# Sequential download for max_workers=1
logger.info("Using sequential download (max_workers=1)")
for block_info in block_info_list:
self._download_and_extract_cordex_block(
block_info=block_info,
total_blocks=len(year_blocks),
data_collection_name=data_collection_name,
bbox=bbox,
bands=bands,
query_params=query_params,
working_dir=working_dir,
extract_dir=extract_dir,
)
else:
# Parallel download
logger.info(f"Using parallel download with {max_workers} workers")
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all download tasks
future_to_block = {
executor.submit(
self._download_and_extract_cordex_block,
block_info,
len(year_blocks),
data_collection_name,
bbox,
bands,
query_params,
working_dir,
extract_dir,
): block_info
for block_info in block_info_list
}
# Wait for all downloads to complete and handle any errors
for future in as_completed(future_to_block):
block_info = future_to_block[future]
try:
idx, block_start, block_end = future.result()
logger.info(
f"Completed block {idx}/{len(year_blocks)}: {block_start}-{block_end}"
)
except Exception as exc:
logger.error(
f"Block {block_info[0]} generated an exception: {exc}"
)
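# Fail fast: leaving the with-block shuts the executor down and waits
# for already-submitted downloads before the error propagates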
raise
# month_ranges is only used for summary logging below; treat the whole
# CORDEX request as a single range
month_ranges = [(date_start, date_end)]
else:
# Single year block - download as before
logger.info(
"CORDEX request covers a single year block; downloading as one request"
)
zip_path = self._download_from_cds(
data_collection_name,
date_start,
date_end,
bbox,
bands,
query_params,
working_dir,
)
with zipfile.ZipFile(zip_path, "r") as zip_ref:
zip_ref.extractall(extract_dir)
Path(zip_path).unlink()
month_ranges = [(date_start, date_end)]
else:
# 1. Split requests into monthly chunks to handle CDS API size limits
# The CDS API has two constraints:
# a) Separate year/month/day parameters form a Cartesian product, producing
#    out-of-range date combinations across year boundaries (e.g., 2025-12-31
#    when requesting 2024-12-31 to 2025-01-02)
# b) Large requests (e.g., full year) exceed cost limits with error "Your request is too large"
# Solution: Split by month to avoid both issues
start_dt = datetime.strptime(date_start, "%Y-%m-%d")
end_dt = datetime.strptime(date_end, "%Y-%m-%d")
# Generate list of (year, month) tuples for each month in the range
month_ranges = []
current = start_dt
while current <= end_dt:
# Determine start and end dates for this month
month_start = current if current == start_dt else current.replace(day=1)
# Calculate last day of current month
if current.month == 12:
next_month = current.replace(year=current.year + 1, month=1, day=1)
else:
next_month = current.replace(month=current.month + 1, day=1)
last_day_of_month = (next_month - timedelta(days=1)).day
# Month end is the overall end date when this is the final month,
# otherwise the last day of the current month
if current.year == end_dt.year and current.month == end_dt.month:
    month_end_day = end_dt.day
else:
    month_end_day = last_day_of_month
month_end = current.replace(day=month_end_day)
if month_end > end_dt:
    month_end = end_dt
month_ranges.append(
(month_start.strftime("%Y-%m-%d"), month_end.strftime("%Y-%m-%d"))
)
# Move to next month
current = next_month
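# Example: requesting 2024-12-15 to 2025-02-03 yields
#   [("2024-12-15", "2024-12-31"), ("2025-01-01", "2025-01-31"),
#    ("2025-02-01", "2025-02-03")]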
# Get max_workers from query_params, default to 4 for parallel downloads
max_workers = query_params.get("max_workers", 4)
logger.info(
f"Splitting request into {len(month_ranges)} monthly chunk(s) to handle CDS API limits"
)
# 2. Download monthly chunks in parallel
# Prepare month info tuples with index
month_info_list: list[tuple[int, str, str]] = [
(idx, month_start, month_end)
for idx, (month_start, month_end) in enumerate(month_ranges, 1)
]
# Download in parallel using ThreadPoolExecutor
if max_workers == 1:
# Sequential download for max_workers=1
logger.info("Using sequential download (max_workers=1)")
for month_info in month_info_list:
self._download_and_extract_month(
month_info=month_info,
total_months=len(month_ranges),
data_collection_name=data_collection_name,
bbox=bbox,
bands=bands,
query_params=query_params,
working_dir=working_dir,
extract_dir=extract_dir,
)
else:
# Parallel download
logger.info(f"Using parallel download with {max_workers} workers")
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all download tasks
future_to_month: dict[
Future[tuple[int, str, str]], tuple[int, str, str]
] = {
executor.submit(
self._download_and_extract_month,
month_info,
len(month_ranges),
data_collection_name,
bbox,
bands,
query_params,
working_dir,
extract_dir,
): month_info
for month_info in month_info_list
}
# Wait for all downloads to complete and handle any errors
for future in as_completed(future_to_month): # type: ignore[assignment]
month_info = future_to_month[future] # type: ignore[index]
try:
idx, month_start_str, month_end_str = future.result()
logger.info(
f"Completed chunk {idx}/{len(month_ranges)}: {month_start_str} to {month_end_str}"
)
except Exception as exc:
logger.error(
f"Chunk {month_info[0]} generated an exception: {exc}"
)
raise
# 3. Collect all NetCDF files extracted from the downloaded chunks
netcdf_files = list(extract_dir.glob("*.nc"))
if not netcdf_files:
raise TerrakitValueError(f"No NetCDF files found in {extract_dir}")
logger.info(
f"Found {len(netcdf_files)} NetCDF files across {len(month_ranges)} month(s)"
)
# 4. Load NetCDF and process into Dataset with stepType preservation
# CDS may return multiple NetCDF files (one per stepType)
# Extract stepType from filename: data_stream-oper_stepType-{type}.nc
# Collect data organized by stepType and date
date_data_dict: Dict[
str, Dict[str, tuple[xr.DataArray, str]]
] = {} # {date_str: {var_name: (DataArray, stepType)}}
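# Example (illustrative file names): an ERA5 request mixing parameter
# types may arrive split by stepType, e.g.
#   data_stream-oper_stepType-instant.nc  (t2m, u10, ...)
#   data_stream-oper_stepType-accum.nc    (tp, ssr, ...)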
for netcdf_file in netcdf_files:
ds = xr.open_dataset(netcdf_file)
# Determine dimension names. CORDEX files may use rotated-grid dimensions
# (rlon/rlat) while exposing lon/lat as 2D coordinates.
time_name = "time" if "time" in ds.dims else "valid_time"
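# Newer CDS NetCDF output converted from GRIB typically names the time
# dimension "valid_time"; older files use plain "time"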
candidate_lon_dims = ["longitude", "lon", "rlon", "x"]
candidate_lat_dims = ["latitude", "lat", "rlat", "y"]
lon_name = next((n for n in candidate_lon_dims if n in ds.dims), None)
lat_name = next((n for n in candidate_lat_dims if n in ds.dims), None)
if lon_name is None or lat_name is None:
    ds.close()
    raise TerrakitValueError(
        f"Could not identify spatial dimensions in "
        f"{netcdf_file.name}: dims are {list(ds.dims)}"
    )
# Determine if this is a single-variable file or multi-variable file
# Single-variable files don't have stepType in filename
is_single_variable_file = not any(
step in netcdf_file.name
for step in ["accum", "avg", "instant", "max", "min"]
)
# Get the main data variable(s) - these are our bands.
# Exclude helper/bounds/grid-mapping variables that are not spatial-temporal
# data bands, e.g. rotated_pole (scalar) or time_bnds (time,bnds).
data_vars = [
v
for v in ds.data_vars
if time_name in ds[v].dims
and any(dim in ds[v].dims for dim in candidate_lon_dims)
and any(dim in ds[v].dims for dim in candidate_lat_dims)
]
# Log variables found in this file
logger.debug(f"File {netcdf_file.name} contains variables: {data_vars}")
# Process each time step
for time_idx in range(len(ds[time_name])):
# Extract the date for this time step
time_value = ds[time_name].isel({time_name: time_idx}).values
date_str = pd.Timestamp(time_value).strftime("%Y-%m-%d")
# Initialize dict for this date if not exists
if date_str not in date_data_dict:
date_data_dict[date_str] = {}
# Single-variable files carry no stepType token in the name; recover the
# variable name from the filename pattern instead
extracted_var_name = None
if is_single_variable_file:
# Extract variable name from filename pattern: variable_name_YYYYMMDD_HHMMSS.nc
match = re.match(r"^([a-zA-Z0-9_]+?)_\d", netcdf_file.name)
if match:
extracted_var_name = match.group(1)
logger.debug(
f"Extracted variable name '{extracted_var_name}' from filename {netcdf_file.name}"
)
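# e.g. "t2m_20240101_000000.nc" -> "t2m" (the non-greedy match stops at
# the first underscore followed by a digit)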
# Store each variable for this date with its stepType
for var_name in data_vars:
# Ensure var_name is a string
var_name_str = str(var_name)
# Determine which variable name to use for stepType inference and data access
# For stepType inference: use extracted name if available, otherwise use original
steptype_var_name: str = extracted_var_name or var_name_str
# For data access: always use the original variable name from the NetCDF
data_access_var_name = var_name
# Try to get stepType from GRIB_stepType attribute first
if "GRIB_stepType" in ds[var_name].attrs:
step_type = ds[var_name].attrs["GRIB_stepType"]
logger.debug(
f"Extracted stepType '{step_type}' from GRIB_stepType attribute for variable '{var_name}'"
)
else:
# Fall back to inference method
step_type = self._infer_steptype(
netcdf_file.name, steptype_var_name, data_collection_name
)
# Extract data for this specific time step using the original NetCDF variable name
da_var = ds[data_access_var_name].isel({time_name: time_idx})
# Add CRS and spatial dimensions
da_var = da_var.rio.write_crs("EPSG:4326")
da_var = da_var.rio.set_spatial_dims(x_dim=lon_name, y_dim=lat_name)
# Store in dict with stepType using the appropriate variable name
# Use extracted name if available for consistency in output, otherwise use original
output_var_name: str = extracted_var_name or var_name_str
date_data_dict[date_str][output_var_name] = (da_var, step_type)
ds.close()
# Now process each unique date and build a Dataset with stepType attributes
# Build data for each band across all time steps, tracking dates for each band
band_data: Dict[
str, Dict[str, Any]
] = {} # {band_name: {'data': list, 'dates': list, 'stepType': str}}
for date_str in sorted(date_data_dict.keys()):
data_date_datetime = datetime.strptime(date_str, "%Y-%m-%d")
var_dict = date_data_dict[date_str]
for var_name in sorted(var_dict.keys()):
da_var, step_type = var_dict[var_name]
# Drop time coordinate if it exists
if "time" in da_var.coords:
da_var = da_var.drop_vars("time")
# Use the NetCDF variable name directly as the band name
# This ensures we preserve the original variable names from CDS
band_name = var_name
# Initialize band_data entry if needed
if band_name not in band_data:
band_data[band_name] = {
"data": [],
"dates": [],
"stepType": step_type,
}
# Store the data array and its corresponding date
band_data[band_name]["data"].append(da_var)
band_data[band_name]["dates"].append(data_date_datetime)
# 5. Create Dataset with stepType attributes
# Each variable gets its own time coordinate based on which dates it has data for
merged_dataset = xr.Dataset()
for band_name, band_info in band_data.items():
# Concatenate all time steps for this band
data_arrays = band_info["data"]
dates = band_info["dates"]
# Check for duplicate dates
if len(dates) != len(set(dates)):
logger.warning(f"Variable {band_name} has duplicate dates: {dates}")
# Remove duplicates by keeping only unique dates
seen_dates = set()
unique_data = []
unique_dates = []
for da, date in zip(data_arrays, dates):
    if date not in seen_dates:
        seen_dates.add(date)
        unique_data.append(da)
        unique_dates.append(date)
data_arrays = unique_data
dates = unique_dates
logger.info(
f"After deduplication: {len(dates)} unique dates for {band_name}"
)
# Concatenate this band's time steps along a new "time" dimension.
# coords="minimal" avoids issues with inconsistent coordinates like "number"
band_da = xr.concat(
data_arrays, dim="time", coords="minimal", compat="override"
)
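# compat="override" skips equality checks and keeps coordinate values
# from the first array when they differ between time steps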
# Assign the time coordinate specific to this variable
band_da = band_da.assign_coords({"time": dates})
# Add stepType to variable attributes
band_da.attrs["stepType"] = band_info["stepType"]
# Add to merged dataset
merged_dataset[band_name] = band_da
# Add dataset-level attributes
merged_dataset.attrs["source"] = "Climate Data Store (CDS)"
merged_dataset.attrs["dataset"] = data_collection_name
# Write CRS (EPSG:4326 for CDS data)
merged_dataset.rio.write_crs("EPSG:4326", inplace=True)
# Derive time values from dataset coordinates; because each variable can
# carry its own time axis, the merged dataset may not expose a shared
# "time" coordinate
dataset_time_values = None
if "time" in merged_dataset.coords:
dataset_time_values = merged_dataset.coords["time"].values
else:
for data_var_name in merged_dataset.data_vars:
var_da = merged_dataset[data_var_name]
if "time" in var_da.coords:
dataset_time_values = var_da.coords["time"].values
break
unique_dates = (
sorted(set(dataset_time_values)) if dataset_time_values is not None else []
)
# 6. Save as single time-series NetCDF file
# Climate data is best analyzed as continuous time series, not individual days
if save_file is not None:
# Ensure the directory exists
save_dir = Path(save_file).parent
save_dir.mkdir(parents=True, exist_ok=True)
# Ensure .nc extension
if not save_file.endswith(".nc"):
save_file = f"{save_file}.nc"
# Save entire time series as single NetCDF file
merged_dataset.to_netcdf(save_file)
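# The file round-trips through xarray, e.g. (hypothetical variable name):
#   ds = xr.open_dataset(save_file)
#   ds["t2m"].attrs["stepType"]  # -> "instant"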
# Log summary information
start_date_str = (
pd.Timestamp(unique_dates[0]).strftime("%Y-%m-%d")
if unique_dates
else "N/A"
)
end_date_str = (
pd.Timestamp(unique_dates[-1]).strftime("%Y-%m-%d")
if unique_dates
else "N/A"
)
logger.info(
f"Saved time-series NetCDF: {save_file} "
f"({len(unique_dates)} time steps from {start_date_str} to {end_date_str}, "
f"{len(merged_dataset.data_vars)} variables)"
)
# 7. Cleanup temporary files
shutil.rmtree(extract_dir)
# Note: Individual zip files are already cleaned up in the download loop
logger.info(
f"Processed {len(unique_dates)} time steps and {len(merged_dataset.data_vars)} variables into Dataset"
)
return merged_dataset