From fd6ba35f68e9112017ac82a60a5563d0e2c0365b Mon Sep 17 00:00:00 2001 From: Aaron Po Date: Sun, 19 Apr 2026 23:10:08 -0400 Subject: [PATCH 1/2] add future plans --- .../diagrams/future-possible-activity.puml | 270 ++++++++++ .../future-possible-architecture.puml | 501 ++++++++++++++++++ 2 files changed, 771 insertions(+) create mode 100644 pipeline/diagrams/future-possible-activity.puml create mode 100644 pipeline/diagrams/future-possible-architecture.puml diff --git a/pipeline/diagrams/future-possible-activity.puml b/pipeline/diagrams/future-possible-activity.puml new file mode 100644 index 0000000..0c27701 --- /dev/null +++ b/pipeline/diagrams/future-possible-activity.puml @@ -0,0 +1,270 @@ +@startuml +skinparam style strictuml +skinparam defaultFontName "DM Sans" +skinparam defaultFontSize 14 +skinparam titleFontName "Volkhov" +skinparam titleFontSize 20 +skinparam backgroundColor #FAFCF9 +skinparam defaultFontColor #28342A +skinparam titleFontColor #28342A +skinparam ArrowColor #628A5B +skinparam NoteBackgroundColor #EAF0E8 +skinparam NoteBorderColor #547461 +skinparam ActivityBackgroundColor #FAFCF9 +skinparam ActivityBorderColor #547461 +skinparam ActivityDiamondBackgroundColor #FAFCF9 +skinparam ActivityDiamondBorderColor #628A5B +skinparam ActivityBarColor #628A5B +skinparam SwimlaneBorderColor #547461 +skinparam SwimlaneBorderThickness 0.3 + +title The Biergarten Data Pipeline — Activity Diagram v3 (Mediator + Full Fixture Chain) + +' ═════════════════════════════════════════════ +' STARTUP +' ═════════════════════════════════════════════ +|#F2F6F0|main.cc| +start +:ParseArguments(argc, argv); +if (Valid?) then (no) + :spdlog::error; + stop +else (yes) +endif +:Init CurlGlobalState & LlamaBackendState; +:Build DI injector; +note right + DataGenerator and IExportService + bound as shared_ptr — both the + orchestrator and mediator hold + a reference to the same instances. 
+end note +:Create BiergartenPipelineOrchestrator; +:Create BiergartenPipelineMediator\n(shared generator, shared exporter,\nJCurveCheckinStrategy); +:exporter->Initialize(); +:JsonLoader::LoadLocations("locations.json"); +:SamplingStrategy::Sample(all_locations); +:BiergartenPipelineOrchestrator::Run(); + +' ═════════════════════════════════════════════ +' PHASE 1 — USER GENERATION +' (independent, no FK dependencies) +' ═════════════════════════════════════════════ +|#EAF0E8|Orchestrator — Phase 1: Users| +note + Users have no FK dependencies. + Generated first so the full pool + exists before checkin weights + are assigned. +end note + +:Spawn Thread U1 — UserProducer(sampled_locations); +:Spawn Thread U2 — UserExportConsumer(); +:Join U1, U2; +:Collect user_pool : std::vector; + +|#DCE8D8|Thread U1 — UserProducer| +while (For each Location?) is (remaining) + :generator->GenerateUser(location); + :user_channel_.Send(GeneratedUser); +endwhile (done) +:user_channel_.Close(); + +|#E0EAE0|Thread U2 — UserExportConsumer| +while (user_channel_.Receive()?) is (available) + :exporter->ProcessUser(user) : sqlite3_int64; + note right + Returns inserted row ID. + Stored back into GeneratedUser.user_id + so mediator pool carries live FKs. + end note + :Append to user_pool; +endwhile (nullopt) + +|#EAF0E8|Orchestrator — Phase 1: Users| +:mediator->OnUsersComplete(user_pool); +note right + Mediator receives the full committed + user pool. Sets users_ready_ = true. + Calls TryOpenCheckinGate() — + gate stays closed until breweries + are also ready. +end note + +' ═════════════════════════════════════════════ +' PHASE 2 — BREWERY GENERATION +' (depends on locations, runs after Phase 1) +' ═════════════════════════════════════════════ +|#EAF0E8|Orchestrator — Phase 2: Breweries| +note + Runs after Phase 1 completes. + Could be parallelised with Phase 1 + in future — FK dependency is only + on checkins, not on users directly. 
+end note + +:Spawn Thread B1 — EnrichmentProducer(sampled_locations); +:Spawn Thread B2 — BreweryGenerationConsumer(); +:Spawn Thread B3 — BreweryExportConsumer(); +:Join B1, B2, B3; +:Collect brewery_pool : std::vector; + +|#DCE8D8|Thread B1 — EnrichmentProducer| +while (For each Location?) is (remaining) + :BreweryContextStrategy::QueriesFor(location); + :WikipediaService::GetLocationContext\n(location, brewery_context_strategy_); + if (failure?) then (yes) + :LocationContext{ Absent }; + else if (truncated?) then (yes) + :LocationContext{ Partial }; + else (no) + :LocationContext{ Full }; + endif + :enrichment_channel_.Send(EnrichedCity); +endwhile (done) +:enrichment_channel_.Close(); + +|#E5EDE1|Thread B2 — BreweryGenerationConsumer| +while (enrichment_channel_.Receive()?) is (available) + :generator->GenerateBrewery(location, context); + :brewery_channel_.Send(GeneratedBrewery); +endwhile (nullopt) +:brewery_channel_.Close(); + +|#E0EAE0|Thread B3 — BreweryExportConsumer| +while (brewery_channel_.Receive()?) is (available) + :exporter->ProcessBrewery(brewery) : sqlite3_int64; + :Append to brewery_pool; +endwhile (nullopt) + +|#EAF0E8|Orchestrator — Phase 2: Breweries| +:mediator->OnBreweriesComplete(brewery_pool); +note right + Mediator sets breweries_ready_ = true. + Calls TryOpenCheckinGate(). + Both flags now true — gate opens. + Mediator spawns checkin stage + asynchronously on its own thread. +end note + +' ═════════════════════════════════════════════ +' PHASE 3 — BEER GENERATION +' (depends on breweries) +' ═════════════════════════════════════════════ +|#EAF0E8|Orchestrator — Phase 3: Beers| +:RunBeerPhase(brewery_pool); + +:Spawn Thread R1 — BeerGenerationProducer(brewery_pool); +:Spawn Thread R2 — BeerExportConsumer(); +:Join R1, R2; +:Collect beer_pool : std::vector; + +|#DCE8D8|Thread R1 — BeerGenerationProducer| +while (For each GeneratedBrewery?) 
is (remaining) + :BeerContextStrategy::QueriesFor(location); + :WikipediaService::GetLocationContext\n(location, beer_context_strategy_); + :generator->GenerateBeer(brewery_id, location, context); + :beer_channel_.Send(GeneratedBeer); +endwhile (done) +:beer_channel_.Close(); + +|#E0EAE0|Thread R2 — BeerExportConsumer| +while (beer_channel_.Receive()?) is (available) + :exporter->ProcessBeer(beer) : sqlite3_int64; + :Append to beer_pool; +endwhile (nullopt) + +|#EAF0E8|Orchestrator — Phase 3: Beers| +:mediator->OnBeersComplete(beer_pool); +note right + Mediator stores beer_pool. + Rating stage can now reference + beer FKs. Ratings run after + checkins complete (checkin_id FK). +end note + +' ═════════════════════════════════════════════ +' MEDIATOR — CHECKIN GATE (triggered internally) +' Runs concurrently with Phase 3 +' ═════════════════════════════════════════════ +|#F0F5EE|Mediator — Checkin Stage (gated)| +note + This stage was triggered by + TryOpenCheckinGate() after both + OnUsersComplete and OnBreweriesComplete + fired. Runs concurrently with + Phase 3 beer generation. +end note + +:ICheckinDistributionStrategy::\nAssignActivityWeights(user_pool_); +note right + J-curve weights assigned across + the full user population before + any checkins are generated. + Small cohort gets high weight; + long tail gets low weight. +end note + +while (For each GeneratedUser in pool?) is (remaining) + :strategy->CheckinsForUser(user, brewery_count); + while (For each checkin index?) is (remaining) + :strategy->TimestampFor(user, index); + note right + Bursty weekend/evening + distribution applied here. + end note + :Select brewery from brewery_pool_\n(weighted random); + :generator->GenerateCheckin(user, brewery, timestamp); + :exporter->ProcessCheckin(checkin) : sqlite3_int64; + :mediator->OnCheckinGenerated(checkin); + endwhile (done) +endwhile (done) + +:checkins_complete_ = true; +note right + Rating stage depends on checkin_id FK. 
+ RunRatingStage() is called here, + after all checkins are committed. +end note + +' ═════════════════════════════════════════════ +' MEDIATOR — RATING STAGE +' Runs after checkins complete +' ═════════════════════════════════════════════ +|#F0F5EE|Mediator — Rating Stage| +note + Ratings reference user_id, beer_id, + and checkin_id. All three pools + are committed before this runs. + Strong positive skew applied + by RatingResult generation. +end note + +while (For each GeneratedCheckin?) is (remaining) + if (Beer available for this brewery?) then (yes) + :Select beer from beer_pool_\n(match brewery_id); + :generator->GenerateRating(user, beer, checkin_id); + :exporter->ProcessRating(rating); + :mediator->OnRatingGenerated(rating); + else (no) + :Skip — no beers for this brewery yet; + endif +endwhile (done) + +' ═════════════════════════════════════════════ +' TEARDOWN +' ═════════════════════════════════════════════ +|#F2F6F0|main.cc| +:Await mediator completion; +:exporter->Finalize(); +note right + Single COMMIT covers all five + fixture types: users, breweries, + beers, checkins, ratings. + All-or-nothing consistency. 
+end note +:spdlog::info "Pipeline complete"; +:return 0; +stop + +@enduml diff --git a/pipeline/diagrams/future-possible-architecture.puml b/pipeline/diagrams/future-possible-architecture.puml new file mode 100644 index 0000000..39983a2 --- /dev/null +++ b/pipeline/diagrams/future-possible-architecture.puml @@ -0,0 +1,501 @@ +@startuml +skinparam style strictuml +skinparam defaultFontName "DM Sans" +skinparam defaultFontSize 14 +skinparam titleFontName "Volkhov" +skinparam titleFontSize 20 +skinparam backgroundColor #FAFCF9 +skinparam defaultFontColor #28342A +skinparam titleFontColor #28342A +skinparam ArrowColor #628A5B + +skinparam class { + BackgroundColor #FAFCF9 + HeaderBackgroundColor #EAF0E8 + BorderColor #547461 + ArrowColor #628A5B + FontColor #28342A +} + +skinparam note { + BackgroundColor #EAF0E8 + BorderColor #547461 + FontColor #28342A +} + +skinparam package { + BackgroundColor #F2F6F0 + BorderColor #547461 + FontColor #28342A +} + +title The Biergarten Data Pipeline — Architecture v4 (Unified Orchestrator) + +' ───────────────────────────────────────────── +' DOMAIN: VALUE OBJECTS +' ───────────────────────────────────────────── +package "Domain: Value Objects & Contracts" { + + class Location { + + city : std::string + + state_province : std::string + + iso3166_2 : std::string + + country : std::string + + iso3166_1 : std::string + + local_languages : std::vector + + latitude : double + + longitude : double + } + + class LocationContext { + + text : std::string + + completeness : Completeness + + char_count : size_t + -- + <> Completeness + Full + Partial + Absent + } + + class EnrichedCity { + + location : Location + + context : LocationContext + } + + class BreweryResult { + + name_en : std::string + + description_en : std::string + + name_local : std::string + + description_local : std::string + } + + class BeerResult { + + name_en : std::string + + description_en : std::string + + name_local : std::string + + description_local : std::string + 
+ style : std::string + + abv : float + + ibu : int + } + + class UserResult { + + username : std::string + + bio : std::string + + activity_weight : float + } + note right of UserResult + activity_weight assigned by + ICheckinDistributionStrategy + after the full user pool is + committed. Drives J-curve + checkin volume per user. + end note + + class CheckinResult { + + checked_in_at : std::string + + note : std::string + } + + class RatingResult { + + score : float + + note : std::string + } + + class GeneratedBrewery { + + brewery_id : sqlite3_int64 + + location : Location + + brewery : BreweryResult + + context_completeness : LocationContext::Completeness + + generated_at : std::string + } + + class GeneratedBeer { + + beer_id : sqlite3_int64 + + brewery_id : sqlite3_int64 + + location : Location + + beer : BeerResult + + generated_at : std::string + } + + class GeneratedUser { + + user_id : sqlite3_int64 + + location : Location + + user : UserResult + + generated_at : std::string + } + + class GeneratedCheckin { + + checkin_id : sqlite3_int64 + + user_id : sqlite3_int64 + + brewery_id : sqlite3_int64 + + checkin : CheckinResult + + generated_at : std::string + } + + class GeneratedRating { + + user_id : sqlite3_int64 + + beer_id : sqlite3_int64 + + checkin_id : sqlite3_int64 + + rating : RatingResult + + generated_at : std::string + } + +} + +' ───────────────────────────────────────────── +' DOMAIN POLICY +' ───────────────────────────────────────────── +package "Domain Policy" { + + interface IContextStrategy <<interface>> { + + QueriesFor(loc : const Location&) : std::vector<std::string> + + MaxContextChars() : size_t + } + + class BreweryContextStrategy { + + QueriesFor(loc : const Location&) : std::vector<std::string> + + MaxContextChars() : size_t + } + + class BeerContextStrategy { + + QueriesFor(loc : const Location&) : std::vector<std::string> + + MaxContextChars() : size_t + } + + interface ISamplingStrategy <<interface>> { + + Sample(locations : const std::vector<Location>&) : std::vector<Location> + } + + class
UniformSamplingStrategy { + - sample_size_ : size_t + + Sample(locations : const std::vector<Location>&) : std::vector<Location> + } + + interface ICheckinDistributionStrategy <<interface>> { + + AssignActivityWeights(users : std::vector<GeneratedUser>&) : void + + CheckinsForUser(user : const GeneratedUser&, brewery_count : size_t) : size_t + + TimestampFor(user : const GeneratedUser&, index : size_t) : std::string + } + note right of ICheckinDistributionStrategy + Injected into the orchestrator. + Owns all statistical policy: + J-curve weight assignment, + bursty weekend timestamps, + per-user checkin volume. + No mediator required to hold this — + the orchestrator calls it directly + before the checkin phase opens. + end note + + class JCurveCheckinStrategy { + - rng_ : std::mt19937 + + AssignActivityWeights(users : std::vector<GeneratedUser>&) : void + + CheckinsForUser(user : const GeneratedUser&, brewery_count : size_t) : size_t + + TimestampFor(user : const GeneratedUser&, index : size_t) : std::string + } + +} + +' ───────────────────────────────────────────── +' INFRASTRUCTURE: ENRICHMENT +' ───────────────────────────────────────────── +package "Infrastructure: Enrichment" { + + interface IEnrichmentService <<interface>> { + + GetLocationContext(loc : const Location&, strategy : const IContextStrategy&) : LocationContext + } + + class WikipediaService { + - client_ : std::unique_ptr<WebClient> + - extract_cache_ : std::unordered_map<std::string, std::string> + + GetLocationContext(loc : const Location&, strategy : const IContextStrategy&) : LocationContext + - FetchExtract(query : std::string_view) : std::string + } + + interface WebClient <<interface>> { + + Get(url : const std::string&) : std::string + + UrlEncode(value : const std::string&) : std::string + } + + class CURLWebClient { + + Get(url : const std::string&) : std::string + + UrlEncode(value : const std::string&) : std::string + } + +} + +' ───────────────────────────────────────────── +' INFRASTRUCTURE: GENERATION +' ───────────────────────────────────────────── +package "Infrastructure: Generation" { + +
interface DataGenerator <<interface>> { + + GenerateBrewery(location : const Location&, context : const LocationContext&) : BreweryResult + + GenerateBeer(brewery_id : sqlite3_int64, location : const Location&, context : const LocationContext&) : BeerResult + + GenerateUser(location : const Location&) : UserResult + + GenerateCheckin(user : const GeneratedUser&, brewery : const GeneratedBrewery&, timestamp : const std::string&) : CheckinResult + + GenerateRating(user : const GeneratedUser&, beer : const GeneratedBeer&, checkin_id : sqlite3_int64) : RatingResult + } + + class MockGenerator { + + GenerateBrewery(...) : BreweryResult + + GenerateBeer(...) : BeerResult + + GenerateUser(...) : UserResult + + GenerateCheckin(...) : CheckinResult + + GenerateRating(...) : RatingResult + - DeterministicHash(location : const Location&) : size_t + } + + class LlamaGenerator { + - model_ : ModelHandle + - context_ : ContextHandle + - prompt_formatter_ : std::unique_ptr<IPromptFormatter> + - config_ : LlamaConfig + - rng_ : std::mt19937 + + GenerateBrewery(...) : BreweryResult + + GenerateBeer(...) : BeerResult + + GenerateUser(...) : UserResult + + GenerateCheckin(...) : CheckinResult + + GenerateRating(...) : RatingResult + - Load(config : const LlamaConfig&) : void + - Infer(system_prompt, user_prompt, max_tokens, grammar) : std::string + - ValidateModelArchitecture() : void + } + + class RestGenerator { + - config_ : RestConfig + + GenerateBrewery(...) : BreweryResult + + GenerateBeer(...) : BeerResult + + GenerateUser(...) : UserResult + + GenerateCheckin(...) : CheckinResult + + GenerateRating(...) : RatingResult + } + note right of RestGenerator + Future REST-backed implementation. + Slots in at the DI root with zero + changes to orchestration logic. + end note + + interface IPromptFormatter <<interface>> { + + Format(system_prompt : std::string_view, user_prompt : std::string_view) : std::string + + ExpectedArchitecture() : std::string_view + } + + class Gemma4JinjaPromptFormatter { + + Format(...)
: std::string + + ExpectedArchitecture() : std::string_view + } + + class LlamaConfig { + + model_path : std::string + + temperature : float + + top_p : float + + top_k : uint32_t + + n_ctx : uint32_t + + seed : int + } + + class RestConfig { + + endpoint : std::string + + api_key : std::string + + timeout : std::chrono::milliseconds + } + +} + +' ───────────────────────────────────────────── +' INFRASTRUCTURE: PIPELINE CHANNEL +' ───────────────────────────────────────────── +package "Infrastructure: Pipeline Channel" { + + class "BoundedChannel<T>" as BoundedChannel { + - queue_ : std::queue<T> + - mutex_ : std::mutex + - not_full_ : std::condition_variable + - not_empty_ : std::condition_variable + - capacity_ : size_t + - closed_ : bool + + Send(item : T) : void + + Receive() : std::optional<T> + + Close() : void + } + note right of BoundedChannel + Used within each phase to + decouple production from export. + Phase boundaries are explicit + sequential barriers in the + orchestrator's Run() method — + not channel-mediated. + end note + +} + +' ───────────────────────────────────────────── +' INFRASTRUCTURE: EXPORT +' ───────────────────────────────────────────── +package "Infrastructure: Export" { + + interface IExportService <<interface>> { + + Initialize() : void + + ProcessBrewery(brewery : const GeneratedBrewery&) : sqlite3_int64 + + ProcessBeer(beer : const GeneratedBeer&) : sqlite3_int64 + + ProcessUser(user : const GeneratedUser&) : sqlite3_int64 + + ProcessCheckin(checkin : const GeneratedCheckin&) : sqlite3_int64 + + ProcessRating(rating : const GeneratedRating&) : void + + Finalize() : void + } + note right of IExportService + Process* methods return + sqlite3_int64 row IDs. + Orchestrator uses these to + populate FK fields on all + downstream fixture types.
+ end note + + class SqliteExportService { + - date_time_provider_ : std::unique_ptr + - db_handle_ : SqliteDatabaseHandle + - insert_location_stmt_ : SqliteStatementHandle + - insert_brewery_stmt_ : SqliteStatementHandle + - insert_beer_stmt_ : SqliteStatementHandle + - insert_user_stmt_ : SqliteStatementHandle + - insert_checkin_stmt_ : SqliteStatementHandle + - insert_rating_stmt_ : SqliteStatementHandle + - transaction_open_ : bool + - location_cache_ : std::unordered_map + + Initialize() : void + + ProcessBrewery(brewery : const GeneratedBrewery&) : sqlite3_int64 + + ProcessBeer(beer : const GeneratedBeer&) : sqlite3_int64 + + ProcessUser(user : const GeneratedUser&) : sqlite3_int64 + + ProcessCheckin(checkin : const GeneratedCheckin&) : sqlite3_int64 + + ProcessRating(rating : const GeneratedRating&) : void + + Finalize() : void + - InitializeSchema() : void + - PrepareStatements() : void + - RollbackAndCloseNoThrow() : void + - FinalizeStatements() : void + } + note right of SqliteExportService + brewery_cache_ removed — row IDs + are now carried on GeneratedBrewery + and GeneratedBeer value objects + and threaded through by the + orchestrator directly. 
+ end note + + interface IDateTimeProvider <<interface>> { + + GetUtcTimestamp() : std::string + } + + class SystemDateTimeProvider { + + GetUtcTimestamp() : std::string + } + +} + +' ───────────────────────────────────────────── +' ORCHESTRATION +' ───────────────────────────────────────────── +package "Orchestration" { + + class BiergartenPipelineOrchestrator { + - enrichment_service_ : std::unique_ptr<IEnrichmentService> + - generator_ : std::unique_ptr<DataGenerator> + - exporter_ : std::unique_ptr<IExportService> + - brewery_context_strategy_ : std::unique_ptr<IContextStrategy> + - beer_context_strategy_ : std::unique_ptr<IContextStrategy> + - sampling_strategy_ : std::unique_ptr<ISamplingStrategy> + - checkin_strategy_ : std::unique_ptr<ICheckinDistributionStrategy> + -- + - user_pool_ : std::vector<GeneratedUser> + - brewery_pool_ : std::vector<GeneratedBrewery> + - beer_pool_ : std::vector<GeneratedBeer> + - checkin_pool_ : std::vector<GeneratedCheckin> + -- + + Run() : bool + - RunUserPhase(locations : const std::vector<Location>&) : void + - RunBreweryPhase(locations : const std::vector<Location>&) : void + - RunBeerPhase() : void + - RunCheckinPhase() : void + - RunRatingPhase() : void + } + note right of BiergartenPipelineOrchestrator + Single component owns all + sequencing. Run() reads as a + linear narrative: + 1. RunUserPhase + 2. RunBreweryPhase + 3. RunBeerPhase + 4. checkin_strategy_->AssignActivityWeights + 5. RunCheckinPhase + 6. RunRatingPhase + The checkin gate is an explicit + sequential barrier between steps + 3 and 5 — not a hidden internal + trigger in a separate object. + Pools are members: each phase + appends to them and the next + phase reads from them directly. + No mediator. No shared_ptr. + Ownership is unambiguous.
+ end note + + class JsonLoader { + + {static} LoadLocations(filepath : const std::filesystem::path&) : std::vector + } + +} + +' ───────────────────────────────────────────── +' RELATIONSHIPS +' ───────────────────────────────────────────── + +' Orchestration +BiergartenPipelineOrchestrator *-- IEnrichmentService : owns +BiergartenPipelineOrchestrator *-- DataGenerator : owns +BiergartenPipelineOrchestrator *-- IExportService : owns +BiergartenPipelineOrchestrator *-- ICheckinDistributionStrategy : owns +BiergartenPipelineOrchestrator *-- ISamplingStrategy : owns +BiergartenPipelineOrchestrator ..> JsonLoader : uses + +' Policy implementations +IContextStrategy <|.. BreweryContextStrategy : implements +IContextStrategy <|.. BeerContextStrategy : implements +ISamplingStrategy <|.. UniformSamplingStrategy : implements +ICheckinDistributionStrategy <|.. JCurveCheckinStrategy : implements + +' Enrichment +IEnrichmentService <|.. WikipediaService : implements +WikipediaService *-- WebClient : owns +WikipediaService ..> IContextStrategy : uses (parameter) +WebClient <|.. CURLWebClient : implements + +' Generation +DataGenerator <|.. MockGenerator : implements +DataGenerator <|.. LlamaGenerator : implements +DataGenerator <|.. RestGenerator : implements +LlamaGenerator *-- IPromptFormatter : owns +LlamaGenerator ..> LlamaConfig : constructed with +RestGenerator ..> RestConfig : constructed with +IPromptFormatter <|.. Gemma4JinjaPromptFormatter : implements + +' Export +IExportService <|.. SqliteExportService : implements +SqliteExportService *-- IDateTimeProvider : owns +IDateTimeProvider <|.. 
SystemDateTimeProvider : implements + +' Data flow +EnrichedCity *-- Location : contains +EnrichedCity *-- LocationContext : contains +GeneratedBrewery *-- Location : contains +GeneratedBrewery *-- BreweryResult : contains +GeneratedBeer *-- Location : contains +GeneratedBeer *-- BeerResult : contains +GeneratedUser *-- Location : contains +GeneratedUser *-- UserResult : contains +GeneratedCheckin *-- CheckinResult : contains +GeneratedRating *-- RatingResult : contains + +@enduml From aa46cf2b4b33ab60b03b137e28fe12991c32c4c3 Mon Sep 17 00:00:00 2001 From: Aaron Po Date: Mon, 20 Apr 2026 03:31:42 -0400 Subject: [PATCH 2/2] updates --- pipeline/diagrams/activity-diagram.svg | 1 - pipeline/diagrams/class-diagram.svg | 1 - .../diagrams/future-possible-activity.puml | 453 +++++++++--------- 3 files changed, 230 insertions(+), 225 deletions(-) delete mode 100644 pipeline/diagrams/activity-diagram.svg delete mode 100644 pipeline/diagrams/class-diagram.svg diff --git a/pipeline/diagrams/activity-diagram.svg b/pipeline/diagrams/activity-diagram.svg deleted file mode 100644 index 0ec4f31..0000000 --- a/pipeline/diagrams/activity-diagram.svg +++ /dev/null @@ -1 +0,0 @@ -The Biergarten Data PipelineThe Biergarten Data PipelineValidatesmocked,model,temperature,top-p, etc.ParseArguments(argc, argv)spdlog::error usage infonoAre arguments valid?yesInit CurlGlobalState & LlamaBackendStateBinds CURLWebClient, WikipediaService,Gemma4JinjaPromptFormatter, andeither MockGenerator or LlamaGeneratordi::make_injector(...)injector.create<BiergartenDataGenerator>()BiergartenDataGenerator::Run()Return 0QueryCitiesWithCountries()Lookup failed?yesnospdlog::warn "context lookup failed"Store EnrichedCity{Location, region_context}Remaining citiesFor each sampled Location?DoneGenerateBreweries(enriched_cities)Exception thrown?yesnospdlog::warn "brewery generation failed"Store GeneratedBreweryspdlog::info dump of generated JSON 
fieldsLogResults()JsonLoader::LoadLocations("locations.json")std::ranges::sample(all_locations, 50)GetLocationContext(loc)FetchExtract("City, Country")FetchExtract("beer in Country")Backed by CURLWebClient::GetFetchExtract("beer in City")Generator ModeMockGeneratorLlamaGeneratorDeterministicHash(location)Select from kBreweryAdjectives, kBreweryNouns,kBreweryDescriptionsFormat BreweryResultPrepareRegionContext(region_context)LoadBrewerySystemPrompt("prompts/system.md")Format user_promptAttempt = 0Uses Gemma4JinjaPromptFormatter,llama_tokenize, and llama_sampler_sampleInfer(system_prompt, user_prompt, max_tokens, kBreweryJsonGrammar)ValidateBreweryJson(raw, brewery)Is JSON Valid?yesnomax_tokens += 700yesError == "incomplete JSON"Update user_prompt with validation errorAttempt++Attempt < 3?yesStill Invalid?yesnothrow std::runtime_errorReturn BreweryResultRemaining citiesFor each EnrichedCity?Donemain.ccBiergartenDataGeneratorJsonLoaderWikipediaServiceDataGenerator \ No newline at end of file diff --git a/pipeline/diagrams/class-diagram.svg b/pipeline/diagrams/class-diagram.svg deleted file mode 100644 index b7cb713..0000000 --- a/pipeline/diagrams/class-diagram.svg +++ /dev/null @@ -1 +0,0 @@ -The Biergarten Data Pipeline - Class DiagramThe Biergarten Data Pipeline - Class DiagramBiergartenDataGeneratorcontext_service_ : std::unique_ptr<IEnrichmentService>generator_ : std::unique_ptr<DataGenerator>generated_breweries_ : std::vector<GeneratedBrewery>Run() : boolQueryCitiesWithCountries() : std::vector<Location>GenerateBreweries(cities : std::span<const EnrichedCity>) : voidLogResults() : void«interface»IEnrichmentServiceGetLocationContext(loc : const Location&) : std::stringWikipediaServiceclient_ : std::unique_ptr<WebClient>extract_cache_ : std::unordered_map<std::string, std::string>GetLocationContext(loc : const Location&) : std::stringFetchExtract(query : std::string_view) : std::string«interface»WebClientGet(url : const std::string&) : std::stringUrlEncode(value : 
const std::string&) : std::stringCURLWebClientGet(url : const std::string&) : std::stringUrlEncode(value : const std::string&) : std::string«interface»DataGeneratorGenerateBrewery(location : const Location&, region_context : const std::string&) : BreweryResultGenerateUser(locale : const std::string&) : UserResultMockGeneratorGenerateBrewery(...) : BreweryResultGenerateUser(...) : UserResultDeterministicHash(location : const Location&) : size_tLlamaGeneratormodel_ : ModelHandlecontext_ : ContextHandleprompt_formatter_ : std::unique_ptr<IPromptFormatter>rng_ : std::mt19937GenerateBrewery(...) : BreweryResultGenerateUser(...) : UserResultLoad(model_path : const std::string&) : voidInfer(...) : std::stringInferFormatted(...) : std::stringLoadBrewerySystemPrompt(...) : std::string«interface»IPromptFormatterFormat(system_prompt : std::string_view, user_prompt : std::string_view) : std::stringGemma4JinjaPromptFormatterFormat(system_prompt : std::string_view, user_prompt : std::string_view) : std::stringJsonLoaderLoadLocations(filepath : const std::filesystem::path&) : std::vector<Location>ownsownsimplementsownsimplementsimplementsimplementsusesimplementsuses \ No newline at end of file diff --git a/pipeline/diagrams/future-possible-activity.puml b/pipeline/diagrams/future-possible-activity.puml index 0c27701..9cca054 100644 --- a/pipeline/diagrams/future-possible-activity.puml +++ b/pipeline/diagrams/future-possible-activity.puml @@ -1,270 +1,277 @@ @startuml skinparam style strictuml skinparam defaultFontName "DM Sans" -skinparam defaultFontSize 14 +skinparam defaultFontSize 13 skinparam titleFontName "Volkhov" skinparam titleFontSize 20 skinparam backgroundColor #FAFCF9 skinparam defaultFontColor #28342A skinparam titleFontColor #28342A skinparam ArrowColor #628A5B +skinparam SequenceLifeLineBorderColor #547461 +skinparam SequenceParticipantBorderColor #547461 +skinparam SequenceParticipantBackgroundColor #EAF0E8 +skinparam SequenceBoxBorderColor #547461 skinparam 
NoteBackgroundColor #EAF0E8 skinparam NoteBorderColor #547461 -skinparam ActivityBackgroundColor #FAFCF9 -skinparam ActivityBorderColor #547461 -skinparam ActivityDiamondBackgroundColor #FAFCF9 -skinparam ActivityDiamondBorderColor #628A5B -skinparam ActivityBarColor #628A5B -skinparam SwimlaneBorderColor #547461 -skinparam SwimlaneBorderThickness 0.3 +skinparam SequenceDividerBackgroundColor #EAF0E8 +skinparam SequenceDividerBorderColor #547461 -title The Biergarten Data Pipeline — Activity Diagram v3 (Mediator + Full Fixture Chain) +title The Biergarten Data Pipeline — Sequence Diagram v4 (Unified Orchestrator) -' ═════════════════════════════════════════════ +participant "main.cc" as main #F2F6F0 +participant "Orchestrator" as orch #EAF0E8 +participant "Thread U1\nUserProducer" as u1 #DCE8D8 +participant "Thread U2\nUserExportConsumer" as u2 #E0EAE0 +participant "Thread B1\nEnrichmentProducer" as b1 #DCE8D8 +participant "Thread B2\nBreweryGenerationConsumer" as b2 #E5EDE1 +participant "Thread B3\nBreweryExportConsumer" as b3 #E0EAE0 +participant "Thread R1\nBeerGenerationProducer" as r1 #DCE8D8 +participant "Thread R2\nBeerExportConsumer" as r2 #E0EAE0 +participant "Thread C1\nCheckinGenerationProducer" as c1 #DCE8D8 +participant "Thread C2\nCheckinExportConsumer" as c2 #E0EAE0 +participant "Thread G1\nRatingGenerationProducer" as g1 #DCE8D8 +participant "Thread G2\nRatingExportConsumer" as g2 #E0EAE0 + +' ───────────────────────────────────────────── ' STARTUP -' ═════════════════════════════════════════════ -|#F2F6F0|main.cc| -start -:ParseArguments(argc, argv); -if (Valid?) then (no) - :spdlog::error; - stop -else (yes) -endif -:Init CurlGlobalState & LlamaBackendState; -:Build DI injector; -note right - DataGenerator and IExportService - bound as shared_ptr — both the - orchestrator and mediator hold - a reference to the same instances. 
-end note -:Create BiergartenPipelineOrchestrator; -:Create BiergartenPipelineMediator\n(shared generator, shared exporter,\nJCurveCheckinStrategy); -:exporter->Initialize(); -:JsonLoader::LoadLocations("locations.json"); -:SamplingStrategy::Sample(all_locations); -:BiergartenPipelineOrchestrator::Run(); +' ───────────────────────────────────────────── +main -> main : ParseArguments(argc, argv) +alt Invalid args + main -> main : spdlog::error; stop +end -' ═════════════════════════════════════════════ -' PHASE 1 — USER GENERATION -' (independent, no FK dependencies) -' ═════════════════════════════════════════════ -|#EAF0E8|Orchestrator — Phase 1: Users| -note - Users have no FK dependencies. - Generated first so the full pool - exists before checkin weights - are assigned. +main -> main : Init CurlGlobalState & LlamaBackendState +main -> main : Build DI injector +note right of main + All dependencies bound with unique_ptr. + LlamaConfig or RestConfig injected + instead of ApplicationOptions. end note -:Spawn Thread U1 — UserProducer(sampled_locations); -:Spawn Thread U2 — UserExportConsumer(); -:Join U1, U2; -:Collect user_pool : std::vector; +main -> orch : exporter->Initialize() +note right of orch + Opens SQLite connection. + Creates schema for all five fixture types + in one DDL pass. Begins IMMEDIATE TRANSACTION. +end note -|#DCE8D8|Thread U1 — UserProducer| -while (For each Location?) is (remaining) - :generator->GenerateUser(location); - :user_channel_.Send(GeneratedUser); -endwhile (done) -:user_channel_.Close(); +main -> orch : JsonLoader::LoadLocations("locations.json") +main -> orch : ISamplingStrategy::Sample(all_locations) +main -> orch : BiergartenPipelineOrchestrator::Run() -|#E0EAE0|Thread U2 — UserExportConsumer| -while (user_channel_.Receive()?) is (available) - :exporter->ProcessUser(user) : sqlite3_int64; - note right - Returns inserted row ID. - Stored back into GeneratedUser.user_id - so mediator pool carries live FKs. 
+' ───────────────────────────────────────────── +' PHASE 1 — USERS +' ───────────────────────────────────────────── +== Phase 1 — Users (no FK dependencies) == + +orch -> u1 : spawn +orch -> u2 : spawn + +loop For each Location + u1 -> u1 : generator->GenerateUser(location) + u1 -> u2 : user_channel_.Send(GeneratedUser) +end +u1 -> u2 : user_channel_.Close() + +loop user_channel_.Receive() + u2 -> u2 : exporter->ProcessUser(user) : sqlite3_int64 + note right of u2 + Returns committed row ID. + Stored on GeneratedUser.user_id. end note - :Append to user_pool; -endwhile (nullopt) + u2 -> orch : Append to user_pool_ +end -|#EAF0E8|Orchestrator — Phase 1: Users| -:mediator->OnUsersComplete(user_pool); -note right - Mediator receives the full committed - user pool. Sets users_ready_ = true. - Calls TryOpenCheckinGate() — - gate stays closed until breweries - are also ready. +orch -> orch : join(U1, U2) +note right of orch + ── BARRIER 1 ── + user_pool_ is now fully committed. + Phase 2 begins. end note -' ═════════════════════════════════════════════ -' PHASE 2 — BREWERY GENERATION -' (depends on locations, runs after Phase 1) -' ═════════════════════════════════════════════ -|#EAF0E8|Orchestrator — Phase 2: Breweries| -note - Runs after Phase 1 completes. - Could be parallelised with Phase 1 - in future — FK dependency is only - on checkins, not on users directly. 
+' ───────────────────────────────────────────── +' PHASE 2 — BREWERIES +' ───────────────────────────────────────────── +== Phase 2 — Breweries (depends on locations only) == + +orch -> b1 : spawn +orch -> b2 : spawn +orch -> b3 : spawn + +loop For each Location + b1 -> b1 : BreweryContextStrategy::QueriesFor(location) + b1 -> b1 : WikipediaService::GetLocationContext(location,\nbrewery_context_strategy_) + alt failure + b1 -> b1 : LocationContext{ Absent } + else truncated + b1 -> b1 : LocationContext{ Partial } + else success + b1 -> b1 : LocationContext{ Full } + end + b1 -> b2 : enrichment_channel_.Send(EnrichedCity) + note right of b1 + Blocks if channel full. + Back-pressure against GPU consumer. + end note +end +b1 -> b2 : enrichment_channel_.Close() + +loop enrichment_channel_.Receive() + alt context.completeness == Absent + b2 -> b2 : spdlog::warn — proceeding with minimal prompt + end + b2 -> b2 : generator->GenerateBrewery(location, context) + b2 -> b3 : brewery_channel_.Send(GeneratedBrewery) +end +b2 -> b3 : brewery_channel_.Close() + +loop brewery_channel_.Receive() + b3 -> b3 : exporter->ProcessBrewery(brewery) : sqlite3_int64 + note right of b3 + Row ID stored on GeneratedBrewery.brewery_id. + No brewery_cache_ needed — orchestrator + threads the ID forward directly. + end note + b3 -> orch : Append to brewery_pool_ +end + +orch -> orch : join(B1, B2, B3) +note right of orch + ── BARRIER 2 ── + brewery_pool_ is now fully committed + with live brewery_id values. + Phase 3 begins. end note -:Spawn Thread B1 — EnrichmentProducer(sampled_locations); -:Spawn Thread B2 — BreweryGenerationConsumer(); -:Spawn Thread B3 — BreweryExportConsumer(); -:Join B1, B2, B3; -:Collect brewery_pool : std::vector; +' ───────────────────────────────────────────── +' PHASE 3 — BEERS +' ───────────────────────────────────────────── +== Phase 3 — Beers (depends on brewery_pool_) == -|#DCE8D8|Thread B1 — EnrichmentProducer| -while (For each Location?) 
is (remaining) - :BreweryContextStrategy::QueriesFor(location); - :WikipediaService::GetLocationContext\n(location, brewery_context_strategy_); - if (failure?) then (yes) - :LocationContext{ Absent }; - else if (truncated?) then (yes) - :LocationContext{ Partial }; - else (no) - :LocationContext{ Full }; - endif - :enrichment_channel_.Send(EnrichedCity); -endwhile (done) -:enrichment_channel_.Close(); +orch -> r1 : spawn +orch -> r2 : spawn -|#E5EDE1|Thread B2 — BreweryGenerationConsumer| -while (enrichment_channel_.Receive()?) is (available) - :generator->GenerateBrewery(location, context); - :brewery_channel_.Send(GeneratedBrewery); -endwhile (nullopt) -:brewery_channel_.Close(); +loop For each GeneratedBrewery in brewery_pool_ + r1 -> r1 : BeerContextStrategy::QueriesFor(location) + r1 -> r1 : WikipediaService::GetLocationContext(location,\nbeer_context_strategy_) + r1 -> r1 : generator->GenerateBeer(brewery.brewery_id,\nlocation, context) + r1 -> r2 : beer_channel_.Send(GeneratedBeer) +end +r1 -> r2 : beer_channel_.Close() -|#E0EAE0|Thread B3 — BreweryExportConsumer| -while (brewery_channel_.Receive()?) is (available) - :exporter->ProcessBrewery(brewery) : sqlite3_int64; - :Append to brewery_pool; -endwhile (nullopt) +loop beer_channel_.Receive() + r2 -> r2 : exporter->ProcessBeer(beer) : sqlite3_int64 + note right of r2 + Row ID stored on GeneratedBeer.beer_id. + end note + r2 -> orch : Append to beer_pool_ +end -|#EAF0E8|Orchestrator — Phase 2: Breweries| -:mediator->OnBreweriesComplete(brewery_pool); -note right - Mediator sets breweries_ready_ = true. - Calls TryOpenCheckinGate(). - Both flags now true — gate opens. - Mediator spawns checkin stage - asynchronously on its own thread. +orch -> orch : join(R1, R2) +note right of orch + ── BARRIER 3 ── + beer_pool_ is fully committed. + All three upstream pools ready. 
end note -' ═════════════════════════════════════════════ -' PHASE 3 — BEER GENERATION -' (depends on breweries) -' ═════════════════════════════════════════════ -|#EAF0E8|Orchestrator — Phase 3: Beers| -:RunBeerPhase(brewery_pool); +' ───────────────────────────────────────────── +' CHECKIN WEIGHT ASSIGNMENT +' ───────────────────────────────────────────── +== Checkin Weight Assignment == -:Spawn Thread R1 — BeerGenerationProducer(brewery_pool); -:Spawn Thread R2 — BeerExportConsumer(); -:Join R1, R2; -:Collect beer_pool : std::vector; - -|#DCE8D8|Thread R1 — BeerGenerationProducer| -while (For each GeneratedBrewery?) is (remaining) - :BeerContextStrategy::QueriesFor(location); - :WikipediaService::GetLocationContext\n(location, beer_context_strategy_); - :generator->GenerateBeer(brewery_id, location, context); - :beer_channel_.Send(GeneratedBeer); -endwhile (done) -:beer_channel_.Close(); - -|#E0EAE0|Thread R2 — BeerExportConsumer| -while (beer_channel_.Receive()?) is (available) - :exporter->ProcessBeer(beer) : sqlite3_int64; - :Append to beer_pool; -endwhile (nullopt) - -|#EAF0E8|Orchestrator — Phase 3: Beers| -:mediator->OnBeersComplete(beer_pool); -note right - Mediator stores beer_pool. - Rating stage can now reference - beer FKs. Ratings run after - checkins complete (checkin_id FK). -end note - -' ═════════════════════════════════════════════ -' MEDIATOR — CHECKIN GATE (triggered internally) -' Runs concurrently with Phase 3 -' ═════════════════════════════════════════════ -|#F0F5EE|Mediator — Checkin Stage (gated)| -note - This stage was triggered by - TryOpenCheckinGate() after both - OnUsersComplete and OnBreweriesComplete - fired. Runs concurrently with - Phase 3 beer generation. -end note - -:ICheckinDistributionStrategy::\nAssignActivityWeights(user_pool_); -note right - J-curve weights assigned across - the full user population before - any checkins are generated. 
+orch -> orch : ICheckinDistributionStrategy::\nAssignActivityWeights(user_pool_) +note right of orch + J-curve weights written onto + GeneratedUser.user.activity_weight. Small cohort gets high weight; long tail gets low weight. + Requires the full pool — this is why + users were committed first. end note -while (For each GeneratedUser in pool?) is (remaining) - :strategy->CheckinsForUser(user, brewery_count); - while (For each checkin index?) is (remaining) - :strategy->TimestampFor(user, index); - note right - Bursty weekend/evening +' ───────────────────────────────────────────── +' PHASE 4 — CHECKINS +' ───────────────────────────────────────────── +== Phase 4 — Check-ins (depends on user_pool_ + brewery_pool_) == + +orch -> c1 : spawn +orch -> c2 : spawn + +loop For each GeneratedUser in user_pool_ + c1 -> c1 : strategy->CheckinsForUser(user,\nbrewery_pool_.size()) + loop For each checkin index + c1 -> c1 : strategy->TimestampFor(user, index) + note right of c1 + Bursty weekend / evening distribution applied here. end note - :Select brewery from brewery_pool_\n(weighted random); - :generator->GenerateCheckin(user, brewery, timestamp); - :exporter->ProcessCheckin(checkin) : sqlite3_int64; - :mediator->OnCheckinGenerated(checkin); - endwhile (done) -endwhile (done) + c1 -> c1 : Select brewery from brewery_pool_\n(weighted random by activity_weight) + c1 -> c1 : generator->GenerateCheckin(user, brewery, timestamp) + c1 -> c2 : checkin_channel_.Send(GeneratedCheckin) + end +end +c1 -> c2 : checkin_channel_.Close() -:checkins_complete_ = true; -note right - Rating stage depends on checkin_id FK. - RunRatingStage() is called here, - after all checkins are committed. +loop checkin_channel_.Receive() + c2 -> c2 : exporter->ProcessCheckin(checkin) : sqlite3_int64 + note right of c2 + Row ID stored on GeneratedCheckin.checkin_id. 
+ end note + c2 -> orch : Append to checkin_pool_ +end + +orch -> orch : join(C1, C2) +note right of orch + ── BARRIER 4 ── + checkin_pool_ is fully committed. + All FK dependencies for ratings satisfied. + Phase 5 begins. end note -' ═════════════════════════════════════════════ -' MEDIATOR — RATING STAGE -' Runs after checkins complete -' ═════════════════════════════════════════════ -|#F0F5EE|Mediator — Rating Stage| -note - Ratings reference user_id, beer_id, - and checkin_id. All three pools - are committed before this runs. - Strong positive skew applied - by RatingResult generation. -end note +' ───────────────────────────────────────────── +' PHASE 5 — RATINGS +' ───────────────────────────────────────────── +== Phase 5 — Ratings (depends on user_pool_ + beer_pool_ + checkin_pool_) == -while (For each GeneratedCheckin?) is (remaining) - if (Beer available for this brewery?) then (yes) - :Select beer from beer_pool_\n(match brewery_id); - :generator->GenerateRating(user, beer, checkin_id); - :exporter->ProcessRating(rating); - :mediator->OnRatingGenerated(rating); - else (no) - :Skip — no beers for this brewery yet; - endif -endwhile (done) +orch -> g1 : spawn +orch -> g2 : spawn -' ═════════════════════════════════════════════ +loop For each GeneratedCheckin in checkin_pool_ + g1 -> g1 : Resolve GeneratedUser from user_pool_\n(match user_id) + g1 -> g1 : Resolve GeneratedBeer from beer_pool_\n(match brewery_id, select one) + alt Beer found for this brewery + g1 -> g1 : generator->GenerateRating(user, beer,\ncheckin.checkin_id) + note right of g1 + Strong positive skew applied + inside GenerateRating. 
+ end note + g1 -> g2 : rating_channel_.Send(GeneratedRating) + else No beer found + g1 -> g1 : spdlog::warn — no beer for brewery,\nskipping rating + end +end +g1 -> g2 : rating_channel_.Close() + +loop rating_channel_.Receive() + g2 -> g2 : exporter->ProcessRating(rating) +end + +orch -> orch : join(G1, G2) + +' ───────────────────────────────────────────── ' TEARDOWN -' ═════════════════════════════════════════════ -|#F2F6F0|main.cc| -:Await mediator completion; -:exporter->Finalize(); -note right - Single COMMIT covers all five - fixture types: users, breweries, - beers, checkins, ratings. - All-or-nothing consistency. +' ───────────────────────────────────────────── +== Teardown == + +orch -> main : return +main -> main : exporter->Finalize() +note right of main + Single COMMIT covers all five fixture types: + users, breweries, beers, checkins, ratings. + All-or-nothing consistency per run. end note -:spdlog::info "Pipeline complete"; -:return 0; -stop +main -> main : spdlog::info "Pipeline complete in X ms" +main -> main : return 0 @enduml