GAIA Pipeline Documentation#
Overview#
The Global Aggregation of Indicators for Anticipatory Action (GAIA) Pipeline produces a set of thematic files for 146 countries at administrative level 2. Each file captures different aspects of population, infrastructure, and environmental conditions. Below is an overview of the available files:
Access to Services – Population accessibility to key facilities such as education and health centers.
Facilities – Availability and distribution of essential service infrastructure.
Coping Capacity – Combined indicators derived from Access and Facilities layers.
Demographics – Distribution of vulnerable population.
Rural Population – Demographic indicators focused specifically on rural populations.
Vulnerability – Composite indicators derived from Demographics and Rural Population layers.
Crop Coverage and Change – Extent of agricultural land and observed changes over time.
Vegetation Index – NDVI-based assessment of vegetation conditions.
Flood Exposure – Exposure of populations and facilities to flood hazards.
Available Countries:#
Africa

| ID | Country Code | Country |
|---|---|---|
| 1 | AGO | Angola |
| 2 | BDI | Burundi |
| 3 | BEN | Benin |
| 4 | BFA | Burkina-Faso |
| 5 | BWA | Botswana |
| 6 | CAF | Central-African-Republic |
| 7 | CIV | Ivory-Coast |
| 8 | CMR | Cameroon |
| 9 | COD | Congo-Democratic-Republic |
| 10 | COG | Congo-Brazzaville |
| 11 | COM | Comores |
| 12 | CPV | Cape-Verde |
| 13 | DJI | Djibouti |
| 14 | DZA | Algeria |
| 15 | EGY | Egypt |
| 16 | ERI | Eritrea |
| 17 | ESH | Morocco |
| 18 | ETH | Ethiopia |
| 19 | GAB | Gabon |
| 20 | GHA | Ghana |
| 21 | GIN | Guinea |
| 22 | GMB | Senegal-And-Gambia |
| 23 | GNB | Guinea-Bissau |
| 24 | GNQ | Equatorial-Guinea |
| 25 | KEN | Kenya |
| 26 | LBR | Liberia |
| 27 | LBY | Libya |
| 28 | LSO | Lesotho |
| 29 | MAR | Morocco |
| 30 | MDG | Madagascar |
| 31 | MLI | Mali |
| 32 | MOZ | Mozambique |
| 33 | MRT | Mauritania |
| 34 | MUS | Mauritius |
| 35 | MWI | Malawi |
| 36 | MYS | Malawi |
| 37 | NAM | Namibia |
| 38 | NER | Niger |
| 39 | NGA | Nigeria |
| 40 | RWA | Rwanda |
| 41 | SDN | Sudan |
| 42 | SEN | Senegal-And-Gambia |
| 43 | SLE | Sierra-Leone |
| 44 | SOM | Somalia |
| 45 | SSD | South-Sudan |
| 46 | STP | Sao-Tome-And-Principe |
| 47 | SWZ | Eswatini |
| 48 | SYC | Seychelles |
| 49 | TCD | Chad |
| 50 | TGO | Togo |
| 51 | TUN | Tunisia |
| 52 | TZA | Tanzania |
| 53 | UGA | Uganda |
| 54 | ZAF | South-Africa |
| 55 | ZMB | Zambia |
| 56 | ZWE | Zimbabwe |
Asia

| ID | Country Code | Country |
|---|---|---|
| 57 | AFG | Afghanistan |
| 58 | ARE | Gcc-States |
| 59 | ARM | Armenia |
| 60 | AZE | Azerbaijan |
| 61 | BGD | Bangladesh |
| 62 | BHR | Gcc-States |
| 63 | BTN | Bhutan |
| 64 | CHN | China |
| 65 | IDN | Indonesia |
| 66 | IRN | Iran |
| 67 | IRQ | Iraq |
| 68 | KAZ | Kazakhstan |
| 69 | KGZ | Kyrgyzstan |
| 70 | KHM | Cambodia |
| 71 | KWT | Gcc-States |
| 72 | LAO | Laos |
| 73 | LBN | Lebanon |
| 74 | LKA | Sri-Lanka |
| 75 | MDV | Maldives |
| 76 | MMR | Myanmar |
| 77 | MNG | Mongolia |
| 78 | NPL | Nepal |
| 79 | OMN | Gcc-States |
| 80 | PAK | Pakistan |
| 81 | PHL | Philippines |
| 82 | PRK | North-Korea |
| 83 | QAT | Gcc-States |
| 84 | SAU | Gcc-States |
| 85 | SYR | Syria |
| 86 | THA | Thailand |
| 87 | TJK | Tajikistan |
| 88 | TLS | Indonesia |
| 89 | UZB | Uzbekistan |
| 90 | VNM | Vietnam |
| 91 | YEM | Yemen |
| 92 | PNG | Indonesia |
Europe

| ID | Country Code | Country |
|---|---|---|
| 93 | ALB | Albania |
| 94 | BGR | Bulgaria |
| 95 | BLR | Belarus |
| 96 | GEO | Georgia |
| 97 | GRC | Greece |
| 98 | HUN | Hungary |
| 99 | MDA | Moldova |
| 100 | MKD | Macedonia |
| 101 | POL | Poland |
| 102 | ROU | Romania |
| 103 | SVK | Slovakia |
| 104 | TUR | Turkey |
| 105 | UKR | Ukraine |
| 106 | RUS | Russia |
South America

| ID | Country Code | Country |
|---|---|---|
| 107 | ARG | Argentina |
| 108 | BOL | Bolivia |
| 109 | BRA | Brazil |
| 110 | CHL | Chile |
| 111 | COL | Colombia |
| 112 | ECU | Ecuador |
| 113 | GUY | Guyana |
| 114 | PER | Peru |
| 115 | PRY | Paraguay |
| 116 | SUR | Suriname |
| 117 | URY | Uruguay |
| 118 | VEN | Venezuela |
Central America & Caribbean

| ID | Country Code | Country |
|---|---|---|
| 119 | ATG | Antigua and Barbuda |
| 120 | BHS | Bahamas |
| 121 | BLZ | Belize |
| 122 | CRI | Costa-Rica |
| 123 | CUB | Cuba |
| 124 | DMA | Dominica |
| 125 | DOM | Haiti-And-DomRep |
| 126 | GTM | Guatemala |
| 127 | HND | Honduras |
| 128 | HTI | Haiti-And-DomRep |
| 129 | JAM | Jamaica |
| 130 | NIC | Nicaragua |
| 131 | PAN | Panama |
| 132 | SLV | El-Salvador |
| 133 | TTO | None |
| 134 | VCT | None |
| 135 | BRB | Central-America |
| 136 | LCA | None |
| 137 | KNA | None |
Australia & Oceania

| ID | Country Code | Slug |
|---|---|---|
| 138 | FJI | Fiji |
| 139 | FSM | Micronesia |
| 140 | KIR | Kiribati |
| 141 | MHL | Marshall-Islands |
| 142 | SLB | Solomon-Islands |
| 143 | TON | Tonga |
| 144 | VUT | Vanuatu |
North America

| ID | Country Code | Country |
|---|---|---|
| 145 | GRD | Greenland |
| 146 | MEX | Mexico |
Risk Assessment Plugin#
The datasets produced by GAIA are fully compatible with the Risk Assessment QGIS Plugin, enabling seamless integration into spatial risk analyses. They are openly available for multiple countries through HeiGIT on HDX, providing ready-to-use geospatial layers for risk assessments.
Information
The Coping Capacity, Vulnerability, and Flood Exposure files can be used directly as input for the Risk Assessment QGIS Plugin when analyzing flood hazards. Only minor adjustments, such as renaming the ID columns, are required for compatibility.
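For illustration, a minimal pandas sketch of the kind of ID-column rename the plugin may require. The target column name `adm2_id` is purely a placeholder (check the plugin documentation for the exact name it expects), and the file paths are illustrative:

```python
import pandas as pd

# Load one of the GAIA outputs, e.g. a Coping Capacity CSV (illustrative path).
df = pd.read_csv("KEN_ADM2_coping.csv")

# Rename the administrative identifier column to whatever the Risk Assessment
# QGIS Plugin expects; "adm2_id" is a hypothetical placeholder, not the plugin's API.
df = df.rename(columns={"ADM2_PCODE": "adm2_id"})
df.to_csv("KEN_ADM2_coping_plugin.csv", index=False)
```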
1. Access to Services#
Represents the share of the population with access to key facilities within defined distances or travel times.
Data Sources#
Accessibility data (isochrones): downloaded from MinIO for the categories `education`, `hospitals`, and `primary_healthcare`.
Population data: WorldPop raster data (vulnerable populations) via Demographics.
Processing Steps#
Download accessibility isochrones in GPKG format for each category.
Load administrative boundaries at admin level 2 from local GeoJSON files.
Fetch WorldPop raster datasets and sum relevant indicators to produce a vulnerable population raster.
For each category and cutoff:
Select isochrone polygons for the cutoff value.
Intersect polygons with administrative boundaries.
Use `rasterstats.zonal_stats` to compute the total population within the isochrone area.
Save results as a CSV containing population counts per administrative unit.
Outputs#
CSV file: `{country_code}_{admin_level}_access.csv`
Columns: `{admin_level}_PCODE`, `access_pop_education_5km`, `access_pop_education_10km`, etc.
CRS: WGS84 (`EPSG:4326`)
Units: population count
Python Script (Access to Services)
import os
import requests
from pathlib import Path
import geopandas as gpd
import rioxarray
import rasterio
import pandas as pd
import numpy as np
import argparse
import logging
from rasterstats import zonal_stats
from scripts.fetch_worldpop import fetch_worldpop
CATEGORY_SPECS = {
"education": {
"ranges": {5000: "access_pop_education_5km",
10000: "access_pop_education_10km",
20000: "access_pop_education_20km"},
"type": "distance", # meters
},
"hospitals": {
"ranges": {1800: "access_pop_hospitals_30min",
3600: "access_pop_hospitals_1h",
7200: "access_pop_hospitals_2h"},
"type": "time", # seconds
},
"primary_healthcare": {
"ranges": {1800: "access_pop_primary_healthcare_30min",
3600: "access_pop_primary_healthcare_1h",
7200: "access_pop_primary_healthcare_2h"},
"type": "time", # seconds
}
}
def download_access_gpkg(country_code, category, work_dir, context):
"""Download accessibility gpkg file if missing, return path."""
url = f"https://warm.storage.heigit.org/heigit-hdx-public/access/{country_code.lower()}/{country_code}_{category}_access.gpkg"
gpkg_path = Path(work_dir) / f"{country_code}_{category}_access.gpkg"
if not gpkg_path.exists():
context.info(f"Downloading {category} access GPKG…")
resp = requests.get(url, stream=True)
resp.raise_for_status()
with open(gpkg_path, "wb") as f:
for chunk in resp.iter_content(1024 * 1024):
f.write(chunk)
else:
context.info(f"{gpkg_path} already exists, skipping download.")
return gpkg_path
def find_value_field(df, context):
"""Try to find the field that contains the isochrone cutoff values."""
for candidate in ["value", "range", "cost", "distance", "time"]:
if candidate in df.columns:
return candidate
# fallback: first numeric column
for col in df.columns:
if pd.api.types.is_numeric_dtype(df[col]):
context.info(f"Guessed value field: {col}")
return col
raise ValueError("Could not identify value/range column in isochrones")
def compute_access_population(country_code, admin_level, gdf_admin, work_dir, output_dir, context):
"""
Compute how much vulnerable population lives within accessibility isochrones,
grouped by chosen administrative level.
"""
country_code = country_code.upper()
work_dir = Path(work_dir)
output_dir = Path(output_dir)
out_csv = output_dir / f"{country_code}_{admin_level}_access.csv"
if out_csv.exists():
context.info(f"CSV already exists, skipping: {out_csv}")
return str(out_csv)
# --- ensure vulnerable population raster ---
context.info("Fetching WorldPop rasters…")
indicator_tifs = fetch_worldpop(country_code)
pop_rasters = [rioxarray.open_rasterio(p).squeeze() for p in indicator_tifs]
vuln_pop = sum(pop_rasters)
vuln_tmp = work_dir / f"{country_code}_vulnerable_pop.tif"
vuln_pop.rio.to_raster(vuln_tmp)
results = pd.DataFrame({f"{admin_level}_PCODE": gdf_admin[f"{admin_level}_PCODE"]})
for category, spec in CATEGORY_SPECS.items():
gpkg_path = download_access_gpkg(country_code, category, work_dir, context)
# Load ADM0 isochrones
isochrones = gpd.read_file(gpkg_path, layer="ADM0")
isochrones = isochrones.to_crs("EPSG:4326")
value_field = find_value_field(isochrones, context)
for cutoff, colname in spec["ranges"].items():
context.info(f"{category}: processing {cutoff} ({colname})")
subset = isochrones[isochrones[value_field] == cutoff]
if subset.empty:
# Skip creating this column if no polygons found
context.info(f" No polygons found for {cutoff} in {category} — skipping.")
continue
# Intersect chosen admin level with isochrone
admin_in_range = gpd.overlay(gdf_admin, subset, how="intersection")
if admin_in_range.empty:
context.info(f" No intersections found for {cutoff} in {category} — skipping.")
continue
stats = zonal_stats(admin_in_range, vuln_tmp, stats="sum", nodata=0, geojson_out=True)
if not stats:
context.info(f" No zonal stats computed for {cutoff} in {category} — skipping.")
continue
sums = {}
for s in stats:
pcode = s["properties"][f"{admin_level}_PCODE"]
sums[pcode] = sums.get(pcode, 0) + (s["properties"]["sum"] or 0)
results[colname] = (
results[f"{admin_level}_PCODE"]
.map(sums)
.fillna(0)
.round(0)
.astype(int)
)
output_dir.mkdir(parents=True, exist_ok=True)
results.to_csv(out_csv, index=False)
context.info(f"Accessibility population CSV written to {out_csv}")
return str(out_csv)
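A minimal usage sketch for the function above, assuming the script's dependencies are importable and the MinIO isochrones and WorldPop rasters are reachable; the country code, paths, and the plain `logging` logger standing in for the pipeline's context object are illustrative:

```python
import logging
import geopandas as gpd

logging.basicConfig(level=logging.INFO)
ctx = logging.getLogger("access")  # stand-in for the pipeline context; only .info() is used

# Illustrative admin-level-2 boundary file containing an ADM2_PCODE column
gdf_admin = gpd.read_file("data/KEN/KEN_ADM2.geojson")

csv_path = compute_access_population(
    country_code="KEN",
    admin_level="ADM2",
    gdf_admin=gdf_admin,
    work_dir="data/KEN/Temporary",
    output_dir="data/KEN/Output",
    context=ctx,
)
print(csv_path)  # data/KEN/Output/KEN_ADM2_access.csv
```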
2. Facilities#
Represents the availability and spatial distribution of essential service infrastructure such as schools, hospitals, and primary healthcare centers.
Data Sources#
OpenStreetMap (OSM) data, accessed through either:
Overpass API, for direct querying of recent OSM feature data; or
Ohsome API, for more stable and filtered access to OSM geometries.
Administrative boundaries (GeoJSON): Used to spatially aggregate features at administrative level 2.
Processing Steps#
Load administrative boundaries (GeoJSON) for the target country at the desired administrative level.
Define filters for each facility type:
Education: `amenity=school`
Hospitals: `amenity=hospital` or `healthcare=hospital`
Primary healthcare: non-hospital facilities providing healthcare services, such as `clinic`, `doctors`, `midwife`, `nurse`, or `center`.
Determine API source:
If Overpass is used, queries are dynamically built and executed using bounding boxes of the country boundary.
If Ohsome is used, geometry data is fetched for the same bounding box and time period (if specified).
Download and parse facility points:
For Overpass: CSV results are parsed into GeoDataFrames with `lon`, `lat`, and `osmId` attributes.
For Ohsome: JSON features are directly converted into GeoDataFrames.
Assign categories and export raw results:
Each facility type (education, hospitals, primary healthcare) is stored as a separate GeoJSON in a `Temporary` folder.
Aggregate by administrative unit:
Facilities are spatially joined (`sjoin`) with administrative polygons.
For each category, counts are grouped by `{admin_level}_PCODE`.
Combine and export results:
Counts for all categories are merged into a single summary table.
Output is saved as a CSV file in the `Output` directory.
Outputs#
CSV file: `{country_code}_{admin_level}_facilities.csv`
Columns: `{admin_level}_PCODE`, `education_count`, `hospitals_count`, `primary_healthcare_count`
CRS: WGS84 (`EPSG:4326`)
Units: number of facilities per administrative unit
Intermediate GeoJSON files: stored in the `Temporary/` folder as raw point data for each facility category.
Python Script (Facilities)
import argparse
import os
import sys
import requests
import geopandas as gpd
import pandas as pd
import overpass
import warnings
from pathlib import Path
os.environ["OGR_GEOJSON_MAX_OBJ_SIZE"] = "0" # no limits when reading complex geojsons
warnings.simplefilter("ignore", UserWarning)
OVERPASS_FILTERS = {
"education": ["nwr[amenity=school]"],
"hospitals": ["nwr[amenity=hospital]","nwr[healthcare=hospital]"],
"primary_healthcare": [
'nwr["amenity"~"^(doctors|clinic)$"]["amenity"!="hospital"]["healthcare"!="hospital"]',
'nwr["healthcare"~"^(clinic|doctors|midwife|nurse|center)$"]["amenity" != "hospital"]["healthcare" != "hospital"]'
]
}
OHSOME_ENDPOINT = "https://api.ohsome.org/v1/elements/geometry"
OHSOME_FILTERS = {
"education": "amenity=school",
"hospitals": "amenity=hospital or healthcare=hospital",
"primary_healthcare": (
"not amenity=hospital and not healthcare=hospital and "
"(amenity=doctors or amenity=clinic or healthcare=clinic or "
"healthcare=doctors or healthcare=midwife or healthcare=nurse or healthcare=center)"
),
}
def parse_overpass_csv_to_gpd(result):
headers = result[0]
rows = result[1:]
df = pd.DataFrame(rows, columns=headers)
df['@lon'] = df['@lon'].astype(float)
df['@lat'] = df['@lat'].astype(float)
df['@id'] = df['@id'].astype(int)
df = df.rename(columns={'@lon': 'lon', '@lat': 'lat', '@id': 'osmId'})
gdf = gpd.GeoDataFrame(
df,
geometry=gpd.points_from_xy(df['lon'], df['lat']),
crs="EPSG:4326"
)
return gdf
def fetch_overpass(context_log, boundary_file, output_dir, country_code, admin_level, time=None):
context_log.info("Using Overpass API to fetch facilities...")
id_col = f"{admin_level.upper()}_PCODE"
# Paths for output files
temp_dir = output_dir / "Temporary"
out_dir = output_dir / "Output"
expected_files = [
out_dir / f"{country_code}_{admin_level}_facilities.csv"
] + [
temp_dir / f"{country_code}_{category}_raw.geojson"
for category in OVERPASS_FILTERS.keys()
]
# Check if all expected files exist → skip
if all(f.exists() for f in expected_files):
context_log.info("All Overpass output files exist. Skipping fetch_overpass.")
return expected_files[0] # Return main summary path
try:
boundary = gpd.read_file(boundary_file)
except Exception as e:
context_log.info(f"Error reading boundary: {e}")
sys.exit(1)
if id_col not in boundary.columns:
context_log.warning(f"Expected ID column {id_col} not found in {boundary_file}")
return None
minx, miny, maxx, maxy = boundary.total_bounds.tolist()
bbox_str = f"{miny},{minx},{maxy},{maxx}"
date_clause = f'[date:"{time}"]' if time else ''
api = overpass.API(timeout=300)
category_gdfs = {}
for category, exprs in OVERPASS_FILTERS.items():
filter_parts = [f"{expr}({bbox_str});" for expr in exprs]
query = f'[out:csv(::lon,::lat,::id,::type)]{date_clause};({"".join(filter_parts)});out center;'
try:
result = api.get(query, build=False)
except Exception as e:
context_log.info(f"Overpass query failed for {category}: {e}")
continue
if not result or len(result) < 2:
context_log.info(f"No results for {category}")
continue
gdf = parse_overpass_csv_to_gpd(result)
if gdf.empty:
context_log.info(f"No features found for {category}")
continue
gdf["geometry"] = gdf.geometry.centroid
gdf["category"] = category
raw_path = temp_dir / f"{country_code}_{category}_raw.geojson"
raw_path.parent.mkdir(parents=True, exist_ok=True)
gdf.to_file(raw_path, driver="GeoJSON")
context_log.info(f"Wrote raw {category} features to {raw_path}")
category_gdfs[category] = gdf
if not category_gdfs:
context_log.info("No Overpass features found at all.")
return None
counts = boundary[[id_col]].copy()
for cat, gdf in category_gdfs.items():
joined = gpd.sjoin(gdf, boundary, how="inner", predicate="intersects")
grouped = joined.groupby(id_col).size().rename(f"{cat}_count")
counts = counts.merge(grouped, on=id_col, how="left")
counts = counts.fillna(0).astype({col: int for col in counts.columns if col != id_col})
summary_path = out_dir / f"{country_code}_{admin_level}_facilities.csv"
summary_path.parent.mkdir(parents=True, exist_ok=True)
counts.to_csv(summary_path, index=False)
context_log.info(f"Wrote summary to {summary_path}")
return summary_path
def fetch_ohsome(context_log, boundary_file, output_dir, country_code, admin_level, time=None):
context_log.info("Using Ohsome API to fetch facilities...")
id_col = f"{admin_level.upper()}_PCODE"
temp_dir = output_dir / "Temporary"
out_dir = output_dir / "Output"
expected_files = [
out_dir / f"{country_code}_{admin_level}_facilities.csv"
] + [
temp_dir / f"{country_code}_{category}_raw.geojson"
for category in OHSOME_FILTERS.keys()
]
if all(f.exists() for f in expected_files):
context_log.info("All Ohsome output files exist. Skipping fetch_ohsome.")
return expected_files[0]
try:
boundary = gpd.read_file(boundary_file)
except Exception as e:
context_log.info(f"Error reading boundary: {e}")
sys.exit(1)
if id_col not in boundary.columns:
context_log.warning(f"Expected ID column {id_col} not found in {boundary_file}")
return None
minx, miny, maxx, maxy = boundary.total_bounds.tolist()
bbox_str = f"{minx},{miny},{maxx},{maxy}"
category_gdfs = {}
for category, filter_str in OHSOME_FILTERS.items():
params = {"bboxes": bbox_str, "filter": filter_str}
if time:
params["time"] = time
try:
r = requests.post(OHSOME_ENDPOINT, params=params)
r.raise_for_status()
except Exception as e:
context_log.info(f"Ohsome query failed for {category}: {e}")
continue
data = r.json()
gdf = gpd.GeoDataFrame.from_features(data["features"], crs="EPSG:4326")
if gdf.empty:
context_log.info(f"No features found for {category}")
continue
gdf = gdf.copy()
gdf["geometry"] = gdf.geometry.centroid
gdf["category"] = category
raw_path = temp_dir / f"{country_code}_{category}_raw.geojson"
raw_path.parent.mkdir(parents=True, exist_ok=True)
gdf.to_file(raw_path, driver="GeoJSON")
context_log.info(f"Wrote raw {category} features to {raw_path}")
category_gdfs[category] = gdf
if not category_gdfs:
context_log.info("No categories found, nothing to aggregate.")
return None
counts = boundary[[id_col]].copy()
for cat, gdf in category_gdfs.items():
joined = gpd.sjoin(gdf, boundary, how="inner", predicate="intersects")
grouped = joined.groupby(id_col).size().rename(f"{cat}_count")
counts = counts.merge(grouped, on=id_col, how="left")
counts = counts.fillna(0).astype({col: int for col in counts.columns if col != id_col})
summary_path = out_dir / f"{country_code}_{admin_level}_facilities.csv"
summary_path.parent.mkdir(parents=True, exist_ok=True)
counts.to_csv(summary_path, index=False)
context_log.info(f"Wrote summary to {summary_path}")
return summary_path
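A minimal usage sketch for the Ohsome variant, with illustrative paths and a plain `logging` logger standing in for the pipeline's `context_log`; the boundary GeoJSON is assumed to contain an `ADM2_PCODE` column:

```python
import logging
from pathlib import Path

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("facilities")  # stand-in for context_log

summary_csv = fetch_ohsome(
    context_log=log,
    boundary_file="data/KEN/KEN_ADM2.geojson",  # illustrative path
    output_dir=Path("data/KEN"),                # Temporary/ and Output/ are created below this
    country_code="KEN",
    admin_level="ADM2",
    time="2024-01-01",  # optional OSM snapshot date
)
print(summary_csv)  # data/KEN/Output/KEN_ADM2_facilities.csv
```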
3. Coping Capacity#
Represents the ability of a population to access essential services and benefit from available infrastructure.
This layer combines Access to Services and Facilities indicators to assess local capacity to cope with shocks or disruptions.
Data Sources#
Access to Services CSVs: Derived from population accessibility analysis (see Access to Services chapter).
Facilities CSVs: Derived from OpenStreetMap facility data (see Facilities chapter).
Both inputs are aggregated at the same administrative level (e.g., ADM2).
Processing Steps#
Input CSVs:
One Access to Services CSV and one Facilities CSV are provided per administrative level.
Each CSV includes an identifier column such as `ADM0_PCODE`, `ADM1_PCODE`, or `ADM2_PCODE`.
Column Detection:
The script automatically detects the administrative identifier column by searching for any column ending with `_PCODE`.
Merge Operation:
The two datasets are merged on the identified `*_PCODE` column using a left join.
This ensures all access indicators are retained even if some administrative areas lack facility data.
Output Generation:
The merged dataset is saved as `{country_code}_{admin_level}_coping.csv` under `Output/`.
One output is produced per administrative level present in the input data.
Outputs#
CSV file: `{country_code}_{admin_level}_coping.csv`
Columns:
`{admin_level}_PCODE`
All accessibility indicator columns (e.g., `access_pop_education_5km`, `access_pop_hospitals_1h`)
All facility count columns (e.g., `education_count`, `hospitals_count`, `primary_healthcare_count`)
CRS: WGS84 (`EPSG:4326`)
Units: counts and population values aggregated per administrative unit
Python Script (Coping Capacity)
import os
import pandas as pd
from pathlib import Path
from typing import List
def coping_asset(context, access_asset: List[str], facilities_asset: List[str]) -> list[str]:
"""
Combine accessibility and facilities CSVs into a single coping dataset.
Joins on the ADM*_PCODE column per admin level.
Produces one coping CSV per admin level in Output/.
"""
country_code = context.partition_key.upper()
outputs = []
for access_csv, facilities_csv in zip(access_asset, facilities_asset):
if not os.path.exists(access_csv) or not os.path.exists(facilities_csv):
context.log.warning(
f"Skipping merge for {country_code}: missing files {access_csv}, {facilities_csv}"
)
continue
try:
df_access = pd.read_csv(access_csv)
df_facilities = pd.read_csv(facilities_csv)
# detect admin code column automatically (ADM0_PCODE, ADM1_PCODE, etc.)
id_col = [c for c in df_access.columns if c.endswith("_PCODE")]
if not id_col:
context.log.warning(f"Skipping {access_csv}: no *_PCODE column found")
continue
id_col = id_col[0]
merged = pd.merge(df_access, df_facilities, on=id_col, how="left")
admin_level = id_col.split("_")[0] # e.g., ADM2
output_dir = Path("data") / country_code / "Output"
output_dir.mkdir(parents=True, exist_ok=True)
output_path = output_dir / f"{country_code}_{admin_level}_coping.csv"
merged.to_csv(output_path, index=False)
outputs.append(str(output_path))
context.log.info(f"[{country_code}] Wrote coping CSV: {output_path}")
except Exception as e:
context.log.warning(f"Failed to merge {access_csv} and {facilities_csv}: {e}")
if not outputs:
context.log.warning(f"No coping outputs created for {country_code}")
return outputs
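Outside the orchestrator, the asset can be exercised with a small stand-in for the context object; in production the context (providing `partition_key` and `log`) is supplied by the pipeline framework, and the paths below are illustrative:

```python
import logging
from types import SimpleNamespace

logging.basicConfig(level=logging.INFO)

# Stand-in exposing the two attributes the asset uses: partition_key and log
ctx = SimpleNamespace(partition_key="ken", log=logging.getLogger("coping"))

outputs = coping_asset(
    ctx,
    access_asset=["data/KEN/Output/KEN_ADM2_access.csv"],
    facilities_asset=["data/KEN/Output/KEN_ADM2_facilities.csv"],
)
print(outputs)  # ['data/KEN/Output/KEN_ADM2_coping.csv']
```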
4. Demographics#
Provides population distribution indicators based on age and sex.
This layer is derived from the WorldPop global population dataset and quantifies vulnerable population groups across administrative boundaries.
Data Sources#
WorldPop Age-Sex Population Rasters:
Downloaded from the WorldPop Global Constrained Population dataset (`Global_2000_2020_Constrained`).
Administrative boundaries (GeoJSON): Used to spatially aggregate features at administrative level 2.
Indicators#
Each demographic indicator represents the sum of population counts matching specified age and sex ranges:

| Indicator | Description | Ages (years) | Sexes |
|---|---|---|---|
| `female_pop` | Total female population | 0–80+ | f |
| `children_u5` | Total children under 5 | 0–4 | f, m |
| `female_u5` | Female children under 5 | 0–4 | f |
| `elderly` | Population aged 65 and above | 65+ | f, m |
| `pop_u15` | Population under 15 | 0–14 | f, m |
| `female_u15` | Female population under 15 | 0–14 | f |
Processing Steps#
Download Required WorldPop Tiles
For each indicator, the script identifies which age/sex rasters are needed (e.g., `m_0`, `f_1`, etc.).
Tiles are downloaded from WorldPop.
Merge and Sum Rasters
The corresponding age/sex tiles are summed into one raster per indicator.
Outputs are saved in `data/{country}/Temporary/`.
Aggregate by Administrative Units
Using the administrative boundary GeoJSON (e.g., `ADM2`), zonal statistics are computed for each indicator.
Population counts are summed per administrative area.
Output
A single CSV per country: `{country_code}_{admin_level}_demographics.csv`
Saved to `data/{country}/Output/`
Outputs#
File: `{country_code}_{admin_level}_demographics.csv`
Columns: `{admin_level}_PCODE`, `female_pop`, `children_u5`, `female_u5`, `elderly`, `pop_u15`, `female_u15`
CRS: WGS84 (`EPSG:4326`)
Units: population counts (integer, aggregated per administrative unit)
Python Script (Demographics)
import os
import sys
from typing import List
import requests
import rasterio
import pandas as pd
import geopandas as gpd
import argparse
import logging
import shutil
from rasterstats import zonal_stats
# Define indicators directly
INDICATORS = {
"female_pop": {"ages": [0, 1, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80], "sexes": ["f"]},
"children_u5": {"ages": [0, 1], "sexes": ["f", "m"]},
"female_u5": {"ages": [0, 1], "sexes": ["f"]},
"elderly": {"ages": [65, 70, 75, 80], "sexes": ["f", "m"]},
"pop_u15": {"ages": [0, 1, 5, 10], "sexes": ["f", "m"]},
"female_u15": {"ages": [0, 1, 5, 10], "sexes": ["f"]},
}
BASE_URL = "https://data.worldpop.org/GIS"
POP_TIMEFRAME = "Global_2000_2020_Constrained"
YEAR = "2020"
def download_url(url, dest_path):
resp = requests.get(url, stream=True)
resp.raise_for_status()
with open(dest_path, "wb") as fp:
for chunk in resp.iter_content(1024 * 1024):
fp.write(chunk)
def merge_and_sum_rasters(raster_paths: List[str], out_path: str, context_log):
"""
Open each path in `raster_paths`, read band 1 into memory, sum them,
and write a single‐band float32 GeoTIFF to `out_path`.
"""
if not raster_paths:
raise ValueError("No rasters passed for merging!")
with rasterio.open(raster_paths[0]) as src0:
meta = src0.meta.copy()
data_sum = src0.read(1, masked=True).filled(0).astype("float64")
for p in raster_paths[1:]:
with rasterio.open(p) as src:
arr = src.read(1, masked=True).filled(0).astype("float64")
data_sum += arr
meta.update(dtype="float32", count=1, compress="lzw", nodata=0)
with rasterio.open(out_path, "w", **meta) as dst:
dst.write(data_sum.astype("float32"), 1)
context_log.info(f"Wrote merged raster to {out_path}")
def fetch_worldpop(country, context_log=None, worldpop_code=None):
"""
Download WorldPop rasters for a country and aggregate into indicator rasters.
Produces 6 GeoTIFFs: one per indicator (no total population).
"""
if context_log is None:
logging.basicConfig(level=logging.INFO)
context_log = logging.getLogger("worldpop")
country = country.upper()
if not worldpop_code:
worldpop_code = country
worldpop_code_low = country.lower()
else:
worldpop_code_low = worldpop_code.lower()
# Force output directory to Temporary
out_dir = os.path.join("data", country, "Temporary")
os.makedirs(out_dir, exist_ok=True)
# Pre-check: if all 6 indicator rasters already exist, skip
expected_outputs = [
os.path.join(out_dir, f"{country}_pop_{ind_name}_{YEAR}_constrained.tif")
for ind_name in INDICATORS.keys()
]
if all(os.path.exists(path) for path in expected_outputs):
context_log.info(f"[{country}] → all {len(expected_outputs)} WorldPop indicators already exist, skipping download.")
return expected_outputs
out_dir_raw = os.path.join(out_dir, "worldpop_raw")
os.makedirs(out_dir_raw, exist_ok=True)
downloaded = []
# 1) Download needed age/sex rasters
needed_bins = set()
for ind in INDICATORS.values():
for sex in ind["sexes"]:
for age in ind["ages"]:
needed_bins.add((sex, age))
for sex, age in needed_bins:
fname = f"{worldpop_code_low}_{sex}_{age}_{YEAR}_constrained.tif"
url = f"{BASE_URL}/AgeSex_structures/{POP_TIMEFRAME}/{YEAR}/{worldpop_code}/{fname}"
dest = os.path.join(out_dir_raw, f"{country}_{sex}_{age}_{YEAR}_constrained.tif")
if not os.path.exists(dest):
context_log.info(f"[{country}] → downloading {sex}_{age}")
try:
download_url(url, dest)
except Exception as e:
context_log.info(f"[ERROR] failed to download {url}: {e}")
sys.exit(1)
else:
context_log.info(f"[{country}] → skipping existing {fname}")
downloaded.append(dest)
# 2) Aggregate indicators
processed = []
for ind_name, ind in INDICATORS.items():
filtered_paths = [
os.path.join(out_dir_raw, f"{country}_{sex}_{age}_{YEAR}_constrained.tif")
for sex in ind["sexes"]
for age in ind["ages"]
]
merged_out = os.path.join(out_dir, f"{country}_pop_{ind_name}_{YEAR}_constrained.tif")
if not os.path.exists(merged_out):
context_log.info(f"[{country}] → processing indicator {ind_name}")
try:
merge_and_sum_rasters(filtered_paths, merged_out, context_log)
except Exception as e:
context_log.info(f"[ERROR] failed to process {ind_name}: {e}")
sys.exit(1)
else:
context_log.info(f"[{country}] → skipping existing {ind_name}")
processed.append(merged_out)
# 3) Delete raw folder
if os.path.exists(out_dir_raw):
shutil.rmtree(out_dir_raw)
context_log.info(f"[{country}] → deleted raw folder: {out_dir_raw}")
context_log.info(f"\n✔ ∙ {len(processed)} indicators saved under {out_dir}")
return processed
def aggregate_worldpop_to_csv(country_code: str, admin_level="ADM2", context_log=None) -> str:
"""
Download WorldPop indicators for a country and save CSV.
"""
temp_dir = os.path.join("data", country_code, "Temporary")
os.makedirs(temp_dir, exist_ok=True)
output_dir = os.path.join("data", country_code, "Output")
os.makedirs(output_dir, exist_ok=True)
if context_log is None:
logging.basicConfig(level=logging.INFO)
context_log = logging.getLogger("worldpop")
# 1) Fetch indicators (6 GeoTIFFs) into data/{country}/Temporary
tifs = fetch_worldpop(country=country_code, context_log=context_log)
# 2) Load ADM polygons
adm_path = f"data/{country_code}/{country_code}_{admin_level}.geojson"
gdf = gpd.read_file(adm_path)
expected_column = f"{admin_level}_PCODE"
if expected_column not in gdf.columns:
raise ValueError(
f"GeoJSON must contain column '{expected_column}' "
f"(found: {gdf.columns.tolist()})"
)
# 3) Map indicator names to files
indicators = ["female_pop","children_u5","female_u5","elderly","pop_u15","female_u15"]
tif_map = dict(zip(indicators, tifs))
results = pd.DataFrame()
results[f"{admin_level}_PCODE"] = gdf[f"{admin_level}_PCODE"]
# 4) Compute zonal sums
for ind, path in tif_map.items():
stats = zonal_stats(gdf, path, stats="sum", nodata=0)
results[ind] = [s["sum"] for s in stats]
# Round all numeric columns to 0 decimals
numeric_cols = results.columns.drop(f"{admin_level}_PCODE")
results[numeric_cols] = results[numeric_cols].round(0).astype(int)
# 5) Save CSV
out_dir = os.path.join("data", country_code, "Output")
os.makedirs(out_dir, exist_ok=True)
csv_path = os.path.join(out_dir, f"{country_code}_{admin_level}_demographics.csv")
results.to_csv(csv_path, index=False)
return csv_path
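A minimal usage sketch; the country code is illustrative, the ADM2 GeoJSON is assumed to exist under `data/{country}/`, and the function creates its own logger when none is passed:

```python
# WorldPop rasters are downloaded automatically into data/KEN/Temporary
csv_path = aggregate_worldpop_to_csv("KEN", admin_level="ADM2")
print(csv_path)  # data/KEN/Output/KEN_ADM2_demographics.csv
```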
5. Rural Population#
Represents the proportion and distribution of people living in rural areas within each administrative unit. Rural areas are those outside urban extents, typically characterized by lower population density, agricultural or natural land use, and limited infrastructure compared to urban centers.
This layer combines WorldPop population rasters with Global Human Settlement Layer (GHS-SMOD) data to estimate rural populations for each demographic group.
Data Sources#
WorldPop Population Rasters:
Derived from the WorldPop Age-Sex structured datasets.
Used to calculate population counts for various demographic indicators (see the Demographics chapter).
GHS-SMOD (Settlement Model) Raster:
Downloaded from the JRC GHSL repository for the year 2030 projection.
Used to classify grid cells as urban or rural based on settlement patterns.
Administrative Boundaries:
GeoJSON boundary files (e.g., ADM2) define the spatial units used for population aggregation.
Processing Steps#
Download GHS-SMOD Raster
The script downloads the global SMOD ZIP archive for 2030.
Extracts and selects the `.tif` raster corresponding to global settlement data.
Reclassify SMOD Raster
Converts original SMOD categories into two classes:
Rural = 1 (codes 11–13)
Urban = 2 (codes 21–30)
Excluded/Water = 0
The resulting raster is saved as `smod_reclass.tif`.
Fetch WorldPop Demographic Indicators
Ensures that population rasters for indicators such as `female_pop`, `children_u5`, etc., are available.
Reuses outputs from the Demographics asset.
Compute Rural Population
Reprojects the SMOD raster to match each WorldPop raster.
Multiplies each population raster by the rural mask (`smod == 1`).
Performs zonal statistics per administrative unit (e.g., ADM2).
Output Generation
Aggregates results into a CSV file per country and administrative level.
Adds absolute rural counts per indicator (`_rural` suffix) and an overall relative share (`rural_pop_perc`).
Outputs#
CSV file: `{country_code}_{admin_level}_rural_population.csv`
Columns:
`{admin_level}_PCODE`
`{indicator}_rural` (e.g., `female_pop_rural`, `children_u5_rural`)
`rural_pop_perc` (percentage of population living in rural areas)
CRS: WGS84 (`EPSG:4326`)
Units: counts and percentages per administrative unit
Python Script (Rural Population)
import os
import shutil
import zipfile
import requests
from pathlib import Path
import rioxarray
import rasterio
import pandas as pd
import numpy as np
import argparse
import logging
import geopandas as gpd
from rasterstats import zonal_stats
from scripts.fetch_worldpop import fetch_worldpop
RECLASS_MAP = {
10: None,
11: 1, 12: 1, 13: 1, # rural
21: 2, 22: 2, 23: 2, 30: 2, # urban
}
SMOD_ZIP_URL = (
"https://jeodpp.jrc.ec.europa.eu/ftp/jrc-opendata/GHSL/"
"GHS_SMOD_GLOBE_R2023A/GHS_SMOD_E2030_GLOBE_R2023A_54009_1000/"
"V2-0/GHS_SMOD_E2030_GLOBE_R2023A_54009_1000_V2_0.zip"
)
def download_and_unzip_smod(work_dir, context):
"""Download SMOD ZIP, unzip, return path to TIF."""
zip_path = Path(work_dir) / "smod.zip"
unzip_dir = Path(work_dir) / "unzipped"
unzip_dir.mkdir(parents=True, exist_ok=True)
if not zip_path.exists():
context.info("Downloading SMOD global raster…")
resp = requests.get(SMOD_ZIP_URL, stream=True)
resp.raise_for_status()
with open(zip_path, "wb") as f:
for chunk in resp.iter_content(1024 * 1024):
f.write(chunk)
else:
context.info("SMOD ZIP already exists, skipping download.")
with zipfile.ZipFile(zip_path, "r") as z:
z.extractall(unzip_dir)
for root, _, files in os.walk(unzip_dir):
for fn in files:
if fn.lower().endswith(".tif"):
return zip_path, unzip_dir, Path(root) / fn
raise FileNotFoundError("No .tif found in SMOD ZIP")
def reclassify_raster(in_tif, out_tif, reclass_map, context):
with rasterio.open(in_tif) as src:
meta = src.meta.copy()
arr = src.read(1)
nodata_val = 0
out = np.full(arr.shape, nodata_val, dtype=np.uint8)
for old, new in reclass_map.items():
mask_val = (arr == old)
if new is None:
out[mask_val] = nodata_val
else:
out[mask_val] = new
meta.update(count=1, dtype="uint8", nodata=nodata_val)
with rasterio.open(out_tif, "w", **meta) as dst:
dst.write(out, 1)
context.info(f"Reclassified raster saved to {out_tif}")
def compute_rural_population(country_code, admin_level, gdf, work_dir, output_dir, context):
"""
Compute rural population counts by admin unit for each indicator.
Generates smod_reclass.tif if missing. Skips if output CSV already exists.
Adds percentage columns (_rural_perc) for each indicator.
"""
country_code = country_code.upper()
work_dir = Path(f"{work_dir}/Temporary")
temp_dir = Path(f"{work_dir}/data/{country_code}/Temporary")
output_dir = Path(output_dir)
reclass_tif = work_dir / "smod_reclass.tif"
out_csv = output_dir / f"{country_code}_{admin_level}_rural_population.csv"
# --- skip if CSV exists ---
if out_csv.exists():
context.info(f"CSV already exists, skipping: {out_csv}")
return str(out_csv)
zip_path = None
unzip_dir = None
try:
# --- ensure smod_reclass.tif exists ---
if not reclass_tif.exists():
context.info("smod_reclass.tif missing — will generate it")
zip_path, unzip_dir, smod_tif = download_and_unzip_smod(work_dir, context)
reclassify_raster(smod_tif, reclass_tif, RECLASS_MAP, context)
else:
context.info("Using existing smod_reclass.tif")
# Ensure WorldPop files exist
context.info(f"Ensuring demographic rasters exist in {temp_dir}...")
indicator_tifs = fetch_worldpop(country_code)
indicators = ["female_pop", "children_u5", "female_u5", "elderly", "pop_u15", "female_u15"]
tif_map = dict(zip(indicators, indicator_tifs))
# --- load SMOD raster ---
smod = rioxarray.open_rasterio(reclass_tif, masked=True).squeeze()
rural_df = pd.DataFrame({f"{admin_level}_PCODE": gdf[f"{admin_level}_PCODE"]})
# Store total population sums
total_pop_counts = {}
for label in indicators:
pop_raster_path = tif_map[label]
pop_raster = rioxarray.open_rasterio(pop_raster_path, masked=True).squeeze()
# Align SMOD raster to population raster
smod_aligned = smod.rio.reproject_match(pop_raster, resampling=rasterio.enums.Resampling.nearest)
rural_mask = (smod_aligned == 1).astype("float32")
rural_pop = pop_raster * rural_mask
tmp_rural = work_dir / f"tmp_rural_{label}.tif"
with rasterio.open(pop_raster_path) as src:
meta = src.meta.copy()
meta.update(compress="lzw", tiled=True,
bigtiff="yes" if src.width * src.height > 2**32 else "no")
with rasterio.open(tmp_rural, "w", **meta) as dst:
dst.write(rural_pop.values, 1)
# Rural population sums per admin unit
stats = zonal_stats(gdf, tmp_rural, stats="sum", nodata=0)
rural_df[f"{label}_rural"] = [s["sum"] if s["sum"] is not None else 0 for s in stats]
# Total population sums per admin unit
total_stats = zonal_stats(gdf, pop_raster_path, stats="sum", nodata=0)
total_pop_counts[label] = [s["sum"] if s["sum"] is not None else 0 for s in total_stats]
context.info(f"Processed rural population for {label}")
# --- calculate one overall rural percentage column ---
# Use total population (e.g. pop_u15 as proxy, or sum of all groups)
total_pop = pd.Series(total_pop_counts["female_pop"]).replace({0: np.nan})
rural_df["rural_pop_perc"] = (
rural_df["female_pop_rural"] / total_pop * 100
).fillna(0).round(2)
# --- finalize ---
count_cols = [c for c in rural_df.columns if c.endswith("_rural")]
perc_cols = [c for c in rural_df.columns if c.endswith("_rural_perc")]
# Counts → integers
rural_df[count_cols] = rural_df[count_cols].fillna(0).round(0).astype(int)
# Percentages → keep 2 decimals
rural_df[perc_cols] = rural_df[perc_cols].fillna(0).round(2)
output_dir.mkdir(parents=True, exist_ok=True)
rural_df.to_csv(out_csv, index=False)
context.info(f"Rural population CSV written to {out_csv}")
finally:
# cleanup
if zip_path and zip_path.exists():
zip_path.unlink()
context.info(f"Deleted {zip_path}")
if unzip_dir and unzip_dir.exists():
shutil.rmtree(unzip_dir)
context.info(f"Deleted {unzip_dir}")
# cleanup tmp_rural rasters
for tmp_file in work_dir.glob("tmp_rural_*.tif"):
try:
tmp_file.unlink()
context.info(f"Deleted {tmp_file}")
except Exception as e:
context.info(f"Failed to delete {tmp_file}: {e}")
return str(out_csv)
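A minimal usage sketch, assuming the demographic rasters can be fetched (or already exist) and using a plain `logging` logger in place of the pipeline context; the country code and paths are illustrative:

```python
import logging
import geopandas as gpd

logging.basicConfig(level=logging.INFO)
ctx = logging.getLogger("rural")  # stand-in for the pipeline context; only .info() is used

gdf = gpd.read_file("data/KEN/KEN_ADM2.geojson")  # illustrative boundary file

csv_path = compute_rural_population(
    country_code="KEN",
    admin_level="ADM2",
    gdf=gdf,
    work_dir="data/KEN",           # SMOD download and temporary rasters go under .../Temporary
    output_dir="data/KEN/Output",
    context=ctx,
)
print(csv_path)  # data/KEN/Output/KEN_ADM2_rural_population.csv
```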
6. Vulnerability#
Represents the sensitivity of a population to external shocks based on demographic composition and settlement type.
This layer combines Demographics and Rural Population indicators to highlight population groups more likely to experience heightened vulnerability in rural or hard-to-reach regions.
Data Sources#
Demographics CSVs: Derived from WorldPop population datasets, disaggregated by age and sex (see Demographics chapter).
Rural Population CSVs: Derived from settlement layer analysis separating rural and urban populations (see Rural Population chapter).
Both datasets are aggregated to the same administrative level (e.g., ADM2).
Processing Steps#
Input CSVs:
One Demographics CSV and one Rural Population CSV are provided per administrative level.
Each file contains an identifier column such as `ADM0_PCODE`, `ADM1_PCODE`, or `ADM2_PCODE`.
Column Detection:
The script automatically detects the administrative identifier column by finding any column ending with `_PCODE`.
Merge Operation:
The two datasets are merged on the identified `*_PCODE` column using a left join.
This ensures demographic indicators are retained even when some administrative areas lack rural population data.
Output Generation:
The merged dataset is exported as `{country_code}_{admin_level}_vulnerability.csv` to the `Output/` directory.
One file is created per administrative level present in the input data.
Outputs#
File: `{country_code}_{admin_level}_vulnerability.csv`
Columns:
`{admin_level}_PCODE`
All demographic indicator columns (e.g., `female_pop`, `children_u5`, `elderly`, `pop_u15`)
All rural population indicator columns (e.g., `female_pop_rural`, `children_u5_rural`, `rural_pop_perc`)
CRS: WGS84 (`EPSG:4326`)
Units: population counts and percentages aggregated per administrative unit
Python Script (Vulnerability)
import os
import pandas as pd
from pathlib import Path
from typing import List
def vulnerability_asset(context, demographics_asset: List[str], rural_asset: List[str]) -> list[str]:
"""
Combine demographics and rural population CSVs into a single vulnerability dataset.
Joins on the ADM*_PCODE column per admin level.
Produces one vulnerability CSV per admin level in Output/.
"""
country_code = context.partition_key.upper()
outputs = []
for demo_csv, rural_csv in zip(demographics_asset, rural_asset):
if not os.path.exists(demo_csv) or not os.path.exists(rural_csv):
context.log.warning(f"Skipping merge for {country_code}: missing files {demo_csv}, {rural_csv}")
continue
try:
df_demo = pd.read_csv(demo_csv)
df_rural = pd.read_csv(rural_csv)
# detect admin code column automatically (ADM0_PCODE, ADM1_PCODE, etc.)
id_col = [c for c in df_demo.columns if c.endswith("_PCODE")]
if not id_col:
context.log.warning(f"Skipping {demo_csv}: no *_PCODE column found")
continue
id_col = id_col[0]
merged = pd.merge(df_demo, df_rural, on=id_col, how="left")
admin_level = id_col.split("_")[0] # e.g., ADM2
output_dir = Path("data") / country_code / "Output"
output_dir.mkdir(parents=True, exist_ok=True)
output_path = output_dir / f"{country_code}_{admin_level}_vulnerability.csv"
merged.to_csv(output_path, index=False)
outputs.append(str(output_path))
context.log.info(f"[{country_code}] Wrote vulnerability CSV: {output_path}")
except Exception as e:
context.log.warning(f"Failed to merge {demo_csv} and {rural_csv}: {e}")
if not outputs:
context.log.warning(f"No vulnerability outputs created for {country_code}")
return outputs
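Usage mirrors the Coping Capacity asset: a small stand-in context with `partition_key` and `log` is enough outside the orchestrator (paths illustrative):

```python
import logging
from types import SimpleNamespace

ctx = SimpleNamespace(partition_key="ken", log=logging.getLogger("vulnerability"))
vulnerability_asset(
    ctx,
    demographics_asset=["data/KEN/Output/KEN_ADM2_demographics.csv"],
    rural_asset=["data/KEN/Output/KEN_ADM2_rural_population.csv"],
)
```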
7. Crop Coverage and Change#
Quantifies the extent and temporal change of cropland areas within administrative boundaries.
This layer evaluates total cropland coverage as well as changes in cropland area between 2023 and 2024 using Google Dynamic World (V1). This dataset provides 10 m, near-real-time land classification derived from Sentinel-2 imagery. For each administrative unit, the proportion of pixels classified as “cropland” (class 4) is calculated for both years. These percentages are then compared to determine both absolute and relative changes in cropland area.
Data Sources#
Dynamic World V1 (`GOOGLE/DYNAMICWORLD/V1`):
Global, near real-time land cover dataset from Google Earth Engine, with 10 m spatial resolution and daily temporal frequency.
Administrative Boundaries (GeoJSON):
Used to aggregate cropland extent within each administrative area (e.g., ADM2).
Configuration File (`configs/assets_config.yaml`):
Defines two reference years under `crops_asset: years: [year1, year2]` for comparison.
Indicators#

| Indicator | Description | Units |
|---|---|---|
| `crops_{year1}_pct` | Percentage of total area classified as crops in the first year | % |
| `crops_{year2}_pct` | Percentage of total area classified as crops in the second year | % |
| `crops_diff_km2` | Absolute change in cropland area between the two years | km² |
| `crops_diff_pctpts` | Percentage point difference in cropland coverage | % points |
| `crops_change_rel_pct` | Relative percentage change compared to the baseline year | % |
Processing Steps#
Configuration Parsing:
The script reads `assets_config.yaml` and extracts the two comparison years (e.g., 2018 and 2022).
If the configuration does not define two valid years, an error is raised.
Input Geometry:
The administrative boundary file `data/{country_code}/{country_code}_{admin_level}.geojson` is loaded using GeoPandas.
Earth Engine Setup:
The script initializes Google Earth Engine (`ee.Initialize(project="aa-automatization")`).
The `GOOGLE/DYNAMICWORLD/V1` collection is used to generate yearly mode composites for each year and boundary polygon.
Cropland Extraction:
Each composite image is classified into Dynamic World labels.
Label `4` corresponds to cropland.
For each administrative unit, the mean cropland fraction is computed (as % of total area).
Change Calculation:
The following statistics are derived per polygon:
Percentage of cropland per year (`crops_{year1}_pct`, `crops_{year2}_pct`)
Absolute area change in km² (`crops_diff_km2`)
Difference in percentage points (`crops_diff_pctpts`)
Relative change compared to the first year (`crops_change_rel_pct`)
Chunked Processing:
To handle large datasets, polygons are processed in chunks (default: 20 features at a time).
Each chunk’s results are written to a temporary CSV, then appended to the final output.
Output Generation:
The combined dataset is saved as `{country_code}_{admin_level}_crops.csv` under `data/{country_code}/Output/`.
Outputs#
File: `{country_code}_{admin_level}_crops.csv`
Columns: `{admin_level}_PCODE`, `crops_{year1}_pct`, `crops_{year2}_pct`, `crops_diff_km2`, `crops_diff_pctpts`, `crops_change_rel_pct`
CRS: WGS84 (`EPSG:4326`)
Units: percentages (%) and area (km²)
Python Script (Crop Coverage and Change)
import os
import geopandas as gpd
import pandas as pd
import geemap
import ee
import yaml
import argparse
def load_years_from_config(config_path="configs/assets_config.yaml"):
"""Load years from crops_asset in assets_config.yaml"""
with open(config_path, "r") as f:
cfg = yaml.safe_load(f)
years = cfg.get("crops_asset", {}).get("years", [])
if not years or len(years) != 2:
raise ValueError("assets_config.yaml must define crops_asset: years: [year1, year2]")
return years[0], years[1]
def process_crops_for_admin(country_code: str, admin_level: str, config_path="configs/assets_config.yaml") -> str:
"""Run crop coverage calculation for a given country/admin level and return output CSV path."""
# Initialize Earth Engine
ee.Initialize(project="aa-automatization")
year_prev, year_curr = load_years_from_config(config_path)
gdf = gpd.read_file(f"data/{country_code}/{country_code}_{admin_level}.geojson")
chunk_size = 20
start_idx = 0
# Ensure Output folder exists
os.makedirs(f"data/{country_code}/Output", exist_ok=True)
output_csv = f"data/{country_code}/Output/{country_code}_{admin_level}_crops.csv"
if os.path.exists(output_csv):
os.remove(output_csv)
col_order = [
f"{admin_level.upper()}_PCODE",
f"crops_{year_prev}_pct",
f"crops_{year_curr}_pct",
"crops_diff_km2",
"crops_diff_pctpts",
"crops_change_rel_pct",
]
dw = ee.ImageCollection("GOOGLE/DYNAMICWORLD/V1")
def yearly_composite(y, geom):
coll = (
dw.filterDate(f"{y}-01-01", f"{y}-12-31")
.filterBounds(geom)
.select("label")
)
return coll.reduce(ee.Reducer.mode())
while start_idx < len(gdf):
end_idx = start_idx + chunk_size
gdf_chunk = gdf.iloc[start_idx:end_idx]
try:
fc = geemap.geopandas_to_ee(gdf_chunk)
def add_crops_stats(feature):
geom = feature.geometry()
comp_prev = yearly_composite(year_prev, geom)
comp_curr = yearly_composite(year_curr, geom)
polygon_area_km2 = ee.Number(geom.area()).divide(1e6)
crop_prev_raw = comp_prev.eq(4).rename("crops").reduceRegion(
reducer=ee.Reducer.mean(),
geometry=geom,
scale=10,
bestEffort=True,
).get("crops")
crop_curr_raw = comp_curr.eq(4).rename("crops").reduceRegion(
reducer=ee.Reducer.mean(),
geometry=geom,
scale=10,
bestEffort=True,
).get("crops")
crop_prev_perc = ee.Number(ee.Algorithms.If(crop_prev_raw, crop_prev_raw, 0)).multiply(100)
crop_curr_perc = ee.Number(ee.Algorithms.If(crop_curr_raw, crop_curr_raw, 0)).multiply(100)
crops_perc_point_dif = crop_curr_perc.subtract(crop_prev_perc)
crops_abs_dif_km2 = crops_perc_point_dif.multiply(polygon_area_km2).divide(100)
crops_rel_change_perc = ee.Algorithms.If(
crop_prev_perc.neq(0),
crops_perc_point_dif.divide(crop_prev_perc).multiply(100),
None,
)
return feature.set({
f"crops_{year_prev}_pct": crop_prev_perc,
f"crops_{year_curr}_pct": crop_curr_perc,
"crops_diff_km2": crops_abs_dif_km2,
"crops_diff_pctpts": crops_perc_point_dif,
"crops_change_rel_pct": crops_rel_change_perc,
})
fc_with_stats = fc.map(add_crops_stats)
fc_filtered = fc_with_stats.select(
propertySelectors=col_order, retainGeometry=False
)
temp_csv = f"data/{country_code}/{country_code}_crops_{admin_level}_temp_chunk.csv"
geemap.ee_to_csv(fc_filtered, filename=temp_csv)
df_chunk = pd.read_csv(temp_csv)
df_chunk = df_chunk[[c for c in col_order if c in df_chunk.columns]]
df_chunk[col_order[1:]] = df_chunk[col_order[1:]].round(2)
if start_idx == 0 and not os.path.exists(output_csv):
df_chunk.to_csv(output_csv, index=False)
else:
df_chunk.to_csv(output_csv, mode="a", header=False, index=False)
os.remove(temp_csv)
start_idx = end_idx
except Exception:
start_idx = end_idx
return output_csv
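A minimal usage sketch, assuming Earth Engine credentials are already configured for the `aa-automatization` project and that the config file defines the two comparison years; the country code and year values are illustrative:

```python
# configs/assets_config.yaml is expected to contain, for example:
# crops_asset:
#   years: [2023, 2024]
csv_path = process_crops_for_admin("KEN", "ADM2")
print(csv_path)  # data/KEN/Output/KEN_ADM2_crops.csv
```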
8. Vegetation Index#
Evaluates vegetation health and density using the Normalized Difference Vegetation Index (NDVI) derived from Landsat composites. NDVI values range from -1 to +1, where higher values indicate denser and healthier vegetation, while lower values reflect sparse or stressed vegetation, bare soil, or urban surfaces.
This layer provides summary statistics (mean, median, quartiles) and surface extent of high, medium, and low NDVI zones per administrative boundary.
Data Sources#
Landsat NDVI Composite (`LANDSAT/COMPOSITES/C02/T1_L2_8DAY_NDVI`):
8-day composite product containing NDVI values at 30 m resolution.
Administrative Boundaries (GeoJSON):
Used for spatial aggregation at ADM levels (e.g., ADM2).
Configuration File (`configs/assets_config.yaml`):
Defines the target year under `ndvi_asset: year: [YYYY]`.
Indicators#

| Indicator | Description | Units |
|---|---|---|
| `NDVI_mean` | Mean NDVI value across the administrative unit | NDVI (unitless) |
| `NDVI_median` | Median NDVI value | NDVI (unitless) |
| `NDVI_p25` | 25th percentile NDVI | NDVI (unitless) |
| `NDVI_p75` | 75th percentile NDVI | NDVI (unitless) |
| `NDVI_high_sqkm` | Area with NDVI ≥ 0.6 | km² |
| `NDVI_medium_sqkm` | Area with 0.1 ≤ NDVI < 0.6 | km² |
| `NDVI_low_sqkm` | Area with NDVI < 0.1 | km² |
NDVI Thresholds#

| Category | NDVI Range | Interpretation |
|---|---|---|
| High NDVI | ≥ 0.6 | Dense vegetation (forests, crops, green cover) |
| Medium NDVI | 0.1 – 0.6 | Moderate vegetation (grasslands, shrubs, mixed use) |
| Low NDVI | < 0.1 | Bare soil, urban areas, or degraded vegetation |
Processing Steps#
Configuration Loading
Extracts `year` from `assets_config.yaml` (`ndvi_asset: year: [YYYY]`).
The script ensures exactly one valid year is defined.
Input Geometry
Loads administrative boundaries (`data/{country_code}/{country_code}_{admin_level}.geojson`) using GeoPandas.
Earth Engine Initialization
Connects to Google Earth Engine under the project `aa-automatization`.
Landsat NDVI Aggregation
Loads Landsat NDVI composites for the specified year.
Calculates the median NDVI across the year to reduce noise from clouds and seasonal variation.
NDVI Classification
Creates three NDVI masks:
High: NDVI ≥ 0.6
Medium: 0.1 ≤ NDVI < 0.6
Low: NDVI < 0.1
Each mask is used to compute the corresponding surface area in square kilometers.
Statistical Summary per Polygon
Calculates the following metrics using `ee.Reducer`:
Mean, median, 25th and 75th percentile NDVI
Area (km²) for each NDVI category
Areas are calculated from pixel counts (`count × 900 / 1,000,000`, i.e., 30 m pixels of 900 m² converted to km²).
Chunked Processing
Polygons are processed in chunks (default: 20).
Reduces API payload errors when handling large datasets.
If payloads exceed limits, chunk size automatically decreases.
Output Generation
Each chunk’s results are exported to temporary CSVs and appended to `data/{country_code}/Output/{country_code}_{admin_level}_ndvi.csv`.
Final columns are reordered and rounded to 2 decimals for clarity.
Outputs#
File: `{country_code}_{admin_level}_ndvi.csv`
Columns: `{admin_level}_PCODE`, `NDVI_mean`, `NDVI_median`, `NDVI_p25`, `NDVI_p75`, `NDVI_high_sqkm`, `NDVI_medium_sqkm`, `NDVI_low_sqkm`
CRS: WGS84 (`EPSG:4326`)
Units: NDVI (unitless), area (km²)
Python Script (Vegetation Index)
import geopandas as gpd
import geemap
import ee
import pandas as pd
import os
import yaml
import argparse
def load_year_from_config(config_path="configs/assets_config.yaml"):
"""Load year from ndvi_asset in assets_config.yaml"""
with open(config_path, "r") as f:
cfg = yaml.safe_load(f)
year = cfg.get("ndvi_asset", {}).get("year", [])
if not year or len(year) != 1:
raise ValueError("assets_config.yaml must define ndvi_asset: year: [year]")
return year[0]
def process_ndvi_for_admin(country_code: str, admin_level: str, config_path="configs/assets_config.yaml") -> str:
"""Run NDVI calculation for a given country/admin level and return output CSV path."""
# Initialize Earth Engine
ee.Initialize(project='aa-automatization')
gdf = gpd.read_file(f"data/{country_code}/{country_code}_{admin_level}.geojson")
chunk_size = 20
start_idx = 0
year = load_year_from_config(config_path)
os.makedirs(f"data/{country_code}/Output", exist_ok=True)
output_csv = f"data/{country_code}/Output/{country_code}_{admin_level}_ndvi.csv"
if os.path.exists(output_csv):
os.remove(output_csv)
col_order = [
f"{admin_level.upper()}_PCODE",
"NDVI_mean", "NDVI_median", "NDVI_p25", "NDVI_p75",
"NDVI_high_sqkm", "NDVI_medium_sqkm", "NDVI_low_sqkm"
]
while start_idx < len(gdf):
end_idx = start_idx + chunk_size
gdf_chunk = gdf.iloc[start_idx:end_idx]
try:
fc = geemap.geopandas_to_ee(gdf_chunk)
landsat = (ee.ImageCollection("LANDSAT/COMPOSITES/C02/T1_L2_8DAY_NDVI")
.filterDate(f"{year}-01-01", f"{year}-12-31")
.filterBounds(fc))
ndvi_image = landsat.select("NDVI").median()
# Rename each mask so the pixel-count keys returned by reduceRegion match the output column names
high_ndvi_mask = ndvi_image.updateMask(ndvi_image.gte(0.6)).rename("NDVI_high_sqkm")
medium_ndvi_mask = ndvi_image.updateMask(ndvi_image.gte(0.1).And(ndvi_image.lt(0.6))).rename("NDVI_medium_sqkm")
low_ndvi_mask = ndvi_image.updateMask(ndvi_image.lt(0.1)).rename("NDVI_low_sqkm")
def add_stats(feature):
stats = ndvi_image.reduceRegion(
reducer=ee.Reducer.mean()
.combine(ee.Reducer.median(), "", True)
.combine(ee.Reducer.percentile([25, 75]), "", True),
geometry=feature.geometry(),
scale=30,
bestEffort=True
)
high_count = high_ndvi_mask.reduceRegion(
reducer=ee.Reducer.count(),
geometry=feature.geometry(),
scale=30,
bestEffort=True
)
medium_count = medium_ndvi_mask.reduceRegion(
reducer=ee.Reducer.count(),
geometry=feature.geometry(),
scale=30,
bestEffort=True
)
low_count = low_ndvi_mask.reduceRegion(
reducer=ee.Reducer.count(),
geometry=feature.geometry(),
scale=30,
bestEffort=True
)
high_count = ee.Dictionary(high_count).map(lambda k, v: ee.Number(v).multiply(900).divide(1_000_000))
medium_count = ee.Dictionary(medium_count).map(lambda k, v: ee.Number(v).multiply(900).divide(1_000_000))
low_count = ee.Dictionary(low_count).map(lambda k, v: ee.Number(v).multiply(900).divide(1_000_000))
all_stats = stats.combine(high_count).combine(medium_count).combine(low_count)
return feature.set(all_stats)
fc_with_stats = fc.map(add_stats)
fc_filtered = fc_with_stats.select(**{
'propertySelectors': col_order,
'retainGeometry': False
})
temp_csv = f"data/{country_code}/{country_code}_ndvi_{admin_level}_temp_chunk.csv"
geemap.ee_to_csv(fc_filtered, filename=temp_csv)
df_chunk = pd.read_csv(temp_csv)
df_chunk = df_chunk[[c for c in col_order if c in df_chunk.columns]]
round_cols = ["NDVI_mean", "NDVI_median", "NDVI_p25", "NDVI_p75"]
df_chunk[round_cols] = df_chunk[round_cols].round(2)
if start_idx == 0 and not os.path.exists(output_csv):
df_chunk.to_csv(output_csv, index=False)
else:
df_chunk.to_csv(output_csv, mode='a', header=False, index=False)
os.remove(temp_csv)
print(f"Processed chunk {start_idx}-{end_idx} (size {chunk_size})")
start_idx = end_idx
except ee.EEException as e:
msg = str(e)
if "Request payload size exceeds the limit" in msg:
if chunk_size > 1:
chunk_size = max(1, chunk_size // 2)
print(f"Payload too large. Reducing chunk size to {chunk_size} and retrying index {start_idx}...")
else:
print(f"Chunk size 1 still too large. Skipping index {start_idx}.")
start_idx += 1
else:
print(f"EEException: {e}. Skipping chunk.")
start_idx = end_idx
except FileNotFoundError:
print(f"Temp CSV not found. Retrying index {start_idx} with smaller chunk...")
if chunk_size > 1:
chunk_size = max(1, chunk_size // 2)
else:
start_idx += 1
except Exception as e:
print(f"Unexpected error: {e}. Skipping chunk {start_idx}-{end_idx}.")
start_idx = end_idx
return output_csv
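A minimal usage sketch under the same assumptions as the crops asset (authenticated Earth Engine access, boundary GeoJSON in place); the example year and country code are illustrative:

```python
# configs/assets_config.yaml is expected to contain, for example:
# ndvi_asset:
#   year: [2024]
csv_path = process_ndvi_for_admin("KEN", "ADM2")
print(csv_path)  # data/KEN/Output/KEN_ADM2_ndvi.csv
```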
9. Flood Exposure#
Estimates population and facility exposure to flooding using GLOFAS flood hazard rasters and WorldPop demographic layers. Flooded areas are identified using a threshold of 30 cm, and exposure metrics are aggregated per administrative boundary.
This layer provides indicators for the number of people affected per demographic group and the percentage/count of critical facilities (education, hospitals, primary healthcare) impacted by flood events of different return periods (RPs).
Data Sources#
GLOFAS Flood Hazard Rasters (https://jeodpp.jrc.ec.europa.eu/ftp/jrc-opendata/CEMS-GLOFAS/flood_hazard/) – Raster tiles containing flood depth values for different return periods (RPs).
WorldPop Population Layers (scripts/fetch_worldpop.py) – Demographic rasters for multiple indicators at ~100 m resolution: total population, female, children under 5, elderly, etc.
Facilities Data (scripts/fetch_facilities_ohsome_overpass.py) – Point geometries of critical infrastructure from the OHSOME or Overpass APIs.
Administrative Boundaries (GeoJSON) – Used to aggregate flood exposure per ADM level (e.g., ADM1).
Configuration File (configs/assets_config.yaml) – Specifies setup.flood_threshold (flood depth in meters), setup.rps (return periods), and facilities_asset.api (data source for facilities); a sketch of the expected structure follows below.
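A minimal sketch of the configuration structure implied by these keys (the file path and key names come from the pipeline; the numeric values and API choice shown here are placeholders, not defaults):
import yaml
# Hypothetical assets_config.yaml content. The keys mirror those listed above;
# the values (0.3 m, three return periods, ohsome-api) are example placeholders.
example_yaml = """
setup:
  flood_threshold: 0.3        # meters
  rps: [10, 50, 100]          # return periods in years
facilities_asset:
  api: ohsome-api             # or: overpass
"""
cfg = yaml.safe_load(example_yaml)
print(cfg["setup"]["flood_threshold"], cfg["setup"]["rps"], cfg["facilities_asset"]["api"])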
Indicators#
Indicator | Description | Units
---|---|---
RP{rp}_female_pop_{thresh}cm | Number of females affected by flood depth ≥ threshold | people
RP{rp}_children_u5_{thresh}cm | Number of children under 5 affected | people
RP{rp}_female_u5_{thresh}cm | Number of female children under 5 affected | people
RP{rp}_elderly_{thresh}cm | Number of elderly (>65) affected | people
RP{rp}_pop_u15_{thresh}cm | Number of population under 15 affected | people
RP{rp}_female_u15_{thresh}cm | Number of female population under 15 affected | people
RP{rp}_education_{thresh}cm_pct | Percentage of educational facilities flooded | %
RP{rp}_education_{thresh}cm_count | Number of educational facilities flooded | count
RP{rp}_hospitals_{thresh}cm_pct | Percentage of hospitals flooded | %
RP{rp}_hospitals_{thresh}cm_count | Number of hospitals flooded | count
RP{rp}_primary_healthcare_{thresh}cm_pct | Percentage of primary healthcare facilities flooded | %
RP{rp}_primary_healthcare_{thresh}cm_count | Number of primary healthcare facilities flooded | count
Here {rp} is the return period in years (e.g., 10, 50, 100) and {thresh} is the flood threshold in cm; the column names follow the naming used by the script below.
Flood Threshold#
Parameter | Value | Description
---|---|---
setup.flood_threshold | e.g., 0.3 m | Minimum flood depth considered for exposure (configurable in configs/assets_config.yaml)
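For illustration, this is how a configured threshold turns into the {thresh}cm suffix used in the column names (mirroring the script below; 0.3 m is only an example value):
# Example only: a 0.3 m threshold becomes the "30cm" suffix used in indicator names.
FLOOD_THRESHOLD = 0.3
THRESH_SUFFIX = f"{int(FLOOD_THRESHOLD * 100)}cm"    # -> "30cm"
print(f"RP100_female_pop_{THRESH_SUFFIX}")           # -> RP100_female_pop_30cm
print(f"RP100_hospitals_{THRESH_SUFFIX}_pct")        # -> RP100_hospitals_30cm_pct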
Processing Steps#
Configuration Loading – Reads setup.flood_threshold and rps from assets_config.yaml and computes THRESH_SUFFIX (flood depth in cm) for indicator naming.
Input Geometry – Loads administrative boundaries (data/{country_code}/{country_code}_{admin_level}.geojson) using GeoPandas.
Tile Selection and Download – Queries the GLOFAS server for all flood tiles corresponding to each RP, selects only tiles intersecting the country boundary, and downloads missing tiles into data/{country_code}/Temporary.
Raster Merging and Clipping – Merges the selected tiles into a single raster per RP, then clips the merged raster to the country boundary and saves it to Temporary.
WorldPop Population Preparation – Fetches the required demographic rasters (total population, female, children under 5, elderly, etc.) and ensures they exist locally for alignment with the flood data.
Facility Data Preparation – Downloads or loads facility geometries from the chosen API (ohsome-api, overpass), converts them to points, and reprojects them to match the raster CRS.
Flood Exposure Calculation per RP – Flooded population: multiplies each population raster by the flood mask (flood depth ≥ threshold) to compute the number of affected people per ADM polygon. Flooded facilities: samples the flood raster at facility locations to compute the flooded count and percentage per ADM polygon (see the toy example after this list).
Aggregation and CSV Output – Merges the RP results for population and facilities into a single CSV, fills missing values with 0, rounds numbers to integers, and saves the final CSV to data/{country_code}/Output/{country_code}_{admin_level}_flood_exposure.csv.
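The toy example below makes the exposure rule from the Flood Exposure Calculation step concrete: people are counted as affected wherever the aligned flood depth exceeds the threshold (all numbers are invented for illustration).
import numpy as np
# Hypothetical 2x2 window of aligned rasters.
flood_depth = np.array([[0.0, 0.2], [0.5, 1.1]])    # flood depth in meters
population = np.array([[10.0, 40.0], [25.0, 5.0]])  # people per pixel
FLOOD_THRESHOLD = 0.3                                # meters (setup.flood_threshold)
flood_mask = (flood_depth > FLOOD_THRESHOLD).astype("float32")
affected = float((population * flood_mask).sum())
print(affected)  # 30.0 -> the 25 + 5 people in the two flooded pixels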
Outputs#
File: {country_code}_{admin_level}_flood_exposure.csv
Columns: {admin_level}_PCODE, flooded population indicators per RP, and flooded facilities indicators per RP
CRS: WGS84 (EPSG:4326)
Units: people, %, count
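As a hypothetical example of reading the result, for Kenya at ADM1 with a 100-year RP and a 30 cm threshold the CSV could be inspected like this (actual columns depend on setup.rps and the configured threshold):
import pandas as pd
# Hypothetical path and columns following the naming described above.
df = pd.read_csv("data/KEN/Output/KEN_ADM1_flood_exposure.csv")
cols = ["ADM1_PCODE", "RP100_female_pop_30cm", "RP100_hospitals_30cm_pct", "RP100_hospitals_30cm_count"]
print(df[cols].head())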
Python Script (Flood Exposure)
import os
import re
import argparse
import requests
import geopandas as gpd
import rasterio
from rasterio.merge import merge
from rasterio.mask import mask
from rasterstats import zonal_stats
from rasterio.enums import Resampling
import pandas as pd
import rioxarray
import yaml
from shapely.geometry import mapping
from pathlib import Path
from scripts.fetch_worldpop import fetch_worldpop
from scripts.fetch_facilities_ohsome_overpass import fetch_overpass, fetch_ohsome
ASSET_CONFIG_YAML_PATH = os.path.join(os.getcwd(), "configs", "assets_config.yaml")
with open(ASSET_CONFIG_YAML_PATH) as _fp:
_asset_config = yaml.safe_load(_fp)
BASE_URL_TEMPLATE = "https://jeodpp.jrc.ec.europa.eu/ftp/jrc-opendata/CEMS-GLOFAS/flood_hazard/{rp}/"
ALLOWED_RPS = _asset_config.get("setup", {}).get("rps", [])
try:
FLOOD_THRESHOLD = float(_asset_config["setup"]["flood_threshold"])
except KeyError:
raise KeyError("Missing 'setup.flood_threshold' in assets_config.yaml")
THRESH_SUFFIX = f"{int(FLOOD_THRESHOLD*100)}cm"
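# List the flood-depth GeoTIFF filenames advertised on the GLOFAS server for a given return period.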
def parse_listing(rp):
url = BASE_URL_TEMPLATE.format(rp=f"RP{rp}")
r = requests.get(url)
r.raise_for_status()
return re.findall(r'href="([^"]+_RP{}_depth\.tif)"'.format(rp), r.text)
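# Derive the 10-degree tile bounding box (xmin, ymin, xmax, ymax) from the N/S and E/W tokens in a tile filename.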
def tile_bounds_from_filename(fname):
m = re.search(r'_(N|S)(\d+)_([EW])(\d+)_RP', fname)
if not m:
return None
lat_sign = 1 if m.group(1) == "N" else -1
lat = int(m.group(2)) * lat_sign
lon_sign = 1 if m.group(3) == "E" else -1
lon = int(m.group(4)) * lon_sign
xmin = lon
xmax = lon + 10
ymin = lat - 10
ymax = lat
return (xmin, ymin, xmax, ymax)
def bbox_intersects(tile_bbox, geom_bbox):
txmin, tymin, txmax, tymax = tile_bbox
gxmin, gymin, gxmax, gymax = geom_bbox
return not (txmax <= gxmin or txmin >= gxmax or tymax <= gymin or tymin >= gymax)
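# Download a single flood tile into the Temporary directory, skipping files that already exist locally.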
def download_file(context, fname, temporary_dir, rp):
url = BASE_URL_TEMPLATE.format(rp=f"RP{rp}") + fname
outpath = os.path.join(temporary_dir, fname)
if os.path.exists(outpath):
context.info(f"Already exists: {fname}")
return outpath
context.info(f"Downloading {fname}...")
r = requests.get(url, stream=True)
if r.status_code == 200:
with open(outpath, "wb") as f:
for chunk in r.iter_content(1024 * 64):
f.write(chunk)
return outpath
else:
context.warning(f"Failed to download {fname}")
return None
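# Build the clipped flood-depth raster for one return period: select intersecting tiles, download, merge, and clip to the country boundary.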
def process_country_rp(context, country_code, rp, admin_level="ADM0"):
temporary_dir = f"data/{country_code}/Temporary"
output_dir = f"data/{country_code}/Output"
os.makedirs(temporary_dir, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)
clipped_path = os.path.join(temporary_dir, f"{country_code}_flooded_RP{rp}.tif")
if os.path.exists(clipped_path):
context.info(f"Clipped raster already exists: {clipped_path}, skipping ...")
return clipped_path
boundary_file = os.path.join("data", country_code, f"{country_code}_{admin_level}.geojson")
if not os.path.exists(boundary_file):
raise FileNotFoundError(f"Boundary file not found: {boundary_file}")
gdf = gpd.read_file(boundary_file)
gxmin, gymin, gxmax, gymax = gdf.total_bounds
geom_bbox = (gxmin, gymin, gxmax, gymax)
context.info(f"Boundary BBOX for {country_code}: {geom_bbox}")
files = parse_listing(rp)
context.info(f"Found {len(files)} tiles on server for RP{rp}.")
selected = [f for f in files if (tb := tile_bounds_from_filename(f)) and bbox_intersects(tb, geom_bbox)]
context.info(f"Selected {len(selected)} tiles to download.")
tile_paths = [download_file(context, f, temporary_dir, rp) for f in selected]
tile_paths = [p for p in tile_paths if p]
if not tile_paths:
context.warning(f"No tiles downloaded for RP{rp}, skipping.")
return None
context.info("Merging tiles...")
srcs = [rasterio.open(p) for p in tile_paths]
mosaic, out_trans = merge(srcs)
out_meta = srcs[0].meta.copy()
out_meta.update({
"driver": "GTiff",
"height": mosaic.shape[1],
"width": mosaic.shape[2],
"transform": out_trans,
"compress": "lzw"
})
context.info("Clipping merged raster to boundary...")
if gdf.crs != srcs[0].crs:
gdf = gdf.to_crs(srcs[0].crs)
geoms = [mapping(geom) for geom in gdf.geometry]
with rasterio.io.MemoryFile() as memfile:
with memfile.open(**out_meta) as dataset:
dataset.write(mosaic)
out_image, out_transform = mask(dataset, geoms, crop=True)
out_meta.update({
"height": out_image.shape[1],
"width": out_image.shape[2],
"transform": out_transform
})
with rasterio.open(clipped_path, "w", **out_meta) as dest:
dest.write(out_image)
context.info(f"Clipped raster saved to {clipped_path}")
for src in srcs:
src.close()
return clipped_path
def process_flood_impact(context, country_code, rps, gdf, admin_level, output_dir):
"""
Process flooded population for all RPs of a given country/admin_level.
Generates a single CSV with columns for each RP, indicator, threshold,
and percentage of facilities flooded.
"""
country_code = country_code.upper()
output_dir = Path(output_dir)
out_csv = output_dir / f"{country_code}_{admin_level}_flood_exposure.csv"
temp_dir = Path("data") / country_code / "Temporary"
# Load existing CSV if present, to append missing RPs
if out_csv.exists():
context.info(f"CSV exists: {out_csv}, will append missing RPs")
final_df = pd.read_csv(out_csv)
else:
final_df = pd.DataFrame({f"{admin_level}_PCODE": gdf[f"{admin_level}_PCODE"]})
# Ensure WorldPop files exist
context.info(f"Ensuring demographic rasters exist in {temp_dir}...")
indicator_tifs = fetch_worldpop(country_code)
indicators = ["female_pop", "children_u5", "female_u5", "elderly", "pop_u15", "female_u15"]
tif_map = dict(zip(indicators, indicator_tifs))
# Ensure facilities exist
context.info(f"Ensuring facility raw geometries exist in {temp_dir}...")
base_path = Path("data") / country_code
boundary_path = base_path / f"{country_code}_{admin_level}.geojson"
api_choice = _asset_config.get("facilities_asset", {}).get("api", "").lower()
if api_choice == "ohsome-api":
summary_path = fetch_ohsome(
context, boundary_path, base_path, country_code, admin_level
)
elif api_choice == "overpass":
summary_path = fetch_overpass(
context, boundary_path, base_path, country_code, admin_level
)
elif api_choice == "ohsome-parquet":
context.info("Not implemented yet: ohsome-parquet")
return None
else:
context.warning(
f"No valid API configured for facilities_asset (got '{api_choice}')"
)
return None
geojsons_map = {}
facility_categories = ["education", "hospitals", "primary_healthcare"]
for category in facility_categories:
if category not in geojsons_map:
geojsons_map[category] = base_path / f"Temporary/{country_code}_{category}_raw.geojson"
for rp in rps:
context.info(f"Processing RP{rp}...")
# Skip RP if all expected columns already exist
# THRESH_SUFFIX is a single string (e.g. "30cm"), so it is used directly rather than iterated over.
expected_cols = [
f"RP{rp}_{label}_{THRESH_SUFFIX}" for label in indicators
] + [
f"RP{rp}_{cat}_{THRESH_SUFFIX}_pct" for cat in facility_categories
] + [
f"RP{rp}_{cat}_{THRESH_SUFFIX}_count" for cat in facility_categories
]
if all(col in final_df.columns for col in expected_cols):
context.info(f"RP{rp} already processed, skipping...")
continue
# Flood raster clipped to country
clipped_path = process_country_rp(context, country_code, rp, admin_level)
if not clipped_path:
context.warning(f"No flood raster for RP{rp}, skipping...")
continue
flood = rioxarray.open_rasterio(clipped_path, masked=True).squeeze()
rp_df = pd.DataFrame({f"{admin_level}_PCODE": gdf[f"{admin_level}_PCODE"]})
# ---- Flooded population ----
for label in indicators:
pop_raster_path = tif_map[label]
pop_raster = rioxarray.open_rasterio(pop_raster_path, masked=True).squeeze()
flood_aligned = flood.rio.reproject_match(pop_raster, resampling=Resampling.bilinear)
flood_mask = (flood_aligned > FLOOD_THRESHOLD).astype("float32")
flooded_pop = pop_raster * flood_mask
tmp_flooded = temp_dir / f"tmp_flooded_{label}_RP{rp}_{THRESH_SUFFIX}.tif"
with rasterio.open(pop_raster_path) as src:
meta = src.meta.copy()
meta.update(compress="lzw", tiled=True,
bigtiff="yes" if src.width*src.height > 2**32 else "no")
with rasterio.open(tmp_flooded, "w", **meta) as dst:
dst.write(flooded_pop.values, 1)
stats = zonal_stats(gdf, tmp_flooded, stats="sum", nodata=0)
rp_df[f"RP{rp}_{label}_{THRESH_SUFFIX}"] = [s["sum"] if s["sum"] is not None else 0 for s in stats]
context.info(f"Processed flooded population for {label} >{FLOOD_THRESHOLD} m ({THRESH_SUFFIX})")
# ---- Flooded facilities ----
for category, filepath in geojsons_map.items():
if not Path(filepath).exists():
rp_df[f"RP{rp}_{category}_{THRESH_SUFFIX}_pct"] = 0
rp_df[f"RP{rp}_{category}_{THRESH_SUFFIX}_count"] = 0
continue
facilities = gpd.read_file(filepath)
if facilities.empty:
rp_df[f"RP{rp}_{category}_{THRESH_SUFFIX}_pct"] = 0
rp_df[f"RP{rp}_{category}_{THRESH_SUFFIX}_count"] = 0
continue
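# Reduce any non-point geometries to centroids so the flood raster can be sampled at one location per facility.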
if not all(facilities.geometry.type == "Point"):
if facilities.crs != flood.rio.crs:
facilities = facilities.to_crs(flood.rio.crs)
facilities["geometry"] = facilities.geometry.centroid
if facilities.crs != flood.rio.crs:
facilities = facilities.to_crs(flood.rio.crs)
coords = [(x, y) for x, y in zip(facilities.geometry.x, facilities.geometry.y)]
with rasterio.open(clipped_path) as src:
values = [v[0] for v in src.sample(coords)]
facilities["flooded"] = [1 if v > FLOOD_THRESHOLD else 0 for v in values]
joined = gpd.sjoin(
facilities, gdf[[f"{admin_level}_PCODE", "geometry"]],
how="inner", predicate="within"
)
grouped = joined.groupby(f"{admin_level}_PCODE")["flooded"].agg(["mean", "sum"]).reset_index()
grouped[f"RP{rp}_{category}_{THRESH_SUFFIX}_pct"] = (grouped["mean"] * 100).round(1)
grouped[f"RP{rp}_{category}_{THRESH_SUFFIX}_count"] = grouped["sum"].astype(int)
grouped = grouped.drop(columns=["mean", "sum"])
rp_df = rp_df.merge(grouped, on=f"{admin_level}_PCODE", how="left")
rp_df[f"RP{rp}_{category}_{THRESH_SUFFIX}_pct"] = rp_df[f"RP{rp}_{category}_{THRESH_SUFFIX}_pct"].fillna(0)
rp_df[f"RP{rp}_{category}_{THRESH_SUFFIX}_count"] = rp_df[f"RP{rp}_{category}_{THRESH_SUFFIX}_count"].fillna(0).astype(int)
context.info(f"Processed flooded facilities for {category} >{FLOOD_THRESHOLD} m ({THRESH_SUFFIX})")
final_df = final_df.merge(rp_df, on=f"{admin_level}_PCODE", how="left")
context.info(f"Processed RP{rp}")
numeric_cols = final_df.select_dtypes(include=["float", "int"]).columns
final_df[numeric_cols] = final_df[numeric_cols].fillna(0).round(0).astype(int)
output_dir.mkdir(parents=True, exist_ok=True)
final_df.to_csv(out_csv, index=False)
context.info(f"Flooded population & facilities CSV written to {out_csv}")
return str(out_csv)