Corporate Edits Analysis#

In this notebook we demonstrate how to inspect the spatial extent of corporate mapping in OSM. We make use of the changeset attributes we have added to ohsome-data-insights.

These are the steps you see further down:

  • Set the connection parameters.

  • Prepare your input parameters, e.g. define area of interest and changeset hashtag filters.

  • Download data using PyIceberg and DuckDB.

  • Filter and process data with DuckDB.

  • Visualize the results on a map.

Getting started#

Set connection params.

# Load S3 credentials from a local .env file so they never appear in the notebook.
from dotenv import load_dotenv
load_dotenv()
True
import os

# Both variables must be set in the environment (or in the .env file loaded above);
# a missing key raises KeyError immediately instead of failing later at download time.
s3_user = os.environ["S3_ACCESS_KEY_ID"]  # add your user here
s3_password = os.environ["S3_SECRET_ACCESS_KEY"]  # add your password here

Configure DuckDB.

# Install a pinned DuckDB version into the notebook environment.
!pip install duckdb==1.0.0
import duckdb

# In-memory DuckDB connection with generous limits for the raw OSM download.
con = duckdb.connect(
    config={
        'threads': 32,
        'max_memory': '100GB',
    }
)
# Spatial extension for geometry support; the processing below otherwise
# relies mainly on the community h3 extension loaded inside the SQL query.
con.install_extension("spatial")
con.load_extension("spatial")

Set the connection params to Iceberg Rest Catalog.

# PyIceberg with the extras used here: s3fs (S3 access), pyarrow (data
# conversion), duckdb integration and a sqlite-backed catalog cache.
!pip install "pyiceberg[s3fs,duckdb,sql-sqlite,pyarrow]"
from pyiceberg.catalog.rest import RestCatalog

# Connect to the ohsome-data-insights Iceberg REST catalog; the data files
# live on an S3-compatible MinIO endpoint and are read via PyArrow.
catalog = RestCatalog(
    name="default",
    **{
        "uri": "https://sotm2024.iceberg.ohsome.org",
        "s3.endpoint": "https://sotm2024.minio.heigit.org",
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.access-key-id": s3_user,
        "s3.secret-access-key": s3_password,
        "s3.region": "eu-central-1"
    }
)

Prepare the input parameters for your analysis#

# Set iceberg table
# The 'geo_sort' namespace stores contributions spatially sorted, which makes
# the bounding-box pre-filter in the table scan below efficient.
namespace = 'geo_sort'
tablename = 'contributions'
icebergtable = catalog.load_table((namespace, tablename))


# Optional time filter: only contributions valid within this window are scanned.
min_timestamp = '2019-06-01T00:00:00'
max_timestamp = '2024-06-01T00:00:00'


# Location filter: per-region bounding boxes as (xmin, ymin, xmax, ymax).
bboxes = {
    'colombia': (-78.9909352282, -4.29818694419, -66.8763258531, 12.4373031682),
    'indonesia':  (95.2930261576, -10.3599874813, 141.03385176, 5.47982086834),
    'united_arab_emirates': (51.5795186705, 22.4969475367, 56.3968473651, 26.055464179),
    'south_america': (-93.691406,-58.263287,-22.675781,14.859850),
    'africa': (-24.609375,-39.095963,56.074219,36.173357)
}

selected_region = 'south_america'
xmin, ymin, xmax, ymax = bboxes[selected_region]


# Hashtag filter: every corporate hashtag gets a stable 1-based integer id.
# These ids later serve as the numeric filter dimension for the map slider.
_corporations = [
    "amap",
    "adt",
    "bolt",
    "DigitalEgypt",
    "expedia",
    "gojek",
    "MSFTOpenMaps",
    "grab",
    "Kaart",
    "Kontur",
    "mbx",
    "RocketData",
    "disputed_by_claimed_by",
    "Snapp",
    "stackbox",
    "Telenav",
    "Lightcyphers",
    "tomtom",
    "TIDBO",
    "WIGeoGIS-OMV",
    "نشان",
    "mapbox",
    "Komoot",
    "AppLogica",
]
corporate_changeset_hashtags = {
    name: rank for rank, name in enumerate(_corporations, start=1)
}


# Resolution of the H3 cells used to aggregate the results.
h3_cell_resolution = 3

Get the Data#

First, we do an iceberg table scan with a pre-filter. This is a fast way to download all potential OSM elements that are needed for our analysis.

Here we use one of the changeset attributes we have joined in advance for each OSM contribution. We use the changeset.hashtags attribute to inspect all the OSM elements that have been added by corporate mappers.

import time
start_time = time.time()

# Iceberg table scan with a coarse pre-filter.  The bbox predicate compares
# each element's own bounding box (xmin/xmax/ymin/ymax columns) with the area
# of interest, so it can include elements whose centroid lies just outside;
# the exact centroid filter is applied later in DuckDB.  Both 'latest' and
# 'history' versions are fetched to cover the whole analysis period.
icebergtable.scan(
    row_filter=(
        f"(status = 'latest' or status = 'history') "
        f"and (xmax >= {xmin} and xmin <= {xmax}) "
        f"and (ymax >= {ymin} and ymin <= {ymax}) "
        f"and valid_from >= '{min_timestamp}' "
        f"and valid_from < '{max_timestamp}' "
    ),
    # Only the columns needed downstream, to keep the download small.
    selected_fields=(
        "user_id",
        "valid_from",
        "changeset",
        "h3_r5",
        "centroid"
    )
).to_duckdb('raw_osm_data',connection=con)

