Extending the Pipeline
This page explains how to add a new processing step to City2TABULA. The pipeline is designed so that adding a new step — whether it enriches buildings with external data, computes a new derived attribute, or links to a third-party database — follows the same repeatable pattern every time.
What you provide: one SQL script and six small Go changes.
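What the framework provides for free: parallel execution across CPU cores, deadlock retry, per-batch parameter substitution, and CLI flag integration. Re-runs stay idempotent as long as your SQL script follows the delete-then-insert or ON CONFLICT convention described under "What the framework handles automatically".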
The pattern
Every pipeline step maps to the same structure:
flowchart LR
subgraph "You write"
SQL["SQL script\nsql/scripts/{type}/{name}.sql"]
GO["Go wiring\n6 small changes"]
end
subgraph "Framework provides"
BATCH["Spatial or ID-based\nbatching"]
QUEUE["Job queue\n+ worker pool"]
RETRY["Retry &<br>deadlock handling"]
PARAM["SQL parameter\nsubstitution"]
end
subgraph "Output"
DB[("PostgreSQL\ntable or update")]
end
SQL --> QUEUE
GO --> QUEUE
BATCH --> QUEUE
QUEUE --> RETRY
RETRY --> PARAM
PARAM --> DB
Step-by-step checklist
The Go changes always land in the same five files, covered by steps 2 to 6 below. Once you know the pattern, adding a new step takes under an hour.
1. Write the SQL script
Create a file in sql/scripts/{type}/{name}.sql. The directory name ({type}) groups related scripts — use main for extraction steps, link for external data linking.
Use {placeholders} for any value that changes per run:
| Placeholder | Resolved to |
|---|---|
| {city2tabula_schema} | city2tabula |
| {lod_schema} | lod2 or lod3 |
| {srid} | Native CRS of the 3D data (e.g. 25832) |
| {building_ids} | (1, 2, 3, ...) for the current batch |
| {country} | Country name from config |
| {pylovo_schema} | PyLovo schema (e.g. public) |
Scripts within a directory are sorted alphabetically and executed in order. Use a numeric prefix (01_, 02_) to control that order.
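The two conventions above (alphabetical script order and the parenthesised {building_ids} list) can be illustrated with a small, self-contained Go sketch. The helpers `sortScripts` and `formatBuildingIDs` and the file names are hypothetical and are not taken from the City2TABULA code base. The sketch assumes plain lexicographic sorting, which is why zero-padded prefixes matter: without padding, `10_` sorts before `2_`.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// sortScripts returns a sorted copy of the given file names using plain
// lexicographic ordering (an assumption about how the loader sorts).
func sortScripts(names []string) []string {
	out := append([]string(nil), names...)
	sort.Strings(out)
	return out
}

// formatBuildingIDs renders a batch as "(1, 2, 3)", the shape shown for
// {building_ids} in the placeholder table. Hypothetical helper.
// Note: an empty batch yields "()", which PostgreSQL rejects in an IN list.
func formatBuildingIDs(ids []int64) string {
	parts := make([]string, len(ids))
	for i, id := range ids {
		parts[i] = strconv.FormatInt(id, 10)
	}
	return "(" + strings.Join(parts, ", ") + ")"
}

func main() {
	// "10_cleanup.sql" sorts before "2_link.sql" because '1' < '2'.
	fmt.Println(sortScripts([]string{"10_cleanup.sql", "2_link.sql", "01_prepare.sql"}))
	fmt.Println(formatBuildingIDs([]int64{1, 2, 3}))
}
```

Naming the second script `02_link.sql` instead of `2_link.sql` would restore the intended order.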
2. Register the script directory
In internal/config/sql.go, add a directory constant and a field to SQLScripts:
// Add constant
SQLMyNewScriptDir = SQLScriptDir + "my-new-type" + string(os.PathSeparator)
// Add field to SQLScripts struct
MyNewScripts []string
// Load in LoadSQLScripts()
myNewScripts, err := loadSQLFilesFromDir(SQLMyNewScriptDir)
// ...add to return struct
3. Add a JobType and queue builder
In internal/process/orchestrator.go, add a constant and a queue builder function:
// Add JobType constant
MyNewStep JobType = "my_new_step"
// Add to the switch in createJob()
case MyNewStep:
	prefix, lodLevel = "MY_NEW_STEP", 2 // set lodLevel=-1 if no LOD context needed
// Add queue builder function
func MyNewStepJobQueue(config *config.Config, batches [][]int64) (*JobQueue, error) {
	scripts, queue, err := loadScriptsAndQueue(config)
	if err != nil {
		return nil, err
	}
	// One job per batch, each running every script in MyNewScripts
	for _, batch := range batches {
		queue.Enqueue(createJob(batch, scripts.MyNewScripts, MyNewStep))
	}
	return queue, nil
}
4. Add a Run function
In internal/process/feature_extraction.go, add the entry-point function. Choose the right batching strategy:
- ID-based batching: use CreateBatches(ids, cfg.Batch.Size) when batch order doesn't matter
- Spatial grid batching: use getGridBatches(...) when buildings in a batch must be geographically co-located (e.g. for spatial joins against external datasets)
func RunMyNewStep(cfg *config.Config, pool *pgxpool.Pool) error {
	ids, err := getBuildingIDsFromCityDB(pool, cfg.DB.Schemas.Lod2)
	if err != nil {
		return err
	}
	// ... apply BuildingLimit, create batches, build queue, run
	return RunJobQueue(jobQueue, pool, cfg)
}
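The ID-based strategy and the building limit mentioned in this step can be sketched as follows. This is a minimal illustration, not the project's CreateBatches implementation: `chunkIDs` and `applyLimit` are hypothetical names, and treating a limit of zero or less as "no limit" is an assumption.

```go
package main

import "fmt"

// applyLimit caps the number of IDs. A limit <= 0 is treated as
// "no limit" here, which is an assumption for this sketch.
func applyLimit(ids []int64, limit int) []int64 {
	if limit > 0 && len(ids) > limit {
		return ids[:limit]
	}
	return ids
}

// chunkIDs splits ids into consecutive batches of at most size elements.
// The last batch may be shorter than the others.
func chunkIDs(ids []int64, size int) [][]int64 {
	if size <= 0 {
		return nil
	}
	var batches [][]int64
	for start := 0; start < len(ids); start += size {
		end := start + size
		if end > len(ids) {
			end = len(ids)
		}
		batches = append(batches, ids[start:end])
	}
	return batches
}

func main() {
	ids := []int64{1, 2, 3, 4, 5, 6, 7}
	// Keep the first 5 IDs, then split them into batches of 2.
	fmt.Println(chunkIDs(applyLimit(ids, 5), 2)) // prints [[1 2] [3 4] [5]]
}
```

Spatial grid batching differs only in how the IDs are grouped: buildings are assigned to grid cells first, so each batch covers a compact area.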
5. Add a CLI flag
In internal/flags/flags.go:
// Add to Flags struct
MyNewStep bool
// Register in ParseFlags()
flag.BoolVar(&f.MyNewStep, "my-new-step", false, "What this step does")
// Add message type and messages
type MyNewStepMsg Msg
MyNewStepMessages = MyNewStepMsg{
	Progress: "Running my new step...",
	Success:  "My new step completed",
	Error:    "My new step failed",
}
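To see how a boolean flag registered with BoolVar behaves, here is a standalone sketch built on the standard library `flag` package. The `flags` struct and `parseFlags` function are simplified stand-ins for the project's own types, and a dedicated FlagSet is used only so the example can parse an explicit argument slice.

```go
package main

import (
	"flag"
	"fmt"
)

// flags is a simplified stand-in for the project's Flags struct.
type flags struct {
	MyNewStep bool
}

// parseFlags parses args into a flags value using its own FlagSet,
// so it does not touch the global command line.
func parseFlags(args []string) (flags, error) {
	var f flags
	fs := flag.NewFlagSet("city2tabula", flag.ContinueOnError)
	fs.BoolVar(&f.MyNewStep, "my-new-step", false, "What this step does")
	err := fs.Parse(args)
	return f, err
}

func main() {
	f, err := parseFlags([]string{"-my-new-step"})
	fmt.Println(f.MyNewStep, err) // prints: true <nil>
}
```

Go's `flag` package treats one and two leading dashes as equivalent, so `-my-new-step` and `--my-new-step` both enable the step.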
6. Wire the flag in main
In cmd/main.go, add one block:
if f.MyNewStep {
	utils.Info.Println(flagMessages.MyNewStep.Progress)
	if err := process.RunMyNewStep(&config, pool); err != nil {
		utils.Error.Fatalf(flagMessages.MyNewStep.Error+": %v", err)
	}
	utils.Info.Println(flagMessages.MyNewStep.Success)
}
Adding a new SQL parameter
If your script needs a value not already in SQLParameters, add it in two places:
// internal/config/sql.go — add to SQLParameters struct
MyNewParam string `param:"my_new_param"`
// Same file — populate in GetSQLParameters()
MyNewParam: c.DB.Schemas.MyNew, // or wherever the value comes from
Then add {my_new_param} in your SQL script. The substitution happens automatically.
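One way tag-driven substitution like this can work is sketched below. It is an illustration under assumptions, not the project's actual code: the `params` struct is a cut-down, hypothetical stand-in for SQLParameters, `substitute` is an invented helper, and `my_table` is a made-up table name. The sketch reads each field's `param` tag via reflection and replaces the matching {token} in the script.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// params is a hypothetical, reduced stand-in for SQLParameters.
// Every field is a string and carries a `param` tag naming its token.
type params struct {
	Schema      string `param:"city2tabula_schema"`
	SRID        string `param:"srid"`
	BuildingIDs string `param:"building_ids"`
}

// substitute replaces every {name} token whose name matches a `param`
// tag on p. Tokens without a matching field are left untouched.
func substitute(sql string, p params) string {
	v := reflect.ValueOf(p)
	t := v.Type()
	var pairs []string
	for i := 0; i < t.NumField(); i++ {
		name, ok := t.Field(i).Tag.Lookup("param")
		if !ok {
			continue // field is not exposed as a placeholder
		}
		pairs = append(pairs, "{"+name+"}", v.Field(i).String())
	}
	return strings.NewReplacer(pairs...).Replace(sql)
}

func main() {
	// my_table is a made-up table name used only for this example.
	sql := "DELETE FROM {city2tabula_schema}.my_table WHERE building_id IN {building_ids};"
	p := params{Schema: "city2tabula", SRID: "25832", BuildingIDs: "(1, 2, 3)"}
	fmt.Println(substitute(sql, p))
}
```

With a scheme like this, adding a parameter only requires a new tagged field and a value for it, which matches the two-place change described above.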
Adding config from environment
If your new parameter comes from an environment variable, add the GetEnv call in the appropriate load*Config() function in internal/config/. Document the variable in .env.example.
What the framework handles automatically
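Most of these apply to every step by default. The exceptions are idempotency and the building limit, which rely on conventions you follow in your SQL script and Run* function: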
| Concern | How it's handled |
|---|---|
| Parallel execution | RunJobQueue distributes batches across a worker pool (default: CPU count, configurable via THREAD_COUNT) |
| Deadlock retry | Runner retries up to 5 times with jitter on PostgreSQL deadlock errors |
| General error retry | Runner retries up to 3 times with exponential backoff |
| Idempotency | Your SQL script handles this — typically a DELETE ... WHERE building_id IN {building_ids} before the INSERT, or an ON CONFLICT DO UPDATE |
| Parameter substitution | All {placeholder} tokens in your script are substituted from config.SQLParameters before execution |
| Building limit | Respect cfg.Batch.BuildingLimit in your Run* function to cap processing during development |
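The first three rows of the table combine two well-known patterns: a fixed-size worker pool and retry with exponential backoff plus jitter. The sketch below shows both in miniature. It is not the project's RunJobQueue or runner: `job`, `runPool`, `runWithRetry`, and `errTransient` are hypothetical names, and the retry count and delay are arbitrary.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// errTransient is a hypothetical error used to simulate a failing batch.
var errTransient = errors.New("transient failure")

// job is a hypothetical stand-in for one batch of building IDs.
type job struct {
	batch []int64
}

// runWithRetry calls fn until it succeeds or maxAttempts is used up.
// The wait doubles after every failure and gets random jitter on top.
func runWithRetry(maxAttempts int, baseDelay time.Duration, fn func() error) error {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var err error
	delay := baseDelay
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		if attempt == maxAttempts {
			break
		}
		wait := delay
		if delay > 0 {
			wait += time.Duration(rand.Int63n(int64(delay))) // jitter in [0, delay)
		}
		time.Sleep(wait)
		delay *= 2
	}
	return fmt.Errorf("gave up after %d attempts: %w", maxAttempts, err)
}

// runPool fans jobs out to a fixed number of goroutines and collects
// the error of every job that ultimately failed.
func runPool(jobs []job, workers int, process func(job) error) []error {
	ch := make(chan job)
	var (
		mu   sync.Mutex
		errs []error
		wg   sync.WaitGroup
	)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range ch {
				if err := process(j); err != nil {
					mu.Lock()
					errs = append(errs, err)
					mu.Unlock()
				}
			}
		}()
	}
	for _, j := range jobs {
		ch <- j
	}
	close(ch)
	wg.Wait()
	return errs
}

func main() {
	jobs := []job{{batch: []int64{1, 2}}, {batch: []int64{3, 4}}, {batch: []int64{5}}}
	errs := runPool(jobs, 2, func(j job) error {
		attempts := 0
		return runWithRetry(3, time.Millisecond, func() error {
			attempts++
			if attempts < 2 {
				return errTransient // first attempt fails, second succeeds
			}
			return nil
		})
	})
	fmt.Println("failed batches:", len(errs))
}
```

Jitter spreads out retries from workers that failed at the same moment, which is why it is commonly paired with deadlock retries: two transactions that collided once are less likely to collide again on the next attempt.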
Worked example: PyLovo link (-link-pylovo)
The PyLovo link step was added following this exact pattern.
| Nr. | Step | File | Change |
|---|---|---|---|
| 1 | SQL script | sql/scripts/link/pylovo/01_build_pylovo_link.sql | Spatial join of lod2_building footprints against pylovo.res / oth using {pylovo_schema} and {srid}. A batch_bbox CTE pre-filters PyLovo before the IoU join, reducing runtime from 4 min to 1.56 s for 1,000 buildings. |
| 2 | Config | internal/config/sql.go | Added SQLPylvoLinkScriptDir constant and PyLovoLinkScripts []string field. |
| 3 | Orchestrator | internal/process/orchestrator.go | Added PyLovoLink JobType and PyLovoLinkJobQueue() queue builder. |
| 4 | Run function | internal/process/feature_extraction.go | Added RunPyLovoLinkBuild() with spatial grid batching (getGridBatches). IoU join requires buildings to be geographically co-located so the bounding box pre-filter stays tight. |
| 5 | Flag | internal/flags/flags.go | Added -link-pylovo, named after the data source. A future OGR2OGR source gets its own -link-ogr2ogr flag and subdirectory with no changes to existing code. |
| 6 | Main | cmd/main.go | Added if f.LinkPylovo block. |
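The idea behind the bounding box pre-filter and the IoU (intersection over union) join in row 1 can be shown with a deliberately simplified Go sketch. The real step runs in SQL on building footprint polygons. Here every footprint is reduced to an axis-aligned rectangle, and all names (`box`, `batchBBox`, `overlaps`, `iou`) are hypothetical.

```go
package main

import (
	"fmt"
	"math"
)

// box is an axis-aligned rectangle standing in for a footprint.
type box struct{ minX, minY, maxX, maxY float64 }

// area returns the rectangle's area, or 0 for an inverted (empty) box.
func (b box) area() float64 {
	return math.Max(0, b.maxX-b.minX) * math.Max(0, b.maxY-b.minY)
}

// intersection returns the overlap of a and b. The result is inverted
// (and therefore has area 0) when the two boxes do not overlap.
func intersection(a, b box) box {
	return box{
		minX: math.Max(a.minX, b.minX),
		minY: math.Max(a.minY, b.minY),
		maxX: math.Min(a.maxX, b.maxX),
		maxY: math.Min(a.maxY, b.maxY),
	}
}

// overlaps reports whether a and b share a region of non-zero area.
func overlaps(a, b box) bool { return intersection(a, b).area() > 0 }

// iou is intersection area divided by union area, in the range [0, 1].
func iou(a, b box) float64 {
	inter := intersection(a, b).area()
	union := a.area() + b.area() - inter
	if union == 0 {
		return 0
	}
	return inter / union
}

// batchBBox returns the smallest box covering every box in the batch.
func batchBBox(boxes []box) box {
	if len(boxes) == 0 {
		return box{}
	}
	out := boxes[0]
	for _, b := range boxes[1:] {
		out.minX = math.Min(out.minX, b.minX)
		out.minY = math.Min(out.minY, b.minY)
		out.maxX = math.Max(out.maxX, b.maxX)
		out.maxY = math.Max(out.maxY, b.maxY)
	}
	return out
}

func main() {
	buildings := []box{{0, 0, 2, 2}, {3, 0, 5, 2}}
	candidates := []box{{1, 1, 3, 3}, {100, 100, 102, 102}}
	bbox := batchBBox(buildings)
	for _, c := range candidates {
		if !overlaps(bbox, c) {
			continue // cheap pre-filter: skip candidates far from the batch
		}
		for _, b := range buildings {
			fmt.Printf("IoU = %.3f\n", iou(b, c))
		}
	}
}
```

The second candidate never reaches the IoU computation because it lies outside the batch bounding box. This also shows why spatial grid batching matters for this step: a batch of scattered buildings would produce a large bounding box that filters out very little.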