Job Queue & Worker Pipeline
This page explains how City2TABULA hands off work from a queue to a pool of parallel workers. If you're new to Go channels, start here.
The problem it solves
Feature extraction runs the same set of SQL scripts for potentially thousands of building batches. We want to run many batches at the same time (one per configured worker thread) without them stepping on each other. The solution is a producer/consumer pipeline:
- The orchestrator builds a queue of jobs, which is then drained into a shared channel (producer).
- Many worker goroutines pull jobs from a shared channel and execute them (consumers).
What is a Go channel?
Think of a channel as a thread-safe conveyor belt. You put things in one end, workers pick them up from the other end. Once the belt is empty and the sender signals it's done (by closing the channel), workers know there's nothing left to wait for.
ch := make(chan *Job, 10) // buffered channel — holds up to 10 items without blocking
ch <- job // put a job on the belt
j := <-ch // pick a job off the belt
close(ch) // signal: no more jobs coming
A for job := range ch loop in a worker will automatically stop when the channel is closed and drained — no manual "are we done?" check needed.
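The behaviour described above can be checked with a small standalone program. This is a minimal sketch, not project code: `collect` is a hypothetical helper, and plain `int` values stand in for `*Job`.

```go
package main

import "fmt"

// collect drains ch with a range loop and returns everything it received.
// The loop exits only after ch has been closed and emptied.
func collect(ch <-chan int) []int {
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	ch := make(chan int, 3) // buffer of 3: the sends below never block
	ch <- 1
	ch <- 2
	ch <- 3
	close(ch) // closing does not discard values already in the buffer

	fmt.Println(collect(ch)) // [1 2 3]

	// Receiving from a closed, drained channel returns the zero value
	// and ok == false instead of blocking.
	v, ok := <-ch
	fmt.Println(v, ok) // 0 false
}
```

Note that values sent before `close` are still delivered; closing only marks the end of the stream.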
JobQueue.ToChannel()
ToChannel() is the bridge between the queue and the worker pool. It drains every job from the queue into a buffered channel, closes it, and returns it.
// internal/process/queue.go
func (q *JobQueue) ToChannel() <-chan *Job {
	ch := make(chan *Job, q.Len()) // size the buffer to hold all jobs upfront
	for !q.IsEmpty() {
		if j := q.Dequeue(); j != nil {
			ch <- j
		}
	}
	close(ch) // workers will stop ranging once this is drained
	return ch
}
Why buffer the whole queue? Buffering all jobs upfront means the producer never blocks — it fills the channel in one shot and returns. Workers then race to pick up jobs without any coordination from the caller.
Why close the channel here? Closing signals to all workers that there are no more jobs coming. Without it, workers would block forever waiting for the next item.
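To try ToChannel() in isolation, the sketch below wraps it in a toy queue. The slice-backed `JobQueue`, the `Job.ID` field, and the bodies of `Enqueue`, `Dequeue`, `Len`, and `IsEmpty` are assumptions made for illustration; only the method names and the `ToChannel` body come from this page.

```go
package main

import "fmt"

// Job is a stand-in for the real job type; the ID field is hypothetical.
type Job struct{ ID int }

// JobQueue is a hypothetical slice-backed FIFO, not the project's implementation.
type JobQueue struct{ jobs []*Job }

func (q *JobQueue) Enqueue(j *Job) { q.jobs = append(q.jobs, j) }
func (q *JobQueue) Len() int       { return len(q.jobs) }
func (q *JobQueue) IsEmpty() bool  { return len(q.jobs) == 0 }

// Dequeue removes and returns the oldest job, or nil if the queue is empty.
func (q *JobQueue) Dequeue() *Job {
	if q.IsEmpty() {
		return nil
	}
	j := q.jobs[0]
	q.jobs = q.jobs[1:]
	return j
}

// ToChannel has the same body as the method shown above.
func (q *JobQueue) ToChannel() <-chan *Job {
	ch := make(chan *Job, q.Len()) // buffer sized to the queue, so sends never block
	for !q.IsEmpty() {
		if j := q.Dequeue(); j != nil {
			ch <- j
		}
	}
	close(ch)
	return ch
}

func main() {
	q := &JobQueue{}
	for i := 1; i <= 3; i++ {
		q.Enqueue(&Job{ID: i})
	}

	ch := q.ToChannel()
	fmt.Println(len(ch), q.IsEmpty()) // 3 true

	for j := range ch { // terminates because ch is already closed
		fmt.Println("job", j.ID)
	}
}
```

One consequence of this design is that the channel buffer holds a pointer to every job at once, which is cheap for thousands of batches but worth keeping in mind for much larger queues.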
How the worker pool uses it
RunJobQueue is the main entry point. It calls ToChannel(), spins up one goroutine per configured thread, and waits for all of them to finish.
// internal/process/worker.go
func RunJobQueue(queue *JobQueue, conn *pgxpool.Pool, cfg *config.Config) error {
	jobChan := queue.ToChannel() // drain queue → channel (closed and ready)
	var wg sync.WaitGroup
	for i := 1; i <= cfg.Batch.Threads; i++ {
		wg.Add(1)
		go NewWorker(i).Start(jobChan, conn, &wg, cfg) // each worker ranges over jobChan
	}
	wg.Wait() // block until every worker is done
	return nil
}
Each worker just does:
for job := range jobChan { // stops automatically when channel is closed + empty
	runner.RunJob(job, conn, workerID)
}
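The same fan-out pattern can be run without a database. The sketch below is an illustration under stated assumptions: `runPool` is a hypothetical function, `int` values stand in for jobs, and incrementing a counter stands in for runner.RunJob.

```go
package main

import (
	"fmt"
	"sync"
)

// runPool starts `workers` goroutines that all range over the same channel
// and returns how many jobs each worker handled.
func runPool(jobs <-chan int, workers int) []int {
	handled := make([]int, workers) // one slot per worker, so no locking is needed
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done() // signal completion however the goroutine exits
			for range jobs {
				handled[id]++ // stand-in for the real per-job work
			}
		}(i)
	}
	wg.Wait() // all workers have exited: the channel is closed and drained
	return handled
}

func main() {
	jobs := make(chan int, 100)
	for i := 0; i < 100; i++ {
		jobs <- i
	}
	close(jobs)

	total := 0
	for _, n := range runPool(jobs, 4) {
		total += n
	}
	fmt.Println(total) // 100
}
```

Each job is delivered to exactly one worker, so the total is always 100, but how the jobs are split between workers varies from run to run.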
Flow diagram
flowchart TD
A[Build JobQueue\norchestrator.go] -->|Enqueue jobs| B[JobQueue]
B -->|ToChannel| C[Buffered Channel\nclosed immediately]
C --> W1[Worker 1]
C --> W2[Worker 2]
C --> W3[Worker N]
W1 -->|RunJob| R1[Runner → SQL scripts]
W2 -->|RunJob| R2[Runner → SQL scripts]
W3 -->|RunJob| R3[Runner → SQL scripts]
R1 & R2 & R3 --> DB[(PostGIS DB)]
subgraph Worker Pool
W1
W2
W3
end
Sequence: one job's journey
sequenceDiagram
participant O as Orchestrator
participant Q as JobQueue
participant C as Channel
participant W as Worker
participant R as Runner
participant DB as PostGIS
O->>Q: Enqueue(job)
O->>C: ToChannel() — drains Q, closes C
W->>C: range jobChan — picks up job
W->>R: RunJob(job)
loop For each Task in Job
R->>DB: ExecuteSQLScript()
DB-->>R: ok / error
R-->>R: retry if deadlock
end
R-->>W: done
W->>C: range continues (or exits if channel empty + closed)
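The "retry if deadlock" step in the sequence above could look roughly like the following. This is a sketch only: the project's actual retry logic lives in internal/process/runner.go and is not reproduced on this page. `sqlError`, `isDeadlock`, and `withRetry` are hypothetical names, and the attempt limit and linear backoff are assumptions. The one external fact relied on is that PostgreSQL reports deadlocks with SQLSTATE 40P01.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// sqlError is a hypothetical stand-in for a driver error carrying a SQLSTATE code.
type sqlError struct{ Code string }

func (e *sqlError) Error() string { return "sqlstate " + e.Code }

// isDeadlock reports whether err carries SQLSTATE 40P01 (deadlock_detected).
func isDeadlock(err error) bool {
	var se *sqlError
	return errors.As(err, &se) && se.Code == "40P01"
}

// withRetry calls fn up to maxAttempts times, retrying only on deadlock errors.
// It returns the number of attempts made and the final error.
func withRetry(maxAttempts int, backoff time.Duration, fn func() error) (int, error) {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = fn(); err == nil || !isDeadlock(err) {
			return attempt, err // success, or an error that retrying will not fix
		}
		if attempt < maxAttempts {
			time.Sleep(backoff * time.Duration(attempt)) // linear backoff (assumed)
		}
	}
	return maxAttempts, err
}

func main() {
	calls := 0
	attempts, err := withRetry(3, time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return &sqlError{Code: "40P01"} // simulate two deadlocks
		}
		return nil
	})
	fmt.Println(attempts, err) // 3 <nil>
}
```

Retrying only on deadlock keeps genuine failures, such as a syntax error in a script, from being repeated pointlessly.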
Where to look in the code
| File | What it does |
|---|---|
| internal/process/queue.go | JobQueue struct, ToChannel() |
| internal/process/worker.go | RunJobQueue(), Worker.Start() |
| internal/process/runner.go | RunJob(), retry logic |
| internal/process/orchestrator.go | Queue builder functions (one per pipeline phase) |
| internal/process/task.go | Task struct (note the LodLevel field) |