download_time = round(time.time() - start_time, 3)
print(f"download took {download_time} sec.")
download took 119.03 sec.

Filter for the corporate hashtags and assign a company name to each contribution.

import time
start_time = time.time()

# Classify each contribution by matching its changeset hashtag against the
# known corporate hashtags.  Notes:
#   * changeset['hashtags'][1] is the FIRST hashtag only (DuckDB lists are
#     1-indexed); contributions whose corporate hashtag is not listed first
#     fall through to 'nc' (not classified).
#   * the ILIKE patterns are substring matches, so e.g. '%adt%' can also
#     match unrelated hashtags that merely contain "adt".
#   * the CASE order defines precedence when several patterns match.
# Fix: removed the stray trailing comma after "raw_osm_data as a" — it only
# parsed thanks to DuckDB's lenient SQL dialect and is invalid standard SQL.
query = f"""
INSTALL h3 FROM community;
LOAD h3;

DROP TABLE IF EXISTS osm_data;
CREATE TABLE osm_data AS
(
SELECT
    a.user_id,
    a.valid_from,
    h3_cell_to_parent(a.h3_r5, {h3_cell_resolution}) as h3_cell,
    h3_cell_to_boundary_wkt(h3_cell) as geometry,
    CAST(changeset['hashtags'][1] AS VARCHAR) as hashtag_string,
    CASE
		WHEN hashtag_string ILIKE '%amap%' THEN 'amap'
		WHEN hashtag_string ILIKE '%adt%' THEN 'adt'
		WHEN hashtag_string ILIKE '%bolt%' THEN 'bolt'
		WHEN hashtag_string ILIKE '%DigitalEgypt%' THEN 'DigitalEgypt'
		WHEN hashtag_string ILIKE '%expedia%' THEN 'expedia'
		WHEN hashtag_string ILIKE '%gojek%' THEN 'gojek'
		WHEN hashtag_string ILIKE '%MSFTOpenMaps%' THEN 'MSFTOpenMaps'
		WHEN hashtag_string ILIKE '%grab%' THEN 'grab'
		WHEN hashtag_string ILIKE '%Kaart%' THEN 'Kaart'
		WHEN hashtag_string ILIKE '%Kontur%' THEN 'Kontur'
		WHEN hashtag_string ILIKE '%mbx%' THEN 'mbx'
		WHEN hashtag_string ILIKE '%RocketData%' THEN 'RocketData'
		WHEN hashtag_string ILIKE '%disputed_by_claimed_by%' THEN 'disputed_by_claimed_by'
		WHEN hashtag_string ILIKE '%Snapp%' THEN 'Snapp'
		WHEN hashtag_string ILIKE '%stackbox%' THEN 'stackbox'
		WHEN hashtag_string ILIKE '%Telenav%' THEN 'Telenav'
		WHEN hashtag_string ILIKE '%Lightcyphers%' THEN 'Lightcyphers'
		WHEN hashtag_string ILIKE '%tomtom%' THEN 'tomtom'
		WHEN hashtag_string ILIKE '%TIDBO%' THEN 'TIDBO'
		WHEN hashtag_string ILIKE '%WIGeoGIS-OMV%' THEN 'WIGeoGIS-OMV'
		WHEN hashtag_string ILIKE '%نشان%' THEN 'Neshan'
		WHEN hashtag_string ILIKE '%mapbox%' THEN 'mapbox'
		WHEN hashtag_string ILIKE '%Komoot%' THEN 'Komoot'
		WHEN hashtag_string ILIKE '%AppLogica%' THEN 'AppLogica'
		ELSE 'nc'
	END AS corporation
FROM
    raw_osm_data as a
WHERE 1=1
    and (centroid.x >= {xmin} and centroid.x <= {xmax})
    and (centroid.y >= {ymin} and centroid.y <= {ymax})
)
;
"""
con.sql(query)

processing_time = round(time.time() - start_time, 3)
print(f"processing took {processing_time} sec.")
processing took 23.118 sec.

Display Heatmap of corporate edits#

import geopandas as gpd

# Aggregate edits and distinct users per month, corporation and h3 cell.
map_query = """
    SELECT
        epoch_ms(date_trunc('month', valid_from)) as month,
        corporation,
        h3_cell,
        geometry,
        count(*) as n_edits,
        count(distinct user_id) as n_users
    FROM osm_data
    GROUP by month, corporation, h3_cell, geometry;
"""

df = con.sql(map_query).df()

