Co-authored-by: Copilot <copilot@github.com>
This commit is contained in:
Aaron Po
2026-04-23 13:49:37 -04:00
parent 8f5471d96c
commit 4331865281
3 changed files with 468 additions and 436 deletions

View File

@@ -1,25 +1,7 @@
@startuml biergarten_activity
skinparam defaultFontName "DM Sans"
!include ../biergarten-weizen-theme.puml
skinparam defaultFontSize 13
skinparam titleFontName "Volkhov"
skinparam titleFontSize 20
skinparam backgroundColor #FCFCF7
skinparam defaultFontColor #14180C
skinparam titleFontColor #14180C
skinparam ArrowColor #656F33
skinparam activityStartColor #EBECE3
skinparam activityEndColor #4A5837
skinparam activityStopColor #4A5837
skinparam ActivityBackgroundColor #EBECE3
skinparam ActivityBorderColor #4A5837
skinparam ActivityDiamondBackgroundColor #CBD2B5
skinparam ActivityDiamondBorderColor #4A5837
skinparam NoteBackgroundColor #DBEEDD
skinparam NoteFontColor #14180C
skinparam NoteBorderColor #4A5837
skinparam SwimlaneBorderColor #4A5837
skinparam SwimlaneBorderThickness 1
skinparam monochrome reverse
title The Biergarten Data Pipeline — Activity Diagram
@@ -37,8 +19,8 @@ endif
:Initialize SqliteExportService;
note right
Opens SQLite connection.
Begins a single transaction
covering all five fixture types.
(Transactions are now managed
per-phase via batching).
end note
:Create BoundedChannel<LogEntry> log_ch;
@@ -126,12 +108,18 @@ fork again
end note
fork again
|SQLite Worker|
:BEGIN TRANSACTION;
while (exp_ch has items?) is (yes)
:Receive GeneratedUser;
:ProcessUser(user);
:PipelineLogger::Log(Info, UserGeneration,\n city, user_id, "sqlite");
:Append -> user_pool_;
if (Batch size reached?) then (yes)
:COMMIT & BEGIN;
else (no)
endif
endwhile (no)
:COMMIT (Final);
end fork
|Orchestrator|
@@ -145,23 +133,24 @@ end fork
fork
|Orchestrator|
:Loop: Send Locations -> loc_ch;
:Loop: Sample User from user_pool_
and pair with Location;
:Send BreweryTask(Location, User) -> loc_ch;
:Close loc_ch;
fork again
|LLM Worker|
while (loc_ch has items?) is (yes)
:Receive Location;
:Receive BreweryTask(Location, User);
:GetLocationContextFromCache(location);
:GetLocationContextFromCache(task.location);
note right
Guaranteed cache hit from startup.
end note
:GenerateBrewery(enriched_city, context)\nvia DataGenerator;
:GenerateBrewery(enriched_city, context, task.user)\nvia DataGenerator;
note right
KV cache stays warm across all
brewery generations -- system prompt
does not change within this phase.
KV cache stays warm.
Brewery is linked to the sampled owner_user_id.
end note
:PipelineLogger::Log(Info,\n BreweryGeneration,\n city, brewery_id, "llm");
:Send GeneratedBrewery -> exp_ch;
@@ -169,12 +158,18 @@ fork again
:Close exp_ch;
fork again
|SQLite Worker|
:BEGIN TRANSACTION;
while (exp_ch has items?) is (yes)
:Receive GeneratedBrewery;
:ProcessBrewery(brewery);
:PipelineLogger::Log(Info,\n BreweryGeneration,\n city, brewery_id, "sqlite");
:Append -> brewery_pool_;
if (Batch size reached?) then (yes)
:COMMIT & BEGIN;
else (no)
endif
endwhile (no)
:COMMIT (Final);
end fork
|Orchestrator|
@@ -218,6 +213,7 @@ fork again
:Close exp_ch;
fork again
|SQLite Worker|
:BEGIN TRANSACTION;
while (exp_ch has items?) is (yes)
:Receive BeersBundle;
while (For each beer in bundle?) is (remaining)
@@ -226,7 +222,12 @@ fork again
:Append -> beer_pool_;
endwhile (done)
:PipelineLogger::Log(Info,\n BeerGeneration,\n city, brewery_id, "sqlite");
if (Batch size reached?) then (yes)
:COMMIT & BEGIN;
else (no)
endif
endwhile (no)
:COMMIT (Final);
end fork
|Orchestrator|
@@ -247,6 +248,7 @@ note right
emerges from persona distribution.
end note
:BEGIN TRANSACTION;
while (For each GeneratedUser in user_pool_?) is (remaining)
:CheckinsForUser(user, brewery_pool_.size());
while (For each checkin index?) is (remaining)
@@ -256,8 +258,13 @@ while (For each GeneratedUser in user_pool_?) is (remaining)
:ProcessCheckin(checkin);
:PipelineLogger::Log(Info, CheckinGeneration,\n nullopt, checkin_id, "sqlite");
:Append -> checkin_pool_;
if (Batch size reached?) then (yes)
:COMMIT & BEGIN;
else (no)
endif
endwhile (done)
endwhile (done)
:COMMIT (Final);
' ═══════════════════════════════════════════
' PHASE 3 — RATING GENERATION
@@ -269,33 +276,41 @@ note right
Rating skew modulated per persona.
end note
:BEGIN TRANSACTION;
while (For each GeneratedCheckin in checkin_pool_?) is (remaining)
:Match brewery_id, select beer from beer_pool_\n(same brewery_id, biased by persona affinities);
if (Beer exists for brewery?) then (yes)
:GenerateRating(user, beer, checkin_id)\nvia DataGenerator;
:ProcessRating(rating);
:PipelineLogger::Log(Info, RatingGeneration,\n nullopt, rating_id, "sqlite");
if (Batch size reached?) then (yes)
:COMMIT & BEGIN;
else (no)
endif
else (no)
:PipelineLogger::Log(Warn, RatingGeneration,\n nullopt, brewery_id, "sqlite");
:Skip -- brewery has no beers;
endif
endwhile (done)
:COMMIT (Final);
' ═══════════════════════════════════════════
' TEARDOWN
' ═══════════════════════════════════════════
|Main|
|Orchestrator|
:Finalize SqliteExportService;
note right
COMMIT covers all five fixture types.
Safely closes the DB connection.
end note
:Close log_ch;
|Main|
:spdlog::info "Pipeline complete in X ms";
:Join Log Worker;
note right
Drain guarantees no LogEntry is
dropped at shutdown.
end note
:spdlog::info "Pipeline complete in X ms";
stop
@enduml