End-to-end: ingest → spatial join → query

This tutorial shows the smallest end-to-end workflow:

  1. Ingest raw events into motac’s canonical events table.

  2. Assign each event to a regular grid cell (a “spatial join”).

  3. Query the resulting table (filter + aggregate).

The goal is to make the data contract concrete: once your events are in the canonical schema and have a cell_id, you can feed them into downstream models.

1) Raw input: newline-delimited JSON (JSONL)

A raw stream contains one JSON object per line.

Required keys:

  • t (YYYY-MM-DD)

  • lat, lon

Optional keys:

  • event_id, value (defaults to 1), mark, meta (mapping), cell_id

from __future__ import annotations

import json
from pathlib import Path

raw_path = Path("events.jsonl")
rows = [
    {"t": "2020-01-02", "lat": 51.50, "lon": -0.10, "value": 2, "event_id": "a"},
    {"t": "2020-01-03", "lat": 51.52, "lon": -0.12, "mark": "x", "meta": {"src": "demo"}},
]
raw_path.write_text("\n".join(json.dumps(r) for r in rows) + "\n", encoding="utf-8")
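You can check the key contract above in plain Python before passing a file to motac. The sketch below is a hypothetical helper: `normalize_raw_event` is not part of motac. It assumes the ingester stores `meta` as a JSON string in a `meta_json` column, which is inferred from the column list used in step 3. It only approximates whatever checks motac itself performs.

```python
import json
from datetime import date
from typing import Any, Mapping

REQUIRED_KEYS = ("t", "lat", "lon")


def normalize_raw_event(rec: Mapping[str, Any]) -> dict[str, Any]:
    """Hypothetical pre-ingestion check for one JSONL record (not motac's API)."""
    missing = [k for k in REQUIRED_KEYS if k not in rec]
    if missing:
        raise ValueError(f"missing required keys: {missing}")

    out: dict[str, Any] = {
        # date.fromisoformat accepts the YYYY-MM-DD form used for `t`.
        "t": date.fromisoformat(rec["t"]),
        "lat": float(rec["lat"]),
        "lon": float(rec["lon"]),
        # `value` is optional and defaults to 1.
        "value": rec.get("value", 1),
        "event_id": rec.get("event_id"),
        "cell_id": rec.get("cell_id"),
        "mark": rec.get("mark"),
    }

    meta = rec.get("meta")
    if meta is not None and not isinstance(meta, Mapping):
        raise TypeError("meta must be a mapping")
    # Assumption: the mapping is kept as a JSON string (cf. the meta_json column).
    out["meta_json"] = None if meta is None else json.dumps(meta, sort_keys=True)
    return out
```

Applied to the second sample row, this yields `value == 1` (the default) and `meta_json == '{"src": "demo"}'`. A row missing `lon` raises `ValueError`.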

2) Ingest to the canonical Arrow table (and optionally write Parquet)

import pyarrow as pa

from motac.ingestion import (
    ingest_jsonl_to_canonical_table,
    validate_canonical_events_table,
    write_canonical_events_parquet,
)

tbl = ingest_jsonl_to_canonical_table(raw_path)
validate_canonical_events_table(tbl)
assert isinstance(tbl, pa.Table)

# Optional: persist to Parquet (recommended for larger datasets)
write_canonical_events_parquet(tbl, "events.parquet")

At this point you have a strict, predictable schema (see motac.ingestion.validate_canonical_events_table).

3) Build a regular grid and assign cell_id

You can build a tiny regular grid from lon/lat bounds, then map each event to a cell_id.

import numpy as np
import pyarrow as pa

from motac.spatial.grid_builder import LonLatBounds, build_regular_grid
from motac.spatial.lookup import GridCellLookup

bounds = LonLatBounds(
    lon_min=-0.20,
    lon_max=0.05,
    lat_min=51.45,
    lat_max=51.60,
)

grid = build_regular_grid(bounds=bounds, cell_size_m=2_000.0)
lookup = GridCellLookup.from_grid(grid)

lon = np.asarray(tbl["lon"].to_numpy(zero_copy_only=False), dtype=float)
lat = np.asarray(tbl["lat"].to_numpy(zero_copy_only=False), dtype=float)
cell_id = lookup.lonlat_to_cell_id(lon=lon, lat=lat)

# Convention: -1 means outside the grid; we convert it to nulls in Arrow.
cell_arrow = pa.array([None if int(x) < 0 else int(x) for x in cell_id], type=pa.int32())

# Add/replace the column.
if "cell_id" in tbl.column_names:
    tbl = tbl.drop(["cell_id"]).append_column("cell_id", cell_arrow)
else:
    tbl = tbl.append_column("cell_id", cell_arrow)

# Keep canonical ordering: required cols, then optional in stable order.
cols = ["t", "lat", "lon", "value", "event_id", "cell_id", "mark", "meta_json"]
tbl = tbl.select([c for c in cols if c in tbl.column_names])
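To make the spatial join concrete, here is a self-contained stand-in for what a grid lookup does. It is a sketch under stated assumptions, not motac's implementation:

- `SimpleGrid` and `point_to_cell_id` are hypothetical names.
- Cell sizes are converted from metres to degrees with a flat equirectangular approximation at the grid's mid-latitude.
- Cell ids are numbered row-major from the south-west corner.

motac's `build_regular_grid` may use a projected CRS, a different origin, or a different numbering. Only the -1 returned for outside points mirrors the convention stated above.

```python
import math
from dataclasses import dataclass

M_PER_DEG_LAT = 111_320.0  # rough spherical-Earth metres per degree of latitude


@dataclass(frozen=True)
class SimpleGrid:
    """Hypothetical regular lon/lat grid; not motac's data structure."""

    lon_min: float
    lon_max: float
    lat_min: float
    lat_max: float
    dlon: float  # cell width in degrees
    dlat: float  # cell height in degrees
    n_cols: int
    n_rows: int

    @classmethod
    def from_bounds(
        cls, lon_min: float, lon_max: float, lat_min: float, lat_max: float, cell_size_m: float
    ) -> "SimpleGrid":
        # Metres per degree of longitude shrink with cos(latitude); use the mid-latitude.
        mid_lat = math.radians((lat_min + lat_max) / 2)
        dlat = cell_size_m / M_PER_DEG_LAT
        dlon = cell_size_m / (M_PER_DEG_LAT * math.cos(mid_lat))
        # Round up so the cells cover the whole bounding box.
        n_cols = math.ceil((lon_max - lon_min) / dlon)
        n_rows = math.ceil((lat_max - lat_min) / dlat)
        return cls(lon_min, lon_max, lat_min, lat_max, dlon, dlat, n_cols, n_rows)

    def point_to_cell_id(self, lon: float, lat: float) -> int:
        """Row-major cell id, or -1 for points outside the bounds."""
        inside = self.lon_min <= lon <= self.lon_max and self.lat_min <= lat <= self.lat_max
        if not inside:
            return -1
        # Clamp points lying exactly on the max edge into the last column/row.
        col = min(int((lon - self.lon_min) / self.dlon), self.n_cols - 1)
        row = min(int((lat - self.lat_min) / self.dlat), self.n_rows - 1)
        return row * self.n_cols + col
```

With the tutorial's bounds and 2 km cells, this sketch produces a 9 × 9 grid. The two sample events land in cells 21 and 29, and a point at lon 1.0 returns -1. These ids come from the sketch and are not necessarily the ones motac assigns.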

If you already have a substrate cache directory containing grid.npz, you can also map a point from the CLI:

motac spatial cell-id --grid path/to/cache_dir --lon -0.10 --lat 51.50

4) Query: filter and aggregate

Here’s a minimal example using Arrow compute:

import pyarrow.compute as pc

# Keep only rows that landed inside the grid.
mask_inside = pc.is_valid(tbl["cell_id"])
tbl_inside = tbl.filter(mask_inside)

# Example query: total event weight per day (the sum of `value`).
# Because `value` defaults to 1, this equals the event count when no weights are set.
by_day = tbl_inside.group_by("t").aggregate([("value", "sum")])
print(by_day)
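On small inputs, you can cross-check the Arrow query against a plain-Python version of the same logic. This sketch assumes rows are dicts with `t`, `value`, and `cell_id` keys. That in-memory form is hypothetical and is not a motac type. The function drops rows whose `cell_id` is null, as the `pc.is_valid` filter does, and sums `value` per day. Arrow names the corresponding output column `value_sum`.

```python
from collections import defaultdict
from datetime import date
from typing import Any, Iterable, Mapping


def weight_per_day(rows: Iterable[Mapping[str, Any]]) -> dict[date, float]:
    """Sum `value` per day over rows that landed inside the grid."""
    totals: dict[date, float] = defaultdict(float)
    for row in rows:
        # A null cell_id means the event fell outside the grid; skip it.
        if row.get("cell_id") is None:
            continue
        totals[row["t"]] += row["value"]
    return dict(totals)
```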

Next steps:

  • write the joined table back to Parquet,

  • build POI features on the same grid (see the POI tutorial),

  • fit a model using the canonical dataset interface.