# Map each corporation label to its numeric slider id.  The SQL step labels
# the "نشان" hashtag as 'Neshan', so that alias must map to the same id;
# without it those rows would silently fall back to 0 (non-corporate).
corporation_ids = dict(corporate_changeset_hashtags)
corporation_ids["Neshan"] = corporate_changeset_hashtags["نشان"]
df["corporation_value"] = df["corporation"].map(corporation_ids)
# 'nc' (non-corporate) rows have no id and become 0.
df.fillna({"corporation_value": 0}, inplace=True)

# convert the data to geodata
gdf = gpd.GeoDataFrame(
    df,
    geometry=gpd.GeoSeries.from_wkt(df['geometry'])
).set_crs('epsg:4326')

Define map parameters and style.

import numpy as np
import datetime
import lonboard
from palettable.colorbrewer.sequential import Blues_9


# compute lonboard color style for continuous color map
min_valid_from = 1000 * datetime.datetime(2019,6,1).timestamp()
max_valid_from = 1000 * datetime.datetime(2024,6,1).timestamp()

# Normalize edit counts to [0, 1] for the continuous colormap.  The maximum
# must come from the same column that is normalized (n_edits); the previous
# gdf["n_users"].max() pushed values above 1 and clipped the colormap.
min_value = 0
max_value = gdf["n_edits"].max()

# normalized color values from 0 to 1
user_activity_style = gdf["n_edits"].apply(
    lambda x: (x - min_value) / (max_value - min_value))

# Per-row filter values [month timestamp, corporation id], matched against
# filter_range by the DataFilterExtension (filter_size=2).
filter_values =  np.column_stack(
    [gdf["month"], gdf["corporation_value"]]
)


# Initial filter: full time range, corporation id 0 (non-corporate edits).
initial_filter_range = [
    [min_valid_from, max_valid_from],
    [0, 0]
]


# Extrude each h3 cell by its number of edits; NaN heights collapse to 1.
gdf["height"] = gdf["n_edits"]
heights = gdf["height"].to_numpy()
heights = np.nan_to_num(heights, nan=1)

# the lonboard map definition
layer = lonboard.PolygonLayer.from_geopandas(
    gdf,
    extensions=[lonboard.layer_extension.DataFilterExtension(filter_size=2)],
    extruded=True,
    get_elevation=heights,
    get_filter_value=filter_values,  # replace with desired column
    filter_range=initial_filter_range,  # replace with desired filter range
    get_fill_color=lonboard.colormap.apply_continuous_cmap(user_activity_style, Blues_9, alpha=.75),
)

# Center the initial view on the selected bounding box.
view_state = {
    "longitude": xmin + ((xmax - xmin) / 2),
    "latitude": ymin + ((ymax - ymin) / 2),
    "zoom": 5,
    "pitch": 25
}

currentness_map = lonboard.Map(
    basemap_style=lonboard.basemap.CartoBasemap.Positron,
    layers=[layer],
    view_state=view_state
)

Define Dates Slider Selection Widget and link to map filter range.

from datetime import date, timedelta
import ipywidgets
from traitlets import directional_link

# One slider option per day in the analysis period; option values are epoch
# milliseconds to match the 'month' filter values stored on the layer.
start = datetime.datetime(2019,6,1)
end = datetime.datetime(2024,7,1)
delta = end - start   # returns timedelta
dates = [start + timedelta(days=i) for i in range(delta.days + 1)]
date_options = [(i.strftime('%d-%b-%Y'), int(1000* i.timestamp())) for i in dates]

date_slider = ipywidgets.SelectionRangeSlider(
    options=date_options,
    index=(0, len(dates)-1),
    description='Last Edit:',
    layout=ipywidgets.Layout(width='1000px'),
    disabled=False
)

# 0 = non-corporate ('nc'); 1..24 = the corporations defined earlier.
group_options= [('non-corporate', 0)]
group_options += [(name, value) for name, value in corporate_changeset_hashtags.items() ]

# Range slider feeding the layer's second filter dimension (corporation id).
# NOTE(review): IntRangeSlider takes no 'options'/'index' parameters — they
# look ignored here; only its numeric value range is linked to the layer.
corporate_slider_range = ipywidgets.IntRangeSlider(
    options=group_options,
    index=(0, len(group_options)-1),
    description='Corporate:',
    layout=ipywidgets.Layout(width='1000px'),
    disabled=False
)

# Toggle buttons to pick exactly one group.  (A SelectionSlider previously
# assigned to this same name was immediately overwritten — dead code removed.)
corporate_slider = ipywidgets.ToggleButtons(
    options=group_options,
    description='Group:',
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
)


# Selecting group v narrows the corporation filter range to exactly [v, v].
directional_link(
    (corporate_slider, 'value'),
    (corporate_slider_range, "value"),
    transform=lambda v: (v, v)
)

# Combine date range and corporation range into one 2-D filter for the layer.
multi_slider = lonboard.controls.MultiRangeSlider([date_slider, corporate_slider_range])

directional_link(
    (multi_slider, 'value'),
    (layer, "filter_range"),
)
<traitlets.traitlets.directional_link at 0x7864804edd00>

Display the map. Have fun exploring and moving around the time slider! Click on the buttons to switch between groups.

display(currentness_map, date_slider, corporate_slider)