diff --git a/pipeline/diagrams/future-possible-activity.puml b/pipeline/diagrams/future-possible-activity.puml new file mode 100644 index 0000000..a5cb3e2 --- /dev/null +++ b/pipeline/diagrams/future-possible-activity.puml @@ -0,0 +1,192 @@ +@startuml future_possible_activity +skinparam defaultFontName "DM Sans" +skinparam defaultFontSize 13 +skinparam titleFontName "Volkhov" +skinparam titleFontSize 20 +skinparam backgroundColor #FAFCF9 +skinparam defaultFontColor #28342A +skinparam titleFontColor #28342A +skinparam ArrowColor #628A5B +skinparam ActivityBackgroundColor #EAF0E8 +skinparam ActivityBorderColor #547461 +skinparam ActivityDiamondBackgroundColor #DCE8D8 +skinparam ActivityDiamondBorderColor #547461 +skinparam NoteBackgroundColor #EAF0E8 +skinparam NoteBorderColor #547461 + +title The Biergarten Data Pipeline — Activity Diagram v7 + +|Main| +start +:ParseArguments(argc, argv); +if (Invalid args?) then (yes) + :spdlog::error; + stop +else (no) +endif +:Init CurlGlobalState & LlamaBackendState; +:Build DI injector; + +:JsonLoader::LoadLocations("locations.json"); +:JsonLoader::LoadBeerStyles("beer-styles.json"); + +:EnrichmentService::PreWarmBeerStyleCache(beer_styles); +note right + **NEW**: Beer styles do not need location context. + Wikipedia summaries for the entire palette are + fetched and cached globally at startup. +end note + +:Initialize SqliteExportService; +note right + Opens SQLite connection. + Begins a single transaction + covering all five fixture types. +end note +:BiergartenPipelineOrchestrator::Run(); + +' ═══════════════════════════════════════════ +' PHASE 0 — USER GENERATION +' ═══════════════════════════════════════════ +|Orchestrator| +:RunUserPhase(sampled_locations); +:Create BoundedChannels\n(user_llm_ch, user_exp_ch); + +fork + |Orchestrator| + :Loop: Send Locations → user_llm_ch; + :Close user_llm_ch; +fork again + |LLM Worker| + while (user_llm_ch has items?) 
is (yes) + :Receive Location; + :GenerateUser(location)\nvia DataGenerator; + :Send GeneratedUser → user_exp_ch; + endwhile (no) + :Close user_exp_ch; +fork again + |SQLite Worker| + while (user_exp_ch has items?) is (yes) + :Receive GeneratedUser; + :ProcessUser(user) → sqlite3_int64; + :Append → user_pool_; + endwhile (no) +end fork + +|Orchestrator| +:Join LLM Worker, SQLite Worker; + +' ═══════════════════════════════════════════ +' PHASE 1 — BREWERY & BEER GENERATION +' Combined into a single dependent unit of work. +' ═══════════════════════════════════════════ +:RunBreweryAndBeerPhase(sampled_locations); +:Create BoundedChannels\n(loc_ch, llm_ch, exp_ch); + +fork + |Orchestrator| + :Loop: Send Locations → loc_ch; + :Close loc_ch; +fork again + |Enrichment Workers (xN)| + while (loc_ch has items?) is (yes) + :Receive Location; + :GetLocationContext(location,\nBreweryContextStrategy); + :Send EnrichedCity → llm_ch; + endwhile (no) + |Orchestrator| + :Join Enrichment Workers; + :Close llm_ch; +fork again + |LLM Worker| + while (llm_ch has items?) is (yes) + :Receive EnrichedCity; + + :GenerateBrewery(location, context)\nvia DataGenerator; + + :IBeerSelectionStrategy::SelectStyles(\n brewery, beer_style_palette_); + + while (For each selected BeerStyle?) is (remaining) + :GetStyleContextFromCache(style); + note right + Guaranteed cache hit from startup. + end note + :GenerateBeer(brewery, style_context)\nvia DataGenerator; + :Attach GeneratedBeer to Brewery bundle; + endwhile (done) + + :Send BreweryWithBeers Bundle → exp_ch; + note right + The next generation of a brewery is + entirely dependent on the current + brewery and its beers completing. + end note + endwhile (no) + :Close exp_ch; +fork again + |SQLite Worker| + while (exp_ch has items?) is (yes) + :Receive BreweryWithBeers Bundle; + :ProcessBrewery(brewery) → brewery_id; + :Append → brewery_pool_; + + while (For each beer in bundle?) 
is (remaining) + :Set beer.brewery_id = brewery_id; + :ProcessBeer(beer) → sqlite3_int64; + :Append → beer_pool_; + endwhile (done) + endwhile (no) +end fork + +|Orchestrator| +:Join LLM Worker, SQLite Worker; +note right + Both brewery_pool_ and beer_pool_ + are now completely populated. +end note + +' ═══════════════════════════════════════════ +' PHASE 2 — CHECKIN GENERATION +' Sequential now that Breweries/Beers are done. +' ═══════════════════════════════════════════ +:RunCheckinPhase(); +:ICheckinDistributionStrategy::\nAssignActivityWeights(user_pool_); + +while (For each GeneratedUser in user_pool_?) is (remaining) + :CheckinsForUser(user, brewery_pool_.size()); + while (For each checkin index?) is (remaining) + :TimestampFor(user, index); + :Select brewery from brewery_pool_; + :GenerateCheckin(user, brewery, timestamp)\nvia DataGenerator; + :ProcessCheckin(checkin) → sqlite3_int64; + :Append → checkin_pool_; + endwhile (done) +endwhile (done) + +' ═══════════════════════════════════════════ +' PHASE 3 — RATING GENERATION +' ═══════════════════════════════════════════ +:RunRatingPhase(); + +while (For each GeneratedCheckin in checkin_pool_?) is (remaining) + :Match brewery_id → select beer\nfrom beer_pool_ (same brewery_id); + if (Beer exists for brewery?) then (yes) + :GenerateRating(user, beer, checkin_id)\nvia DataGenerator; + :ProcessRating(rating); + else (no) + :Skip — brewery has no beers; + endif +endwhile (done) + +' ═══════════════════════════════════════════ +' TEARDOWN +' ═══════════════════════════════════════════ +|Main| +:Finalize SqliteExportService; +note right + COMMIT covers all five fixture types. 
+end note +:spdlog::info "Pipeline complete in X ms"; +stop + +@enduml diff --git a/pipeline/diagrams/future-possible-architecture.puml b/pipeline/diagrams/future-possible-architecture.puml new file mode 100644 index 0000000..b52a0b5 --- /dev/null +++ b/pipeline/diagrams/future-possible-architecture.puml @@ -0,0 +1,528 @@ +@startuml future_possible_architecture +skinparam style strictuml +skinparam defaultFontName "DM Sans" +skinparam defaultFontSize 14 +skinparam titleFontName "Volkhov" +skinparam titleFontSize 20 +skinparam backgroundColor #FAFCF9 +skinparam defaultFontColor #28342A +skinparam titleFontColor #28342A +skinparam ArrowColor #628A5B + +skinparam class { + BackgroundColor #FAFCF9 + HeaderBackgroundColor #EAF0E8 + BorderColor #547461 + ArrowColor #628A5B + FontColor #28342A +} + +skinparam note { + BackgroundColor #EAF0E8 + BorderColor #547461 + FontColor #28342A +} + +skinparam package { + BackgroundColor #F2F6F0 + BorderColor #547461 + FontColor #28342A +} + +title The Biergarten Data Pipeline — Architecture v6 + +' ───────────────────────────────────────────── +' DOMAIN: VALUE OBJECTS +' ───────────────────────────────────────────── +package "Domain: Value Objects & Contracts" { + + class Location { + + city : std::string + + state_province : std::string + + iso3166_2 : std::string + + country : std::string + + iso3166_1 : std::string + + local_languages : std::vector<std::string> + + latitude : double + + longitude : double + } + + class LocationContext { + + text : std::string + + completeness : Completeness + + char_count : size_t + -- + <<enumeration>> Completeness + Full + Partial + Absent + } + + class EnrichedCity { + + location : Location + + context : LocationContext + } + + class BeerStyle { + + name : std::string + + description : std::string + + min_abv : float + + max_abv : float + + min_ibu : int + + max_ibu : int + } + note right of BeerStyle + Loaded once at startup from + beer-styles.json via JsonLoader. + Passed as std::span<const BeerStyle> + to IBeerSelectionStrategy. 
+ Generator receives the selected + style as a parameter — it never + reads the palette directly. + end note + + class BreweryResult { + + name_en : std::string + + description_en : std::string + + name_local : std::string + + description_local : std::string + } + + class BeerResult { + + name_en : std::string + + description_en : std::string + + name_local : std::string + + description_local : std::string + + style : std::string + + abv : float + + ibu : int + } + + class UserResult { + + username : std::string + + bio : std::string + + activity_weight : float + } + note right of UserResult + activity_weight assigned by + ICheckinDistributionStrategy + after the full user pool is + committed. Drives J-curve + checkin volume per user. + end note + + class CheckinResult { + + checked_in_at : std::string + + note : std::string + } + + class RatingResult { + + score : float + + note : std::string + } + + class GeneratedBrewery { + + brewery_id : sqlite3_int64 + + location : Location + + brewery : BreweryResult + + context_completeness : LocationContext::Completeness + + generated_at : std::string + } + + class GeneratedBeer { + + beer_id : sqlite3_int64 + + brewery_id : sqlite3_int64 + + location : Location + + beer : BeerResult + + generated_at : std::string + } + + class GeneratedUser { + + user_id : sqlite3_int64 + + location : Location + + user : UserResult + + generated_at : std::string + } + note right of GeneratedUser + user_id populated after SQLite + insert. Live FK carried in pool + for checkin and rating references. 
+ end note + + class GeneratedCheckin { + + checkin_id : sqlite3_int64 + + user_id : sqlite3_int64 + + brewery_id : sqlite3_int64 + + checkin : CheckinResult + + generated_at : std::string + } + + class GeneratedRating { + + user_id : sqlite3_int64 + + beer_id : sqlite3_int64 + + checkin_id : sqlite3_int64 + + rating : RatingResult + + generated_at : std::string + } + +} + +' ───────────────────────────────────────────── +' DOMAIN POLICY +' ───────────────────────────────────────────── +package "Domain Policy" { + + interface IContextStrategy <<interface>> { + + QueriesFor(loc : const Location&) : std::vector<std::string> + + MaxContextChars() : size_t + } + + class BreweryContextStrategy { + + QueriesFor(loc : const Location&) : std::vector<std::string> + + MaxContextChars() : size_t + } + + class BeerContextStrategy { + + QueriesFor(loc : const Location&) : std::vector<std::string> + + MaxContextChars() : size_t + } + + interface ISamplingStrategy <<interface>> { + + Sample(locations : const std::vector<Location>&) : std::vector<Location> + } + + class UniformSamplingStrategy { + - sample_size_ : size_t + + Sample(locations : const std::vector<Location>&) : std::vector<Location> + } + + interface IBeerSelectionStrategy <<interface>> { + + SelectStyles(brewery : const GeneratedBrewery&,\n palette : std::span<const BeerStyle>) : std::vector<BeerStyle> + } + note right of IBeerSelectionStrategy + Decides how many beers a brewery + gets and which styles are selected. + Count distribution and style + deduplication logic live here, + not in the orchestrator or generator. + end note + + class RandomBeerSelectionStrategy { + - rng_ : std::mt19937 + - min_beers_ : size_t + - max_beers_ : size_t + + SelectStyles(brewery : const GeneratedBrewery&,\n palette : std::span<const BeerStyle>) : std::vector<BeerStyle> + } + note right of RandomBeerSelectionStrategy + Draws a random count in [min, max]. + Samples without replacement from + palette to avoid duplicate styles + per brewery. 
+ end note + + interface ICheckinDistributionStrategy <<interface>> { + + AssignActivityWeights(users : std::vector<GeneratedUser>&) : void + + CheckinsForUser(user : const GeneratedUser&,\n brewery_count : size_t) : size_t + + TimestampFor(user : const GeneratedUser&,\n index : size_t) : std::string + } + note right of ICheckinDistributionStrategy + Owns all statistical policy: + J-curve weight assignment, + bursty weekend timestamps, + per-user checkin volume. + end note + + class JCurveCheckinStrategy { + - rng_ : std::mt19937 + + AssignActivityWeights(users : std::vector<GeneratedUser>&) : void + + CheckinsForUser(user : const GeneratedUser&,\n brewery_count : size_t) : size_t + + TimestampFor(user : const GeneratedUser&,\n index : size_t) : std::string + } + +} + +' ───────────────────────────────────────────── +' INFRASTRUCTURE: ENRICHMENT +' ───────────────────────────────────────────── +package "Infrastructure: Enrichment" { + + interface IEnrichmentService <<interface>> { + + GetLocationContext(loc : const Location&,\n strategy : const IContextStrategy&) : LocationContext + } + + class WikipediaService { + - client_ : std::unique_ptr<WebClient> + - extract_cache_ : std::unordered_map<std::string, std::string> + + GetLocationContext(loc : const Location&,\n strategy : const IContextStrategy&) : LocationContext + - FetchExtract(query : std::string_view) : std::string + } + note right of WikipediaService + extract_cache_ keyed by query string. + Beer pass gets near-100% cache hits + since locations were already fetched + during the brewery pass. 
+ end note + + interface WebClient <<interface>> { + + Get(url : const std::string&) : std::string + + UrlEncode(value : const std::string&) : std::string + } + + class CURLWebClient { + + Get(url : const std::string&) : std::string + + UrlEncode(value : const std::string&) : std::string + } + +} + +' ───────────────────────────────────────────── +' INFRASTRUCTURE: GENERATION +' ───────────────────────────────────────────── +package "Infrastructure: Generation" { + + interface DataGenerator <<interface>> { + + GenerateBrewery(location : const Location&,\n context : const LocationContext&) : BreweryResult + + GenerateBeer(brewery_id : sqlite3_int64,\n location : const Location&,\n context : const LocationContext&,\n style : const BeerStyle&) : BeerResult + + GenerateUser(location : const Location&) : UserResult + + GenerateCheckin(user : const GeneratedUser&,\n brewery : const GeneratedBrewery&,\n timestamp : const std::string&) : CheckinResult + + GenerateRating(user : const GeneratedUser&,\n beer : const GeneratedBeer&,\n checkin_id : sqlite3_int64) : RatingResult + } + note right of DataGenerator + GenerateBeer receives BeerStyle + as a parameter. Style selection + and count decisions live in + IBeerSelectionStrategy, not here. + end note + + class MockGenerator { + + GenerateBrewery(...) : BreweryResult + + GenerateBeer(...) : BeerResult + + GenerateUser(...) : UserResult + + GenerateCheckin(...) : CheckinResult + + GenerateRating(...) : RatingResult + - DeterministicHash(location : const Location&) : size_t + } + + class LlamaGenerator { + - model_ : ModelHandle + - context_ : ContextHandle + - prompt_formatter_ : std::unique_ptr<IPromptFormatter> + - config_ : LlamaConfig + - rng_ : std::mt19937 + + GenerateBrewery(...) : BreweryResult + + GenerateBeer(...) : BeerResult + + GenerateUser(...) : UserResult + + GenerateCheckin(...) : CheckinResult + + GenerateRating(...) 
: RatingResult + - Load(config : const LlamaConfig&) : void + - Infer(system_prompt, user_prompt,\n max_tokens, grammar) : std::string + - ValidateModelArchitecture() : void + } + + interface IPromptFormatter <<interface>> { + + Format(system_prompt : std::string_view,\n user_prompt : std::string_view) : std::string + + ExpectedArchitecture() : std::string_view + } + + class Gemma4JinjaPromptFormatter { + + Format(...) : std::string + + ExpectedArchitecture() : std::string_view + } + + class LlamaConfig { + + model_path : std::string + + temperature : float + + top_p : float + + top_k : uint32_t + + n_ctx : uint32_t + + seed : int + } + +} + +' ───────────────────────────────────────────── +' INFRASTRUCTURE: PIPELINE CHANNEL +' ───────────────────────────────────────────── +package "Infrastructure: Pipeline Channel" { + + class "BoundedChannel<T>" as BoundedChannel { + - queue_ : std::queue<T> + - mutex_ : std::mutex + - not_full_ : std::condition_variable + - not_empty_ : std::condition_variable + - capacity_ : size_t + - closed_ : bool + + Send(item : T) : void + + Receive() : std::optional<T> + + Close() : void + } + note right of BoundedChannel + Used for user, brewery, and + checkin/rating phases. + Beer phase uses a simple + sequential loop — enrichment + is all cache hits, no fan-out + needed. 
+ end note + +} + +' ───────────────────────────────────────────── +' INFRASTRUCTURE: EXPORT +' ───────────────────────────────────────────── +package "Infrastructure: Export" { + + interface IExportService <<interface>> { + + Initialize() : void + + ProcessBrewery(brewery : const GeneratedBrewery&) : sqlite3_int64 + + ProcessBeer(beer : const GeneratedBeer&) : sqlite3_int64 + + ProcessUser(user : const GeneratedUser&) : sqlite3_int64 + + ProcessCheckin(checkin : const GeneratedCheckin&) : sqlite3_int64 + + ProcessRating(rating : const GeneratedRating&) : void + + Finalize() : void + } + + class SqliteExportService { + - date_time_provider_ : std::unique_ptr<IDateTimeProvider> + - db_handle_ : SqliteDatabaseHandle + - insert_location_stmt_ : SqliteStatementHandle + - insert_brewery_stmt_ : SqliteStatementHandle + - insert_beer_stmt_ : SqliteStatementHandle + - insert_user_stmt_ : SqliteStatementHandle + - insert_checkin_stmt_ : SqliteStatementHandle + - insert_rating_stmt_ : SqliteStatementHandle + - transaction_open_ : bool + - location_cache_ : std::unordered_map + - brewery_cache_ : std::unordered_map + + Initialize() : void + + ProcessBrewery(brewery : const GeneratedBrewery&) : sqlite3_int64 + + ProcessBeer(beer : const GeneratedBeer&) : sqlite3_int64 + + ProcessUser(user : const GeneratedUser&) : sqlite3_int64 + + ProcessCheckin(checkin : const GeneratedCheckin&) : sqlite3_int64 + + ProcessRating(rating : const GeneratedRating&) : void + + Finalize() : void + - InitializeSchema() : void + - PrepareStatements() : void + - RollbackAndCloseNoThrow() : void + - FinalizeStatements() : void + } + note right of SqliteExportService + brewery_cache_ restored. + Keyed by location string for + location deduplication, and + by brewery identity for beer + FK resolution without re-querying. 
+ end note + + interface IDateTimeProvider <<interface>> { + + GetUtcTimestamp() : std::string + } + + class SystemDateTimeProvider { + + GetUtcTimestamp() : std::string + } + +} + +' ───────────────────────────────────────────── +' ORCHESTRATION +' ───────────────────────────────────────────── +package "Orchestration" { + + class BiergartenPipelineOrchestrator { + - enrichment_service_ : std::unique_ptr<IEnrichmentService> + - generator_ : std::unique_ptr<DataGenerator> + - exporter_ : std::unique_ptr<IExportService> + - brewery_context_strategy_ : std::unique_ptr<IContextStrategy> + - beer_context_strategy_ : std::unique_ptr<IContextStrategy> + - sampling_strategy_ : std::unique_ptr<ISamplingStrategy> + - beer_selection_strategy_ : std::unique_ptr<IBeerSelectionStrategy> + - checkin_strategy_ : std::unique_ptr<ICheckinDistributionStrategy> + - beer_style_palette_ : std::vector<BeerStyle> + -- + - user_pool_ : std::vector<GeneratedUser> + - brewery_pool_ : std::vector<GeneratedBrewery> + - beer_pool_ : std::vector<GeneratedBeer> + - checkin_pool_ : std::vector<GeneratedCheckin> + -- + + Run() : bool + - RunUserPhase(locations : const std::vector<Location>&) : void + - RunBreweryPhase(locations : const std::vector<Location>&) : void + - RunBeerPhase() : void + - RunCheckinPhase() : void + - RunRatingPhase() : void + } + note right of BiergartenPipelineOrchestrator + beer_style_palette_ loaded once + at startup from beer-styles.json. + Passed as std::span<const BeerStyle> + to IBeerSelectionStrategy per brewery. + RunBeerPhase() is a sequential loop — + no channels, no fan-out. Enrichment + is cache hits; LLM is the only cost. + end note + + class JsonLoader { + + {static} LoadLocations(filepath : const std::filesystem::path&) : std::vector<Location> + + {static} LoadBeerStyles(filepath : const std::filesystem::path&) : std::vector<BeerStyle> + } + note right of JsonLoader + LoadBeerStyles() added. + Reads beer-styles.json once + at startup into the palette + held by the orchestrator. 
+ end note + +} + +' ───────────────────────────────────────────── +' RELATIONSHIPS +' ───────────────────────────────────────────── + +' Orchestration +BiergartenPipelineOrchestrator *-- IEnrichmentService : owns +BiergartenPipelineOrchestrator *-- DataGenerator : owns +BiergartenPipelineOrchestrator *-- IExportService : owns +BiergartenPipelineOrchestrator *-- ICheckinDistributionStrategy : owns +BiergartenPipelineOrchestrator *-- ISamplingStrategy : owns +BiergartenPipelineOrchestrator *-- IBeerSelectionStrategy : owns +BiergartenPipelineOrchestrator ..> JsonLoader : uses + +' Policy implementations +IContextStrategy <|.. BreweryContextStrategy : implements +IContextStrategy <|.. BeerContextStrategy : implements +ISamplingStrategy <|.. UniformSamplingStrategy : implements +IBeerSelectionStrategy <|.. RandomBeerSelectionStrategy : implements +ICheckinDistributionStrategy <|.. JCurveCheckinStrategy : implements + +' Enrichment +IEnrichmentService <|.. WikipediaService : implements +WikipediaService *-- WebClient : owns +WikipediaService ..> IContextStrategy : uses (parameter) +WebClient <|.. CURLWebClient : implements + +' Generation +DataGenerator <|.. MockGenerator : implements +DataGenerator <|.. LlamaGenerator : implements +LlamaGenerator *-- IPromptFormatter : owns +LlamaGenerator ..> LlamaConfig : constructed with +IPromptFormatter <|.. Gemma4JinjaPromptFormatter : implements + +' Export +IExportService <|.. SqliteExportService : implements +SqliteExportService *-- IDateTimeProvider : owns +IDateTimeProvider <|.. 
SystemDateTimeProvider : implements + +' Domain containment +EnrichedCity *-- Location : contains +EnrichedCity *-- LocationContext : contains +GeneratedBrewery *-- Location : contains +GeneratedBrewery *-- BreweryResult : contains +GeneratedBeer *-- Location : contains +GeneratedBeer *-- BeerResult : contains +GeneratedUser *-- Location : contains +GeneratedUser *-- UserResult : contains +GeneratedCheckin *-- CheckinResult : contains +GeneratedRating *-- RatingResult : contains + +@enduml