From aa46cf2b4b33ab60b03b137e28fe12991c32c4c3 Mon Sep 17 00:00:00 2001 From: Aaron Po Date: Mon, 20 Apr 2026 03:31:42 -0400 Subject: [PATCH] updates --- pipeline/diagrams/activity-diagram.svg | 1 - pipeline/diagrams/class-diagram.svg | 1 - .../diagrams/future-possible-activity.puml | 453 +++++++++--------- 3 files changed, 230 insertions(+), 225 deletions(-) delete mode 100644 pipeline/diagrams/activity-diagram.svg delete mode 100644 pipeline/diagrams/class-diagram.svg diff --git a/pipeline/diagrams/activity-diagram.svg b/pipeline/diagrams/activity-diagram.svg deleted file mode 100644 index 0ec4f31..0000000 --- a/pipeline/diagrams/activity-diagram.svg +++ /dev/null @@ -1 +0,0 @@ -The Biergarten Data PipelineThe Biergarten Data PipelineValidatesmocked,model,temperature,top-p, etc.ParseArguments(argc, argv)spdlog::error usage infonoAre arguments valid?yesInit CurlGlobalState & LlamaBackendStateBinds CURLWebClient, WikipediaService,Gemma4JinjaPromptFormatter, andeither MockGenerator or LlamaGeneratordi::make_injector(...)injector.create<BiergartenDataGenerator>()BiergartenDataGenerator::Run()Return 0QueryCitiesWithCountries()Lookup failed?yesnospdlog::warn "context lookup failed"Store EnrichedCity{Location, region_context}Remaining citiesFor each sampled Location?DoneGenerateBreweries(enriched_cities)Exception thrown?yesnospdlog::warn "brewery generation failed"Store GeneratedBreweryspdlog::info dump of generated JSON fieldsLogResults()JsonLoader::LoadLocations("locations.json")std::ranges::sample(all_locations, 50)GetLocationContext(loc)FetchExtract("City, Country")FetchExtract("beer in Country")Backed by CURLWebClient::GetFetchExtract("beer in City")Generator ModeMockGeneratorLlamaGeneratorDeterministicHash(location)Select from kBreweryAdjectives, kBreweryNouns,kBreweryDescriptionsFormat BreweryResultPrepareRegionContext(region_context)LoadBrewerySystemPrompt("prompts/system.md")Format user_promptAttempt = 0Uses Gemma4JinjaPromptFormatter,llama_tokenize, and llama_sampler_sampleInfer(system_prompt, user_prompt, max_tokens, kBreweryJsonGrammar)ValidateBreweryJson(raw, brewery)Is JSON Valid?yesnomax_tokens += 700yesError == "incomplete JSON"Update user_prompt with validation errorAttempt++Attempt < 3?yesStill Invalid?yesnothrow std::runtime_errorReturn BreweryResultRemaining citiesFor each EnrichedCity?Donemain.ccBiergartenDataGeneratorJsonLoaderWikipediaServiceDataGenerator \ No newline at end of file diff --git a/pipeline/diagrams/class-diagram.svg b/pipeline/diagrams/class-diagram.svg deleted file mode 100644 index b7cb713..0000000 --- a/pipeline/diagrams/class-diagram.svg +++ /dev/null @@ -1 +0,0 @@ -The Biergarten Data Pipeline - Class DiagramThe Biergarten Data Pipeline - Class DiagramBiergartenDataGeneratorcontext_service_ : std::unique_ptr<IEnrichmentService>generator_ : std::unique_ptr<DataGenerator>generated_breweries_ : std::vector<GeneratedBrewery>Run() : boolQueryCitiesWithCountries() : std::vector<Location>GenerateBreweries(cities : std::span<const EnrichedCity>) : voidLogResults() : void«interface»IEnrichmentServiceGetLocationContext(loc : const Location&) : std::stringWikipediaServiceclient_ : std::unique_ptr<WebClient>extract_cache_ : std::unordered_map<std::string, std::string>GetLocationContext(loc : const Location&) : std::stringFetchExtract(query : std::string_view) : std::string«interface»WebClientGet(url : const std::string&) : std::stringUrlEncode(value : const std::string&) : std::stringCURLWebClientGet(url : const std::string&) : std::stringUrlEncode(value : const std::string&) : std::string«interface»DataGeneratorGenerateBrewery(location : const Location&, region_context : const std::string&) : BreweryResultGenerateUser(locale : const std::string&) : UserResultMockGeneratorGenerateBrewery(...) : BreweryResultGenerateUser(...) : UserResultDeterministicHash(location : const Location&) : size_tLlamaGeneratormodel_ : ModelHandlecontext_ : ContextHandleprompt_formatter_ : std::unique_ptr<IPromptFormatter>rng_ : std::mt19937GenerateBrewery(...) : BreweryResultGenerateUser(...) : UserResultLoad(model_path : const std::string&) : voidInfer(...) : std::stringInferFormatted(...) : std::stringLoadBrewerySystemPrompt(...) : std::string«interface»IPromptFormatterFormat(system_prompt : std::string_view, user_prompt : std::string_view) : std::stringGemma4JinjaPromptFormatterFormat(system_prompt : std::string_view, user_prompt : std::string_view) : std::stringJsonLoaderLoadLocations(filepath : const std::filesystem::path&) : std::vector<Location>ownsownsimplementsownsimplementsimplementsimplementsusesimplementsuses \ No newline at end of file diff --git a/pipeline/diagrams/future-possible-activity.puml b/pipeline/diagrams/future-possible-activity.puml index 0c27701..9cca054 100644 --- a/pipeline/diagrams/future-possible-activity.puml +++ b/pipeline/diagrams/future-possible-activity.puml @@ -1,270 +1,277 @@ @startuml skinparam style strictuml skinparam defaultFontName "DM Sans" -skinparam defaultFontSize 14 +skinparam defaultFontSize 13 skinparam titleFontName "Volkhov" skinparam titleFontSize 20 skinparam backgroundColor #FAFCF9 skinparam defaultFontColor #28342A skinparam titleFontColor #28342A skinparam ArrowColor #628A5B +skinparam SequenceLifeLineBorderColor #547461 +skinparam SequenceParticipantBorderColor #547461 +skinparam SequenceParticipantBackgroundColor #EAF0E8 +skinparam SequenceBoxBorderColor #547461 skinparam NoteBackgroundColor #EAF0E8 skinparam NoteBorderColor #547461 -skinparam ActivityBackgroundColor #FAFCF9 -skinparam ActivityBorderColor #547461 -skinparam ActivityDiamondBackgroundColor #FAFCF9 -skinparam ActivityDiamondBorderColor #628A5B -skinparam ActivityBarColor #628A5B -skinparam SwimlaneBorderColor #547461 -skinparam SwimlaneBorderThickness 0.3 +skinparam SequenceDividerBackgroundColor #EAF0E8 +skinparam SequenceDividerBorderColor #547461 -title The Biergarten Data Pipeline — Activity Diagram v3 (Mediator + Full Fixture Chain) +title The Biergarten Data Pipeline — Sequence Diagram v4 (Unified Orchestrator) -' ═════════════════════════════════════════════ +participant "main.cc" as main #F2F6F0 +participant "Orchestrator" as orch #EAF0E8 +participant "Thread U1\nUserProducer" as u1 #DCE8D8 +participant "Thread U2\nUserExportConsumer" as u2 #E0EAE0 +participant "Thread B1\nEnrichmentProducer" as b1 #DCE8D8 +participant "Thread B2\nBreweryGenerationConsumer" as b2 #E5EDE1 +participant "Thread B3\nBreweryExportConsumer" as b3 #E0EAE0 +participant "Thread R1\nBeerGenerationProducer" as r1 #DCE8D8 +participant "Thread R2\nBeerExportConsumer" as r2 #E0EAE0 +participant "Thread C1\nCheckinGenerationProducer" as c1 #DCE8D8 +participant "Thread C2\nCheckinExportConsumer" as c2 #E0EAE0 +participant "Thread G1\nRatingGenerationProducer" as g1 #DCE8D8 +participant "Thread G2\nRatingExportConsumer" as g2 #E0EAE0 + +' ───────────────────────────────────────────── ' STARTUP -' ═════════════════════════════════════════════ -|#F2F6F0|main.cc| -start -:ParseArguments(argc, argv); -if (Valid?) then (no) - :spdlog::error; - stop -else (yes) -endif -:Init CurlGlobalState & LlamaBackendState; -:Build DI injector; -note right - DataGenerator and IExportService - bound as shared_ptr — both the - orchestrator and mediator hold - a reference to the same instances. -end note -:Create BiergartenPipelineOrchestrator; -:Create BiergartenPipelineMediator\n(shared generator, shared exporter,\nJCurveCheckinStrategy); -:exporter->Initialize(); -:JsonLoader::LoadLocations("locations.json"); -:SamplingStrategy::Sample(all_locations); -:BiergartenPipelineOrchestrator::Run(); +' ───────────────────────────────────────────── +main -> main : ParseArguments(argc, argv) +alt Invalid args + main -> main : spdlog::error; stop +end -' ═════════════════════════════════════════════ -' PHASE 1 — USER GENERATION -' (independent, no FK dependencies) -' ═════════════════════════════════════════════ -|#EAF0E8|Orchestrator — Phase 1: Users| -note - Users have no FK dependencies. - Generated first so the full pool - exists before checkin weights - are assigned. +main -> main : Init CurlGlobalState & LlamaBackendState +main -> main : Build DI injector +note right of main + All dependencies bound with unique_ptr. + LlamaConfig or RestConfig injected + instead of ApplicationOptions. end note -:Spawn Thread U1 — UserProducer(sampled_locations); -:Spawn Thread U2 — UserExportConsumer(); -:Join U1, U2; -:Collect user_pool : std::vector; +main -> orch : exporter->Initialize() +note right of orch + Opens SQLite connection. + Creates schema for all five fixture types + in one DDL pass. Begins IMMEDIATE TRANSACTION. +end note -|#DCE8D8|Thread U1 — UserProducer| -while (For each Location?) is (remaining) - :generator->GenerateUser(location); - :user_channel_.Send(GeneratedUser); -endwhile (done) -:user_channel_.Close(); +main -> orch : JsonLoader::LoadLocations("locations.json") +main -> orch : ISamplingStrategy::Sample(all_locations) +main -> orch : BiergartenPipelineOrchestrator::Run() -|#E0EAE0|Thread U2 — UserExportConsumer| -while (user_channel_.Receive()?) is (available) - :exporter->ProcessUser(user) : sqlite3_int64; - note right - Returns inserted row ID. - Stored back into GeneratedUser.user_id - so mediator pool carries live FKs. +' ───────────────────────────────────────────── +' PHASE 1 — USERS +' ───────────────────────────────────────────── +== Phase 1 — Users (no FK dependencies) == + +orch -> u1 : spawn +orch -> u2 : spawn + +loop For each Location + u1 -> u1 : generator->GenerateUser(location) + u1 -> u2 : user_channel_.Send(GeneratedUser) +end +u1 -> u2 : user_channel_.Close() + +loop user_channel_.Receive() + u2 -> u2 : exporter->ProcessUser(user) : sqlite3_int64 + note right of u2 + Returns committed row ID. + Stored on GeneratedUser.user_id. end note - :Append to user_pool; -endwhile (nullopt) + u2 -> orch : Append to user_pool_ +end -|#EAF0E8|Orchestrator — Phase 1: Users| -:mediator->OnUsersComplete(user_pool); -note right - Mediator receives the full committed - user pool. Sets users_ready_ = true. - Calls TryOpenCheckinGate() — - gate stays closed until breweries - are also ready. +orch -> orch : join(U1, U2) +note right of orch + ── BARRIER 1 ── + user_pool_ is now fully committed. + Phase 2 begins. end note -' ═════════════════════════════════════════════ -' PHASE 2 — BREWERY GENERATION -' (depends on locations, runs after Phase 1) -' ═════════════════════════════════════════════ -|#EAF0E8|Orchestrator — Phase 2: Breweries| -note - Runs after Phase 1 completes. - Could be parallelised with Phase 1 - in future — FK dependency is only - on checkins, not on users directly. +' ───────────────────────────────────────────── +' PHASE 2 — BREWERIES +' ───────────────────────────────────────────── +== Phase 2 — Breweries (depends on locations only) == + +orch -> b1 : spawn +orch -> b2 : spawn +orch -> b3 : spawn + +loop For each Location + b1 -> b1 : BreweryContextStrategy::QueriesFor(location) + b1 -> b1 : WikipediaService::GetLocationContext(location,\nbrewery_context_strategy_) + alt failure + b1 -> b1 : LocationContext{ Absent } + else truncated + b1 -> b1 : LocationContext{ Partial } + else success + b1 -> b1 : LocationContext{ Full } + end + b1 -> b2 : enrichment_channel_.Send(EnrichedCity) + note right of b1 + Blocks if channel full. + Back-pressure against GPU consumer. + end note +end +b1 -> b2 : enrichment_channel_.Close() + +loop enrichment_channel_.Receive() + alt context.completeness == Absent + b2 -> b2 : spdlog::warn — proceeding with minimal prompt + end + b2 -> b2 : generator->GenerateBrewery(location, context) + b2 -> b3 : brewery_channel_.Send(GeneratedBrewery) +end +b2 -> b3 : brewery_channel_.Close() + +loop brewery_channel_.Receive() + b3 -> b3 : exporter->ProcessBrewery(brewery) : sqlite3_int64 + note right of b3 + Row ID stored on GeneratedBrewery.brewery_id. + No brewery_cache_ needed — orchestrator + threads the ID forward directly. + end note + b3 -> orch : Append to brewery_pool_ +end + +orch -> orch : join(B1, B2, B3) +note right of orch + ── BARRIER 2 ── + brewery_pool_ is now fully committed + with live brewery_id values. + Phase 3 begins. end note -:Spawn Thread B1 — EnrichmentProducer(sampled_locations); -:Spawn Thread B2 — BreweryGenerationConsumer(); -:Spawn Thread B3 — BreweryExportConsumer(); -:Join B1, B2, B3; -:Collect brewery_pool : std::vector; +' ───────────────────────────────────────────── +' PHASE 3 — BEERS +' ───────────────────────────────────────────── +== Phase 3 — Beers (depends on brewery_pool_) == -|#DCE8D8|Thread B1 — EnrichmentProducer| -while (For each Location?) is (remaining) - :BreweryContextStrategy::QueriesFor(location); - :WikipediaService::GetLocationContext\n(location, brewery_context_strategy_); - if (failure?) then (yes) - :LocationContext{ Absent }; - else if (truncated?) then (yes) - :LocationContext{ Partial }; - else (no) - :LocationContext{ Full }; - endif - :enrichment_channel_.Send(EnrichedCity); -endwhile (done) -:enrichment_channel_.Close(); +orch -> r1 : spawn +orch -> r2 : spawn -|#E5EDE1|Thread B2 — BreweryGenerationConsumer| -while (enrichment_channel_.Receive()?) is (available) - :generator->GenerateBrewery(location, context); - :brewery_channel_.Send(GeneratedBrewery); -endwhile (nullopt) -:brewery_channel_.Close(); +loop For each GeneratedBrewery in brewery_pool_ + r1 -> r1 : BeerContextStrategy::QueriesFor(location) + r1 -> r1 : WikipediaService::GetLocationContext(location,\nbeer_context_strategy_) + r1 -> r1 : generator->GenerateBeer(brewery.brewery_id,\nlocation, context) + r1 -> r2 : beer_channel_.Send(GeneratedBeer) +end +r1 -> r2 : beer_channel_.Close() -|#E0EAE0|Thread B3 — BreweryExportConsumer| -while (brewery_channel_.Receive()?) is (available) - :exporter->ProcessBrewery(brewery) : sqlite3_int64; - :Append to brewery_pool; -endwhile (nullopt) +loop beer_channel_.Receive() + r2 -> r2 : exporter->ProcessBeer(beer) : sqlite3_int64 + note right of r2 + Row ID stored on GeneratedBeer.beer_id. + end note + r2 -> orch : Append to beer_pool_ +end -|#EAF0E8|Orchestrator — Phase 2: Breweries| -:mediator->OnBreweriesComplete(brewery_pool); -note right - Mediator sets breweries_ready_ = true. - Calls TryOpenCheckinGate(). - Both flags now true — gate opens. - Mediator spawns checkin stage - asynchronously on its own thread. +orch -> orch : join(R1, R2) +note right of orch + ── BARRIER 3 ── + beer_pool_ is fully committed. + All three upstream pools ready. end note -' ═════════════════════════════════════════════ -' PHASE 3 — BEER GENERATION -' (depends on breweries) -' ═════════════════════════════════════════════ -|#EAF0E8|Orchestrator — Phase 3: Beers| -:RunBeerPhase(brewery_pool); +' ───────────────────────────────────────────── +' CHECKIN WEIGHT ASSIGNMENT +' ───────────────────────────────────────────── +== Checkin Weight Assignment == -:Spawn Thread R1 — BeerGenerationProducer(brewery_pool); -:Spawn Thread R2 — BeerExportConsumer(); -:Join R1, R2; -:Collect beer_pool : std::vector; - -|#DCE8D8|Thread R1 — BeerGenerationProducer| -while (For each GeneratedBrewery?) is (remaining) - :BeerContextStrategy::QueriesFor(location); - :WikipediaService::GetLocationContext\n(location, beer_context_strategy_); - :generator->GenerateBeer(brewery_id, location, context); - :beer_channel_.Send(GeneratedBeer); -endwhile (done) -:beer_channel_.Close(); - -|#E0EAE0|Thread R2 — BeerExportConsumer| -while (beer_channel_.Receive()?) is (available) - :exporter->ProcessBeer(beer) : sqlite3_int64; - :Append to beer_pool; -endwhile (nullopt) - -|#EAF0E8|Orchestrator — Phase 3: Beers| -:mediator->OnBeersComplete(beer_pool); -note right - Mediator stores beer_pool. - Rating stage can now reference - beer FKs. Ratings run after - checkins complete (checkin_id FK). -end note - -' ═════════════════════════════════════════════ -' MEDIATOR — CHECKIN GATE (triggered internally) -' Runs concurrently with Phase 3 -' ═════════════════════════════════════════════ -|#F0F5EE|Mediator — Checkin Stage (gated)| -note - This stage was triggered by - TryOpenCheckinGate() after both - OnUsersComplete and OnBreweriesComplete - fired. Runs concurrently with - Phase 3 beer generation. -end note - -:ICheckinDistributionStrategy::\nAssignActivityWeights(user_pool_); -note right - J-curve weights assigned across - the full user population before - any checkins are generated. +orch -> orch : ICheckinDistributionStrategy::\nAssignActivityWeights(user_pool_) +note right of orch + J-curve weights written onto + GeneratedUser.user.activity_weight. Small cohort gets high weight; long tail gets low weight. + Requires the full pool — this is why + users were committed first. end note -while (For each GeneratedUser in pool?) is (remaining) - :strategy->CheckinsForUser(user, brewery_count); - while (For each checkin index?) is (remaining) - :strategy->TimestampFor(user, index); - note right - Bursty weekend/evening +' ───────────────────────────────────────────── +' PHASE 4 — CHECKINS +' ───────────────────────────────────────────── +== Phase 4 — Check-ins (depends on user_pool_ + brewery_pool_) == + +orch -> c1 : spawn +orch -> c2 : spawn + +loop For each GeneratedUser in user_pool_ + c1 -> c1 : strategy->CheckinsForUser(user,\nbrewery_pool_.size()) + loop For each checkin index + c1 -> c1 : strategy->TimestampFor(user, index) + note right of c1 + Bursty weekend / evening distribution applied here. end note - :Select brewery from brewery_pool_\n(weighted random); - :generator->GenerateCheckin(user, brewery, timestamp); - :exporter->ProcessCheckin(checkin) : sqlite3_int64; - :mediator->OnCheckinGenerated(checkin); - endwhile (done) -endwhile (done) + c1 -> c1 : Select brewery from brewery_pool_\n(weighted random by activity_weight) + c1 -> c1 : generator->GenerateCheckin(user, brewery, timestamp) + c1 -> c2 : checkin_channel_.Send(GeneratedCheckin) + end +end +c1 -> c2 : checkin_channel_.Close() -:checkins_complete_ = true; -note right - Rating stage depends on checkin_id FK. - RunRatingStage() is called here, - after all checkins are committed. +loop checkin_channel_.Receive() + c2 -> c2 : exporter->ProcessCheckin(checkin) : sqlite3_int64 + note right of c2 + Row ID stored on GeneratedCheckin.checkin_id. + end note + c2 -> orch : Append to checkin_pool_ +end + +orch -> orch : join(C1, C2) +note right of orch + ── BARRIER 4 ── + checkin_pool_ is fully committed. + All FK dependencies for ratings satisfied. + Phase 5 begins. end note -' ═════════════════════════════════════════════ -' MEDIATOR — RATING STAGE -' Runs after checkins complete -' ═════════════════════════════════════════════ -|#F0F5EE|Mediator — Rating Stage| -note - Ratings reference user_id, beer_id, - and checkin_id. All three pools - are committed before this runs. - Strong positive skew applied - by RatingResult generation. -end note +' ───────────────────────────────────────────── +' PHASE 5 — RATINGS +' ───────────────────────────────────────────── +== Phase 5 — Ratings (depends on user_pool_ + beer_pool_ + checkin_pool_) == -while (For each GeneratedCheckin?) is (remaining) - if (Beer available for this brewery?) then (yes) - :Select beer from beer_pool_\n(match brewery_id); - :generator->GenerateRating(user, beer, checkin_id); - :exporter->ProcessRating(rating); - :mediator->OnRatingGenerated(rating); - else (no) - :Skip — no beers for this brewery yet; - endif -endwhile (done) +orch -> g1 : spawn +orch -> g2 : spawn -' ═════════════════════════════════════════════ +loop For each GeneratedCheckin in checkin_pool_ + g1 -> g1 : Resolve GeneratedUser from user_pool_\n(match user_id) + g1 -> g1 : Resolve GeneratedBeer from beer_pool_\n(match brewery_id, select one) + alt Beer found for this brewery + g1 -> g1 : generator->GenerateRating(user, beer,\ncheckin.checkin_id) + note right of g1 + Strong positive skew applied + inside GenerateRating. + end note + g1 -> g2 : rating_channel_.Send(GeneratedRating) + else No beer found + g1 -> g1 : spdlog::warn — no beer for brewery,\nskipping rating + end +end +g1 -> g2 : rating_channel_.Close() + +loop rating_channel_.Receive() + g2 -> g2 : exporter->ProcessRating(rating) +end + +orch -> orch : join(G1, G2) + +' ───────────────────────────────────────────── ' TEARDOWN -' ═════════════════════════════════════════════ -|#F2F6F0|main.cc| -:Await mediator completion; -:exporter->Finalize(); -note right - Single COMMIT covers all five - fixture types: users, breweries, - beers, checkins, ratings. - All-or-nothing consistency. +' ───────────────────────────────────────────── +== Teardown == + +orch -> main : return +main -> main : exporter->Finalize() +note right of main + Single COMMIT covers all five fixture types: + users, breweries, beers, checkins, ratings. + All-or-nothing consistency per run. end note -:spdlog::info "Pipeline complete"; -:return 0; -stop +main -> main : spdlog::info "Pipeline complete in X ms" +main -> main : return 0 @enduml